将 Apache NiFi 连接到 ClickHouse
Apache NiFi 是一款开源工作流管理软件,旨在实现软件系统间的数据流自动化。它支持创建 ETL 数据管道,并内置超过 300 个数据处理器。本分步教程将演示如何将 Apache NiFi 同时作为数据源和目标连接到 ClickHouse,并加载示例数据集。
收集连接详细信息
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
| 参数 | 说明 |
|---|---|
HOST 和 PORT | 通常,在使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。 |
DATABASE NAME | 默认提供一个名为 default 的数据库,请填写您要连接的目标数据库名称。 |
USERNAME 和 PASSWORD | 默认用户名为 default。请使用适合您使用场景的用户名。 |
您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中查看。 选择某个服务并点击 Connect:

选择 HTTPS。连接信息会显示在示例 curl 命令中。

如果您使用的是自托管 ClickHouse,则连接信息由您的 ClickHouse 管理员进行设置。
下载并运行 Apache NiFi
对于全新安装,请从 https://nifi.apache.org/download.html 下载二进制文件,然后运行 ./bin/nifi.sh start 启动
下载 ClickHouse JDBC 驱动
- 前往 GitHub 上的 ClickHouse JDBC 驱动发布页面,查找最新的 JDBC 发行版。
- 在发布页面中,点击 “Show all xx assets”,然后查找包含关键字 “shaded” 或 “all” 的 JAR 文件,例如
clickhouse-jdbc-0.5.0-all.jar - 将 JAR 文件放置在 Apache NiFi 可访问的文件夹中,并记下其绝对路径
添加 DBCPConnectionPool Controller Service 并配置其属性
-
要在 Apache NiFi 中配置 Controller Service,点击“齿轮”按钮打开 NiFi Flow Configuration 页面

-
选择 Controller Services 选项卡,并点击右上角的
+按钮添加一个新的 Controller Service
-
搜索
DBCPConnectionPool并点击 “Add” 按钮
-
新添加的
DBCPConnectionPool默认处于 Invalid 状态。点击“齿轮”按钮开始配置
-
在 Properties 部分中,输入以下值
| Property | Value | Remark |
|---|---|---|
| Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 根据实际情况替换连接 URL 中的 HOSTNAME |
| Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
| Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC 驱动 JAR 文件的绝对路径 |
| Database User | default | ClickHouse 用户名 |
| Password | password | ClickHouse 密码 |
-
在 Settings 部分,将该 Controller Service 的名称修改为 “ClickHouse JDBC”,以便于识别

-
点击“闪电”按钮,然后点击 “Enable” 按钮,激活
DBCPConnectionPoolController Service

-
检查 Controller Services 选项卡,确认该 Controller Service 已启用

使用 ExecuteSQL 处理器从表中读取数据
-
添加一个
ExecuteSQLProcessor,以及相应的上游和下游 Processor
-
在
ExecuteSQLprocessor 的 Properties 部分中,输入以下值Property Value Remark Database Connection Pooling Service ClickHouse JDBC 选择为 ClickHouse 配置的 Controller Service SQL select query SELECT * FROM system.metrics 在此输入查询 -
启动
ExecuteSQLProcessor
-
要确认查询已成功处理,请检查输出队列中的一个
FlowFile
-
将视图切换为“formatted”,以查看
FlowFile的输出结果
使用 MergeRecord 和 PutDatabaseRecord 处理器写入表
-
若要在一次 insert 操作中写入多行,首先需要将多条记录合并为一条记录。这可以通过使用
MergeRecord处理器来完成 -
在
MergeRecord处理器的 Properties 部分中,输入以下值Property Value Remark Record Reader JSONTreeReader选择合适的 Record Reader Record Writer JSONReadSetWriter选择合适的 Record Writer Minimum Number of Records 1000 将此值修改为更大的数值,以便至少将这么多行合并为一条记录。默认值为 1 行 Maximum Number of Records 10000 将此值修改为大于 "Minimum Number of Records" 的数值。默认值为 1,000 行 -
要确认多条记录已合并为一条,请检查
MergeRecord处理器的输入和输出。注意,输出是一个包含多个输入记录的数组。输入

输出

-
在
PutDatabaseRecordprocessor 的 Properties 部分中,输入以下值Property Value Remark Record Reader JSONTreeReader选择合适的 Record Reader Database Type Generic 保持默认值 Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC 选择 ClickHouse JDBC Controller Service Table Name tbl 在此处输入表名 Translate Field Names false 设置为 "false",这样插入的字段名必须与列名匹配 Maximum Batch Size 1000 每次插入的最大行数。该值不应小于 MergeRecord处理器中 "Minimum Number of Records" 的值 -
要确认每次插入都包含多行,请检查表中的行数在每次插入后是否至少增加
MergeRecord中定义的 "Minimum Number of Records" 的数值。
-
恭喜,您已通过 Apache NiFi 成功将数据加载到 ClickHouse 中!