Перейти к основному содержанию
Перейти к основному содержанию

Подключение Streamkap к ClickHouse

Partner Integration

Streamkap — это платформа интеграции данных в реальном времени, специализирующаяся на потоковой CDC (фиксация изменений данных) и обработке потоков данных. Она построена на высокопроизводительном, масштабируемом стеке с использованием Apache Kafka, Apache Flink и Debezium и предлагается как полностью управляемый сервис в формате SaaS или в вариантах развертывания BYOC (Bring your own Cloud).

Streamkap позволяет в режиме потока передавать каждую операцию INSERT, UPDATE и DELETE из исходных баз данных, таких как PostgreSQL, MySQL, SQL Server, MongoDB и других, непосредственно в ClickHouse с задержкой в миллисекунды.

Это делает её идеальным решением для построения аналитических дашбордов в реальном времени, операционной аналитики и подачи данных в реальном времени в модели машинного обучения.

Ключевые возможности

  • Потоковая CDC (фиксация изменений данных) в реальном времени: Streamkap считывает изменения напрямую из журналов вашей базы данных, гарантируя, что данные в ClickHouse являются репликой источника в режиме реального времени.
    Упрощённая потоковая обработка: преобразуйте, обогащайте, маршрутизируйте, форматируйте данные и создавайте эмбеддинги в реальном времени до их записи в ClickHouse. Решение работает на базе Flink, но без присущей ему сложности.

  • Полностью управляемое и масштабируемое решение: Обеспечивает готовый к продакшну конвейер без обслуживания, устраняя необходимость управлять собственной инфраструктурой Kafka, Flink, Debezium или реестром схем. Платформа рассчитана на высокую пропускную способность и может линейно масштабироваться для обработки миллиардов событий.

  • Автоматическая эволюция схемы: Streamkap автоматически обнаруживает изменения схемы в исходной базе данных и распространяет их в ClickHouse. Может обрабатывать добавление новых столбцов или изменение типов столбцов без ручного вмешательства.

  • Оптимизировано для ClickHouse: Интеграция создана для эффективной работы с возможностями ClickHouse. По умолчанию используется движок ReplacingMergeTree, который прозрачно обрабатывает обновления и удаления из исходной системы.

  • Отказоустойчивая доставка: Платформа предоставляет гарантию доставки как минимум один раз, обеспечивая согласованность данных между источником и ClickHouse. Для операций upsert выполняется дедупликация на основе первичного ключа.

Начало работы

В этом руководстве даётся общий высокоуровневый обзор настройки конвейера Streamkap для загрузки данных в ClickHouse.

Предварительные требования

  • Учётная запись Streamkap.
  • Параметры подключения к вашему кластеру ClickHouse: Hostname, Port, Username и Password.
  • Исходная база данных (например, PostgreSQL, SQL Server), настроенная для поддержки CDC. Подробные руководства по настройке доступны в документации Streamkap.

Шаг 1: Настройте источник в Streamkap

  1. Войдите в свою учетную запись Streamkap.
  2. В боковой панели перейдите в Connectors и выберите вкладку Sources.
  3. Нажмите + Add и выберите тип базы данных-источника (например, SQL Server RDS).
  4. Заполните параметры подключения, включая endpoint, порт, имя базы данных и учетные данные пользователя.
  5. Сохраните коннектор.

Шаг 2: Настройте назначение ClickHouse

  1. В разделе Connectors откройте вкладку Destinations.
  2. Нажмите + Add и выберите ClickHouse из списка.
  3. Укажите параметры подключения к вашему сервису ClickHouse:
    • Hostname: Хост (имя узла) экземпляра ClickHouse (например, abc123.us-west-2.aws.clickhouse.cloud)
    • Port: Защищённый HTTPS-порт, обычно 8443
    • Username and Password: Учётные данные пользователя ClickHouse
    • Database: Имя целевой базы данных в ClickHouse
  4. Сохраните назначение.

Шаг 3: Создайте и запустите конвейер

  1. Перейдите в раздел Pipelines в боковой панели и нажмите + Create.
  2. Выберите источник (Source) и приёмник (Destination), которые вы только что настроили.
  3. Выберите схемы и таблицы, которые вы хотите передавать в потоковом режиме.
  4. Задайте имя конвейеру и нажмите Save.

После создания конвейер станет активным. Streamkap сначала создаст снимок (snapshot) существующих данных, а затем начнёт передавать в потоковом режиме все новые изменения по мере их появления.

Шаг 4: Проверьте данные в ClickHouse

Подключитесь к своему кластеру ClickHouse и выполните запрос, чтобы убедиться, что данные поступают в целевую таблицу.

SELECT * FROM your_table_name LIMIT 10;

Как это работает с ClickHouse

Интеграция Streamkap разработана для эффективной работы с данными CDC в ClickHouse.

Движок таблицы и обработка данных

По умолчанию Streamkap использует режим upsert-ингестии. При создании таблицы в ClickHouse применяется движок ReplacingMergeTree. Этот движок оптимален для обработки событий CDC:

  • Первичный ключ исходной таблицы используется как ключ ORDER BY в определении таблицы ReplacingMergeTree.

  • Обновления в источнике записываются как новые строки в ClickHouse. В процессе фонового слияния ReplacingMergeTree схлопывает эти строки, сохраняя только последнюю версию на основе ключа сортировки.

  • Удаления обрабатываются с помощью метаданных, записываемых в параметр ReplacingMergeTree is_deleted. Строки, удалённые в источнике, не удаляются немедленно, а помечаются как удалённые.

    • При необходимости удалённые записи могут сохраняться в ClickHouse для аналитических целей

Столбцы метаданных

Streamkap добавляет несколько столбцов метаданных в каждую таблицу для управления состоянием данных:

Имя столбцаОписание
_STREAMKAP_SOURCE_TS_MSМетка времени (в миллисекундах) события в исходной базе данных.
_STREAMKAP_TS_MSМетка времени (в миллисекундах), когда Streamkap обработал событие.
__DELETEDБулевый флаг (true/false), указывающий, была ли строка удалена в источнике.
_STREAMKAP_OFFSETЗначение смещения из внутренних логов Streamkap, полезное для упорядочивания и отладки.

Выборка актуальных данных

Поскольку ReplacingMergeTree обрабатывает обновления и удаления в фоновом режиме, простой запрос SELECT * может отображать исторические или удалённые строки до завершения операции слияния. Чтобы получить наиболее актуальное состояние ваших данных, необходимо отфильтровать удалённые записи и выбрать только последнюю версию каждой строки.

Это можно сделать с помощью модификатора FINAL, который удобен, но может повлиять на производительность запроса:

-- Using FINAL to get the correct current state
SELECT * FROM your_table_name FINAL WHERE __DELETED = 'false';
SELECT * FROM your_table_name FINAL LIMIT 10;
SELECT * FROM your_table_name FINAL WHERE <filter by keys in ORDER BY clause>;
SELECT count(*) FROM your_table_name FINAL;

Для повышения производительности при работе с большими таблицами, особенно если вам не нужно читать все столбцы и при разовых аналитических запросах, вы можете использовать функцию argMax, чтобы вручную выбрать последнюю запись для каждого первичного ключа:

SELECT key,
       argMax(col1, version) AS col1,
       argMax(col2, version) AS col2
FROM t
WHERE <your predicates>
GROUP BY key;

Для продакшн-сценариев и многократно выполняемых параллельных запросов конечных пользователей можно использовать Materialized Views, чтобы смоделировать данные так, чтобы они лучше соответствовали последующим паттернам доступа.

Дополнительные материалы