将 Streamkap 连接到 ClickHouse
Streamkap 是一个实时数据集成平台,专注于 CDC(变更数据捕获)流式传输和流处理。它构建在使用 Apache Kafka、Apache Flink 和 Debezium 的高吞吐量、可扩展技术栈之上,并以 SaaS 或 BYOC(Bring Your Own Cloud,自带 Cloud)部署形式提供全托管服务。
Streamkap 允许将来自 PostgreSQL、MySQL、SQL Server、MongoDB 等源数据库以及更多数据源的每一次插入、更新和删除,以毫秒级延迟直接流式传输到 ClickHouse 中。
因此,它非常适合支撑实时分析型仪表盘、运营分析,以及为机器学习模型持续提供实时数据。
Key Features
-
Real-time Streaming CDC: Streamkap 直接从数据库日志中捕获变更,确保 ClickHouse 中的数据始终是源数据库的实时副本。 Simplified Stream Processing: 在数据写入 ClickHouse 之前,以实时方式对数据进行转换、丰富、路由、格式化,并创建 embeddings。由 Flink 提供驱动,同时对用户屏蔽其复杂性。
-
Fully Managed and Scalable: 提供适用于生产环境、零维护的数据管道,无需自行管理 Kafka、Flink、Debezium 或 schema registry 基础设施。该平台针对高吞吐量场景设计,并可线性扩展以处理数十亿级事件。
-
Automated Schema Evolution: Streamkap 会自动检测源数据库中的 schema 变更,并将其同步到 ClickHouse。它可以在无需人工干预的情况下处理新增列或更改列类型。
-
Optimized for ClickHouse: 此集成专为高效利用 ClickHouse 功能而构建。默认情况下,它使用 ReplacingMergeTree 引擎,以无缝方式处理来自源系统的更新和删除操作。
-
Resilient Delivery: 该平台提供至少一次投递(at-least-once)保证,确保源端与 ClickHouse 之间的数据一致性。对于 upsert 操作,它会基于主键执行去重。
入门
本指南概述了如何配置 Streamkap 数据管道,将数据加载到 ClickHouse 中。
前提条件
- 一个 Streamkap 帐户。
- 您的 ClickHouse 集群连接信息:主机名(Hostname)、端口(Port)、用户名(Username)和密码(Password)。
- 一个已配置为允许 CDC(变更数据捕获)的源数据库(例如 PostgreSQL、SQL Server)。您可以在 Streamkap 文档中找到详细的设置指南。
步骤 1:在 Streamkap 中配置数据源
- 登录到你的 Streamkap 账户。
- 在侧边栏中进入 Connectors,然后选择 Sources 选项卡。
- 点击 + Add,选择你的源数据库类型(例如 SQL Server RDS)。
- 填写连接配置信息,包括端点(endpoint)、端口、数据库名称和用户凭据。
- 保存该连接器。
步骤 2:配置 ClickHouse 目标
- 在 Connectors 部分中,选择 Destinations 选项卡。
- 点击 + Add,并从列表中选择 ClickHouse。
- 输入 ClickHouse 服务的连接信息:
- Hostname: ClickHouse 实例的主机名(例如
abc123.us-west-2.aws.clickhouse.cloud) - Port: 安全的 HTTPS 端口,通常为
8443 - Username and Password: ClickHouse 用户的凭据
- Database: ClickHouse 中的目标数据库名称
- Hostname: ClickHouse 实例的主机名(例如
- 保存该目标。
步骤 3:创建并运行 Pipeline
- 在侧边栏中点击 Pipelines,然后点击 + Create。
- 选择你刚刚配置好的 Source 和 Destination。
- 选择你希望进行流式传输的 schema 和表。
- 为你的 pipeline 命名,然后点击 Save。
创建完成后,pipeline 会自动激活。Streamkap 会先对现有数据进行一次快照,然后在有新变更发生时开始持续进行流式传输。
步骤 4:在 ClickHouse 中验证数据
连接到你的 ClickHouse 集群,运行查询以查看写入目标表的数据。
与 ClickHouse 的工作原理
Streamkap 集成专为在 ClickHouse 中高效管理 CDC 数据而设计。
表引擎和数据处理
默认情况下,Streamkap 使用 upsert 摄取模式。在 ClickHouse 中创建表时,它会使用 ReplacingMergeTree 引擎。此引擎非常适合处理 CDC 事件:
-
源表的主键会在 ReplacingMergeTree 表定义中作为 ORDER BY 键使用。
-
源中的更新会作为新行写入 ClickHouse。在后台合并过程中,ReplacingMergeTree 会将这些行合并,仅根据排序键保留最新版本。
-
删除通过一个元数据标志映射到 ReplacingMergeTree 的
is_deleted参数来处理。源端被删除的行不会立即被移除,而是被标记为已删除。- 可以选择在 ClickHouse 中保留这些已删除记录,以便用于分析
元数据列
Streamkap 为每个表添加了多个元数据列,用于管理数据状态:
| 列名 | 描述 |
|---|---|
_STREAMKAP_SOURCE_TS_MS | 源数据库中事件的时间戳(毫秒)。 |
_STREAMKAP_TS_MS | Streamkap 处理该事件时的时间戳(毫秒)。 |
__DELETED | 一个布尔标志位(true/false),指示该行是否在源端被删除。 |
_STREAMKAP_OFFSET | 来自 Streamkap 内部日志的偏移量值,用于排序和调试。 |
查询最新数据
由于 ReplacingMergeTree 在后台处理更新和删除操作,在合并完成之前,简单的 SELECT * 查询可能会显示历史或已删除的行。若要获取数据的最新状态,必须过滤掉已删除的记录,并且只选择每一行的最新版本。
可以使用 FINAL 修饰符来实现这一点,它非常方便,但可能会影响查询性能:
为了在大型表上获得更好的性能,尤其是在不需要读取所有列且只执行一次性分析查询的情况下,可以使用 argMax 函数为每个主键手动选出最新的记录:
在生产环境场景下,对于存在并发且反复执行的终端用户查询,可以使用 materialized view 对数据进行建模,使其更好地匹配下游的访问模式。