Перейти к основному содержимому
Перейти к основному содержимому

Настройка неупорядоченного режима для непрерывной ингестии

По умолчанию S3 ClickPipe предполагает, что файлы добавляются в бакет в лексикографическом порядке. S3 ClickPipe можно настроить на приём файлов без явного порядка, создав очередь Amazon SQS, подключённую к бакету, и при необходимости используя Amazon EventBridge в качестве маршрутизатора событий. Это позволяет ClickPipes отслеживать события ObjectCreated:* и выполнять приём новых файлов независимо от принятого соглашения об именовании.

Примечание

Неупорядоченный режим поддерживается только для Amazon S3 и не поддерживается для публичных бакетов или S3-совместимых сервисов. Для него необходимо настроить очередь Amazon SQS, подключённую к бакету, и при необходимости использовать Amazon EventBridge в качестве маршрутизатора событий.

Как это работает

В этом режиме S3 ClickPipe выполняет первоначальную загрузку всех файлов по выбранному пути, а затем отслеживает в очереди события ObjectCreated:*, соответствующие указанному пути. Любое сообщение для ранее обнаруженного файла, файла, не соответствующего пути, или события другого типа будет игнорироваться. Файлы загружаются, когда достигается порог, настроенный в max insert bytes или max file count, либо по истечении настраиваемого интервала (по умолчанию 30 секунд). Невозможно начать ингестию с определённого файла или момента времени — ClickPipes всегда загружают все файлы по выбранному пути.

При приёме данных могут возникать различные сбои, что может приводить к частичным вставкам или дублированию данных. ClickPipes для объектного хранилища устойчивы к сбоям вставки и обеспечивают семантику exactly-once с использованием временных промежуточных таблиц. Сначала данные вставляются в промежуточную таблицу; если что-то идёт не так, промежуточная таблица очищается, и вставка повторяется из чистого состояния. Только после успешного завершения вставки партиции перемещаются в целевую таблицу.

Создание очереди Amazon SQS

1. В консоли AWS перейдите в раздел Simple Queue Service > Create queue. Используйте настройки по умолчанию для создания новой стандартной очереди.

Совет

Настоятельно рекомендуем настроить Dead-Letter-Queue (DLQ) для очереди SQS — это упрощает отладку и повторную обработку неудачных сообщений. Если DLQ настроен, неудачные сообщения будут повторно помещены в очередь и обработаны повторно столько раз, сколько указано в параметре maxReceiveCount DLQ.

2. Подключите ваш S3 бакет к очереди SQS, используя один из двух вариантов ниже. EventBridge рекомендуется для большинства сценариев использования, поскольку поддерживает fan-out, более гибкую фильтрацию событий и не подпадает под ограничение S3, допускающее только одно правило уведомления на тип события на prefix.

a. В свойствах S3 бакета перейдите в Event notifications > Amazon EventBridge и включите отправку уведомлений в EventBridge. Нажмите Save changes.

Включение уведомлений Amazon EventBridge в свойствах S3 бакета

b. В AWS Console перейдите в Amazon EventBridge > Rules > Create rule. Задайте имя правила (например, S3ObjectCreated), выберите шину событий default и нажмите Next. На шаге Build event pattern выберите AWS events or EventBridge partner events в качестве источника событий, затем вручную введите следующий шаблон события, заменив <bucket-name> именем вашего бакета:

Задание имени правила EventBridge и шины событий
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    }
  }
}

При необходимости добавьте в шаблон условие object.key, чтобы фильтровать по префиксу или суффиксу. Если добавляете его, убедитесь, что оно соответствует пути, заданному для ClickPipe.

c. На шаге Select target(s) выберите AWS service в качестве типа цели и укажите SQS queue. Выберите очередь, созданную на предыдущем шаге. Оставьте флажок Use execution role (recommended) установленным, чтобы EventBridge автоматически создал необходимую роль IAM, затем нажмите Next и завершите мастер.

Настройка очереди SQS в качестве цели правила EventBridge
Правило EventBridge успешно создано

d. Отредактируйте политику доступа очереди SQS, чтобы разрешить EventBridge отправлять в неё сообщения. Замените <sqs-queue-arn> и <eventbridge-rule-arn> соответствующими значениями:

{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "AllowEventBridgeToSendMessage",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<sqs-queue-arn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<eventbridge-rule-arn>"
        }
      }
    }
  ]
}

Настройка роли IAM

1. В Cloud Console ClickHouse перейдите в раздел Settings > Network security information и скопируйте IAM role ARN для вашего сервиса.

2. В консоли AWS перейдите в раздел IAM > Roles > Create role. Выберите Custom trust policy и вставьте следующее содержимое, заменив <ch-cloud-arn> на ARN роли IAM, скопированный на предыдущем шаге:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<ch-cloud-arn>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

3. Создайте встроенную политику для роли IAM с необходимыми правами доступа для чтения объектов из S3 и управления сообщениями в очереди SQS. Замените <bucket-arn> и <sqs-queue-arn> соответствующими значениями:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3BucketMetadataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": "<bucket-arn>"
    },
    {
      "Sid": "AllowGetListObjects",
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "<bucket-arn>/*"
    },
    {
      "Sid": "SQSNotificationsAccess",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "<sqs-queue-arn>"
    }
  ]
}

Создание ClickPipe в неупорядоченном режиме

1. В Cloud Console ClickHouse Cloud перейдите в раздел Data Sources > Create ClickPipe и выберите Amazon S3. Введите данные для подключения к вашему S3 бакету. В разделе Authentication method выберите IAM role и укажите ARN роли, созданной на предыдущем шаге.

2. В разделе Incoming data включите Continuous ingestion. Выберите Any order в качестве режима ингестии и укажите SQS queue URL для очереди, подключённой к вашему бакету.

3. В разделе Parse information определите ключ сортировки для целевой таблицы. Внесите необходимые изменения в сопоставленную schema, затем настройте роль для пользователя базы данных ClickPipes.

4. Проверьте настройку и нажмите Create ClickPipe. ClickPipes выполнит первоначальное сканирование вашего бакета для загрузки всех существующих файлов, соответствующих указанному пути, а затем начнёт обрабатывать файлы по мере поступления новых событий ObjectCreated:* в очередь.