Подключение Streamkap к ClickHouse
Streamkap — это платформа интеграции данных в реальном времени, специализирующаяся на потоковой CDC (фиксация изменений данных) и обработке потоков данных. Она построена на высокопроизводительном, масштабируемом стеке с использованием Apache Kafka, Apache Flink и Debezium и предлагается как полностью управляемый сервис в формате SaaS или в вариантах развертывания BYOC (Bring your own Cloud).
Streamkap позволяет в режиме потока передавать каждую операцию INSERT, UPDATE и DELETE из исходных баз данных, таких как PostgreSQL, MySQL, SQL Server, MongoDB и других, непосредственно в ClickHouse с задержкой в миллисекунды.
Это делает её идеальным решением для построения аналитических дашбордов в реальном времени, операционной аналитики и подачи данных в реальном времени в модели машинного обучения.
Ключевые возможности
-
Потоковая CDC (фиксация изменений данных) в реальном времени: Streamkap считывает изменения напрямую из журналов вашей базы данных, гарантируя, что данные в ClickHouse являются репликой источника в режиме реального времени.
Упрощённая потоковая обработка: преобразуйте, обогащайте, маршрутизируйте, форматируйте данные и создавайте эмбеддинги в реальном времени до их записи в ClickHouse. Решение работает на базе Flink, но без присущей ему сложности. -
Полностью управляемое и масштабируемое решение: Обеспечивает готовый к продакшну конвейер без обслуживания, устраняя необходимость управлять собственной инфраструктурой Kafka, Flink, Debezium или реестром схем. Платформа рассчитана на высокую пропускную способность и может линейно масштабироваться для обработки миллиардов событий.
-
Автоматическая эволюция схемы: Streamkap автоматически обнаруживает изменения схемы в исходной базе данных и распространяет их в ClickHouse. Может обрабатывать добавление новых столбцов или изменение типов столбцов без ручного вмешательства.
-
Оптимизировано для ClickHouse: Интеграция создана для эффективной работы с возможностями ClickHouse. По умолчанию используется движок ReplacingMergeTree, который прозрачно обрабатывает обновления и удаления из исходной системы.
-
Отказоустойчивая доставка: Платформа предоставляет гарантию доставки как минимум один раз, обеспечивая согласованность данных между источником и ClickHouse. Для операций upsert выполняется дедупликация на основе первичного ключа.
Начало работы
В этом руководстве даётся общий высокоуровневый обзор настройки конвейера Streamkap для загрузки данных в ClickHouse.
Предварительные требования
- Учётная запись Streamkap.
- Параметры подключения к вашему кластеру ClickHouse: Hostname, Port, Username и Password.
- Исходная база данных (например, PostgreSQL, SQL Server), настроенная для поддержки CDC. Подробные руководства по настройке доступны в документации Streamkap.
Шаг 1: Настройте источник в Streamkap
- Войдите в свою учетную запись Streamkap.
- В боковой панели перейдите в Connectors и выберите вкладку Sources.
- Нажмите + Add и выберите тип базы данных-источника (например, SQL Server RDS).
- Заполните параметры подключения, включая endpoint, порт, имя базы данных и учетные данные пользователя.
- Сохраните коннектор.
Шаг 2: Настройте назначение ClickHouse
- В разделе Connectors откройте вкладку Destinations.
- Нажмите + Add и выберите ClickHouse из списка.
- Укажите параметры подключения к вашему сервису ClickHouse:
- Hostname: Хост (имя узла) экземпляра ClickHouse (например,
abc123.us-west-2.aws.clickhouse.cloud) - Port: Защищённый HTTPS-порт, обычно
8443 - Username and Password: Учётные данные пользователя ClickHouse
- Database: Имя целевой базы данных в ClickHouse
- Hostname: Хост (имя узла) экземпляра ClickHouse (например,
- Сохраните назначение.
Шаг 3: Создайте и запустите конвейер
- Перейдите в раздел Pipelines в боковой панели и нажмите + Create.
- Выберите источник (Source) и приёмник (Destination), которые вы только что настроили.
- Выберите схемы и таблицы, которые вы хотите передавать в потоковом режиме.
- Задайте имя конвейеру и нажмите Save.
После создания конвейер станет активным. Streamkap сначала создаст снимок (snapshot) существующих данных, а затем начнёт передавать в потоковом режиме все новые изменения по мере их появления.
Шаг 4: Проверьте данные в ClickHouse
Подключитесь к своему кластеру ClickHouse и выполните запрос, чтобы убедиться, что данные поступают в целевую таблицу.
Как это работает с ClickHouse
Интеграция Streamkap разработана для эффективной работы с данными CDC в ClickHouse.
Движок таблицы и обработка данных
По умолчанию Streamkap использует режим upsert-ингестии. При создании таблицы в ClickHouse применяется движок ReplacingMergeTree. Этот движок оптимален для обработки событий CDC:
-
Первичный ключ исходной таблицы используется как ключ ORDER BY в определении таблицы ReplacingMergeTree.
-
Обновления в источнике записываются как новые строки в ClickHouse. В процессе фонового слияния ReplacingMergeTree схлопывает эти строки, сохраняя только последнюю версию на основе ключа сортировки.
-
Удаления обрабатываются с помощью метаданных, записываемых в параметр ReplacingMergeTree
is_deleted. Строки, удалённые в источнике, не удаляются немедленно, а помечаются как удалённые.- При необходимости удалённые записи могут сохраняться в ClickHouse для аналитических целей
Столбцы метаданных
Streamkap добавляет несколько столбцов метаданных в каждую таблицу для управления состоянием данных:
| Имя столбца | Описание |
|---|---|
_STREAMKAP_SOURCE_TS_MS | Метка времени (в миллисекундах) события в исходной базе данных. |
_STREAMKAP_TS_MS | Метка времени (в миллисекундах), когда Streamkap обработал событие. |
__DELETED | Булевый флаг (true/false), указывающий, была ли строка удалена в источнике. |
_STREAMKAP_OFFSET | Значение смещения из внутренних логов Streamkap, полезное для упорядочивания и отладки. |
Выборка актуальных данных
Поскольку ReplacingMergeTree обрабатывает обновления и удаления в фоновом режиме, простой запрос SELECT * может отображать исторические или удалённые строки до завершения операции слияния. Чтобы получить наиболее актуальное состояние ваших данных, необходимо отфильтровать удалённые записи и выбрать только последнюю версию каждой строки.
Это можно сделать с помощью модификатора FINAL, который удобен, но может повлиять на производительность запроса:
Для повышения производительности при работе с большими таблицами, особенно если вам не нужно читать все столбцы и при разовых аналитических запросах, вы можете использовать функцию argMax, чтобы вручную выбрать последнюю запись для каждого первичного ключа:
Для продакшн-сценариев и многократно выполняемых параллельных запросов конечных пользователей можно использовать Materialized Views, чтобы смоделировать данные так, чтобы они лучше соответствовали последующим паттернам доступа.