跳转到主内容
跳转到主内容

将 Apache NiFi 连接到 ClickHouse

Community Maintained

Apache NiFi 是一款开源工作流管理软件,旨在实现软件系统间的数据流自动化。它支持创建 ETL 数据管道,并内置超过 300 个数据处理器。本分步教程将演示如何将 Apache NiFi 同时作为数据源和目标连接到 ClickHouse,并加载示例数据集。

收集连接详细信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

参数说明
HOSTPORT通常,在使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。
DATABASE NAME默认提供一个名为 default 的数据库,请填写您要连接的目标数据库名称。
USERNAMEPASSWORD默认用户名为 default。请使用适合您使用场景的用户名。

您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中查看。 选择某个服务并点击 Connect

ClickHouse Cloud 服务 Connect 按钮

选择 HTTPS。连接信息会显示在示例 curl 命令中。

ClickHouse Cloud HTTPS 连接信息

如果您使用的是自托管 ClickHouse,则连接信息由您的 ClickHouse 管理员进行设置。

下载并运行 Apache NiFi

对于全新安装,请从 https://nifi.apache.org/download.html 下载二进制文件,然后运行 ./bin/nifi.sh start 启动

下载 ClickHouse JDBC 驱动

  1. 前往 GitHub 上的 ClickHouse JDBC 驱动发布页面,查找最新的 JDBC 发行版。
  2. 在发布页面中,点击 “Show all xx assets”,然后查找包含关键字 “shaded” 或 “all” 的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
  3. 将 JAR 文件放置在 Apache NiFi 可访问的文件夹中,并记下其绝对路径

添加 DBCPConnectionPool Controller Service 并配置其属性

  1. 要在 Apache NiFi 中配置 Controller Service,点击“齿轮”按钮打开 NiFi Flow Configuration 页面

    高亮显示齿轮按钮的 NiFi Flow Configuration 页面
  2. 选择 Controller Services 选项卡,并点击右上角的 + 按钮添加一个新的 Controller Service

    高亮显示添加按钮的 Controller Services 选项卡
  3. 搜索 DBCPConnectionPool 并点击 “Add” 按钮

    在 Controller Service 选择对话框中高亮显示 DBCPConnectionPool
  4. 新添加的 DBCPConnectionPool 默认处于 Invalid 状态。点击“齿轮”按钮开始配置

    Controller Services 列表中显示无效�的 DBCPConnectionPool 并高亮显示齿轮按钮
  5. 在 Properties 部分中,输入以下值

PropertyValueRemark
Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true根据实际情况替换连接 URL 中的 HOSTNAME
Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC 驱动 JAR 文件的绝对路径
Database UserdefaultClickHouse 用户名
PasswordpasswordClickHouse 密码
  1. 在 Settings 部分,将该 Controller Service 的名称修改为 “ClickHouse JDBC”,以便于识别

    DBCPConnectionPool 配置对话框,展示已填写的属性
  2. 点击“闪电”按钮,然后点击 “Enable” 按钮,激活 DBCPConnectionPool Controller Service

    在 Controller Services 列表中高亮显示闪电按钮

    启用 Controller Service 的确认对话框
  3. 检查 Controller Services 选项卡,确认该 Controller Service 已启用

    Controller Services 列表中显示已启用的 ClickHouse JDBC 服务

使用 ExecuteSQL 处理器从表中读取数据

  1. 添加一个 ExecuteSQL Processor,以及相应的上游和下游 Processor

    NiFi 画布中展示工作流中的 ExecuteSQL 处理器
  2. ExecuteSQL processor 的 Properties 部分中,输入以下值

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBC选择为 ClickHouse 配置的 Controller Service
    SQL select querySELECT * FROM system.metrics在此输入查询
  3. 启动 ExecuteSQL Processor

    ExecuteSQL 处理器配置,展示已填写的属性
  4. 要确认查询已成功处理,请检查输出队列中的一个 FlowFile

    List queue 对话框中显示可供检查的 flowfiles
  5. 将视图切换为“formatted”,以查看 FlowFile 的输出结果

    FlowFile 内容查看器以格式化视图显示查询结果

使用 MergeRecordPutDatabaseRecord 处理器写入表

  1. 若要在一次 insert 操作中写入多行,首先需要将多条记录合并为一条记录。这可以通过使用 MergeRecord 处理器来完成

  2. MergeRecord 处理器的 Properties 部分中,输入以下值

    PropertyValueRemark
    Record ReaderJSONTreeReader选择合适的 Record Reader
    Record WriterJSONReadSetWriter选择合适的 Record Writer
    Minimum Number of Records1000将此值修改为更大的数值,以便至少将这么多行合并为一条记录。默认值为 1 行
    Maximum Number of Records10000将此值修改为大于 "Minimum Number of Records" 的数值。默认值为 1,000 行
  3. 要确认多条记录已合并为一条,请检查 MergeRecord 处理器的输入和输出。注意,输出是一个包含多个输入记录的数组。

    输入

    包含单条记录的 MergeRecord 处理器输入

    输出

    MergeRecord Processor 输出,显示合并后的记录数组
  4. PutDatabaseRecord processor 的 Properties 部分中,输入以下值

    PropertyValueRemark
    Record ReaderJSONTreeReader选择合适的 Record Reader
    Database TypeGeneric保持默认值
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBC选择 ClickHouse JDBC Controller Service
    Table Nametbl在此处输入表名
    Translate Field Namesfalse设置为 "false",这样插入的字段名必须与列名匹配
    Maximum Batch Size1000每次插入的最大行数。该值不应小于 MergeRecord 处理器中 "Minimum Number of Records" 的值
  5. 要确认每次插入都包含多行,请检查表中的行数在每次插入后是否至少增加 MergeRecord 中定义的 "Minimum Number of Records" 的数值。

    查询结果显示目标表的行数
  6. 恭喜,您已通过 Apache NiFi 成功将数据加载到 ClickHouse 中!