Настройка неупорядоченного режима для непрерывной ингестии
По умолчанию S3 ClickPipe предполагает, что файлы добавляются в бакет в лексикографическом порядке. S3 ClickPipe можно настроить на приём файлов без явного порядка, создав очередь Amazon SQS, подключённую к бакету, и при необходимости используя Amazon EventBridge в качестве маршрутизатора событий. Это позволяет ClickPipes отслеживать события ObjectCreated:* и выполнять приём новых файлов независимо от принятого соглашения об именовании.
Неупорядоченный режим поддерживается только для Amazon S3 и не поддерживается для публичных бакетов или S3-совместимых сервисов. Для него необходимо настроить очередь Amazon SQS, подключённую к бакету, и при необходимости использовать Amazon EventBridge в качестве маршрутизатора событий.
Как это работает
В этом режиме S3 ClickPipe выполняет первоначальную загрузку всех файлов по выбранному пути, а затем отслеживает в очереди события ObjectCreated:*, соответствующие указанному пути. Любое сообщение для ранее обнаруженного файла, файла, не соответствующего пути, или события другого типа будет игнорироваться. Файлы загружаются, когда достигается порог, настроенный в max insert bytes или max file count, либо по истечении настраиваемого интервала (по умолчанию 30 секунд). Невозможно начать ингестию с определённого файла или момента времени — ClickPipes всегда загружают все файлы по выбранному пути.
При приёме данных могут возникать различные сбои, что может приводить к частичным вставкам или дублированию данных. ClickPipes для объектного хранилища устойчивы к сбоям вставки и обеспечивают семантику exactly-once с использованием временных промежуточных таблиц. Сначала данные вставляются в промежуточную таблицу; если что-то идёт не так, промежуточная таблица очищается, и вставка повторяется из чистого состояния. Только после успешного завершения вставки партиции перемещаются в целевую таблицу.
Создание очереди Amazon SQS
1. В консоли AWS перейдите в раздел Simple Queue Service > Create queue. Используйте настройки по умолчанию для создания новой стандартной очереди.
Настоятельно рекомендуем настроить Dead-Letter-Queue (DLQ) для очереди SQS — это упрощает отладку и повторную обработку неудачных сообщений. Если DLQ настроен, неудачные сообщения будут повторно помещены в очередь и обработаны повторно столько раз, сколько указано в параметре maxReceiveCount DLQ.
2. Подключите ваш S3 бакет к очереди SQS, используя один из двух вариантов ниже. EventBridge рекомендуется для большинства сценариев использования, поскольку поддерживает fan-out, более гибкую фильтрацию событий и не подпадает под ограничение S3, допускающее только одно правило уведомления на тип события на prefix.
- через EventBridge
- Напрямую S3 → SQS
a. В свойствах S3 бакета перейдите в Event notifications > Amazon EventBridge и включите отправку уведомлений в EventBridge. Нажмите Save changes.

b. В AWS Console перейдите в Amazon EventBridge > Rules > Create rule. Задайте имя правила (например, S3ObjectCreated), выберите шину событий default и нажмите Next. На шаге Build event pattern выберите AWS events or EventBridge partner events в качестве источника событий, затем вручную введите следующий шаблон события, заменив <bucket-name> именем вашего бакета:

При необходимости добавьте в шаблон условие object.key, чтобы фильтровать по префиксу или суффиксу. Если добавляете его, убедитесь, что оно соответствует пути, заданному для ClickPipe.
c. На шаге Select target(s) выберите AWS service в качестве типа цели и укажите SQS queue. Выберите очередь, созданную на предыдущем шаге. Оставьте флажок Use execution role (recommended) установленным, чтобы EventBridge автоматически создал необходимую роль IAM, затем нажмите Next и завершите мастер.


d. Отредактируйте политику доступа очереди SQS, чтобы разрешить EventBridge отправлять в неё сообщения. Замените <sqs-queue-arn> и <eventbridge-rule-arn> соответствующими значениями:
a. Отредактируйте политику доступа очереди SQS, чтобы разрешить вашему S3 бакету отправлять в неё сообщения. Замените <sqs-queue-arn>, <bucket-arn> и <aws-account-id> соответствующими значениями:
b. В свойствах S3 бакета включите Event notifications для событий ObjectCreated и укажите в качестве назначения очередь SQS. При необходимости задайте префикс или суффикс, чтобы фильтровать, какие объекты будут вызывать уведомления, — если задаёте их, убедитесь, что они соответствуют пути, заданному для ClickPipe.
S3 не позволяет создавать несколько пересекающихся правил уведомлений для одних и тех же типов событий в одном бакете. Если у вас уже есть правило уведомлений для событий ObjectCreated в этом бакете, используйте вместо этого подход с EventBridge.
Настройка роли IAM
1. В Cloud Console ClickHouse перейдите в раздел Settings > Network security information и скопируйте IAM role ARN для вашего сервиса.
2. В консоли AWS перейдите в раздел IAM > Roles > Create role. Выберите Custom trust policy и вставьте следующее содержимое, заменив <ch-cloud-arn> на ARN роли IAM, скопированный на предыдущем шаге:
3. Создайте встроенную политику для роли IAM с необходимыми правами доступа для чтения объектов из S3 и управления сообщениями в очереди SQS. Замените <bucket-arn> и <sqs-queue-arn> соответствующими значениями:
Создание ClickPipe в неупорядоченном режиме
1. В Cloud Console ClickHouse Cloud перейдите в раздел Data Sources > Create ClickPipe и выберите Amazon S3. Введите данные для подключения к вашему S3 бакету. В разделе Authentication method выберите IAM role и укажите ARN роли, созданной на предыдущем шаге.
2. В разделе Incoming data включите Continuous ingestion. Выберите Any order в качестве режима ингестии и укажите SQS queue URL для очереди, подключённой к вашему бакету.
3. В разделе Parse information определите ключ сортировки для целевой таблицы. Внесите необходимые изменения в сопоставленную schema, затем настройте роль для пользователя базы данных ClickPipes.
4. Проверьте настройку и нажмите Create ClickPipe. ClickPipes выполнит первоначальное сканирование вашего бакета для загрузки всех существующих файлов, соответствующих указанному пути, а затем начнёт обрабатывать файлы по мере поступления новых событий ObjectCreated:* в очередь.