跳转到主内容
跳转到主内容

Flink 连接器

ClickHouse Supported

这是 ClickHouse 官方支持的 Apache Flink Sink Connector。它基于 Flink 的 AsyncSinkBase 和 ClickHouse 官方 Java 客户端 构建。

该连接器支持 Apache Flink 的 DataStream API。对 Table API 的支持计划在未来版本中推出

要求

  • Java 11+ (用于 Flink 1.17+) 或 17+ (用于 Flink 2.0+)
  • Apache Flink 1.17+

该连接器分为两个制品,以支持 Flink 1.17+ 和 Flink 2.0+。请选择与目标 Flink 版本对应的制品:

Flink 版本制品ClickHouse Java 客户端版本所需 Java 版本
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
注意

该连接器尚未针对早于 1.17.2 的 Flink 版本进行测试。

安装与配置

导入依赖项

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

下载二进制包

二进制 JAR 的命名规则如下:

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

其中:

你可以在 Maven Central Repository 中找到所有已发布的可用 JAR 文件。

使用 DataStream API

示例代码

假设你想将原始 CSV 数据插入 ClickHouse:

public static void main(String[] args) {
    // 配置 ClickHouseClient
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // 创建 ElementConverter
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // 创建 sink,并使用 `setClickHouseFormat` 设置格式
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 最后,将 DataStream 连接到 sink。
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}

更多示例和代码片段可在测试代码中找到:

快速入门示例

我们提供了基于 Maven 的示例,帮助您快速开始使用 ClickHouse Sink:

如需更详细的说明,请参阅示例指南

DataStream API 连接选项

ClickHouse 客户端选项

Parameters描述Default ValueRequired
url完整限定的 ClickHouse URLN/A
usernameClickHouse 数据库用户名N/A
passwordClickHouse 数据库密码N/A
databaseClickHouse 数据库名称N/A
tableClickHouse 表名N/A
optionsJava 客户端配置选项映射空映射
serverSettingsClickHouse 服务器会话设置映射空映射
enableJsonSupportAsString用于指定 JSON 数据类型 应以 JSON 格式字符串传入的 ClickHouse 服务器设置true

应将 optionsserverSettings 作为 Map<String, String> 传递给客户端。两者中任一项使用空映射时,分别采用客户端或服务器的默认值。

注意

所有可用的 Java 客户端选项均列在 ClientConfigProperties.java此文档页面中。

所有可用的服务器会话设置均列在此文档页面中。

例如:

Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // 启用 enableJsonSupportAsString
);

Sink 选项

以下选项直接来自 Flink 的 AsyncSinkBase

参数描述默认值必填
maxBatchSize单个批次可插入的最大记录数N/A
maxInFlightRequestssink 开始施加背压之前,允许的最大进行中请求数N/A
maxBufferedRequestssink 开始施加背压之前,可在其中缓冲的最大记录数N/A
maxBatchSizeInBytes单个批次允许达到的最大大小 (以字节为单位) 。发送的所有批次都将小于或等于该大小N/A
maxTimeInBufferMS记录在被刷新之前可在 sink 中停留的最长时间N/A
maxRecordSizeInBytessink 可接受的最大记录大小,超过该大小的记录将被自动拒绝N/A

支持的数据类型

下表简要说明了从 Flink 插入数据到 ClickHouse 时的数据类型转换。

Java 类型ClickHouse 类型是否支持序列化方法
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeN/A
long/LongTime64N/A
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantN/A

注意:

  • 执行日期相关操作时,必须提供 ZoneId
  • 执行十进制相关操作时,必须提供精度和小数位数
  • 要让 ClickHouse 将 Java String 解析为 JSON,您需要在 ClickHouseClientConfig 中启用 enableJsonSupportAsString
  • 该连接器需要一个 ElementConvertor,用于将输入 DataStream 中的元素映射为 ClickHouse 数据载荷。为此,连接器提供了 ClickHouseConvertorPOJOConvertor,您可以使用它们结合上述 DataWriter 序列化方法来实现此映射。

支持的输入格式

可在此文档页面ClickHouseFormat.java 中查看可用的 ClickHouse 输入格式列表。

要指定连接器使用哪种格式将您的 DataStream 序列化为发送到 ClickHouse 的载荷,请使用 setClickHouseFormat 函数。例如:

ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
注意

默认情况下,如果在 ClickHouseClientConfig 中将 setSupportDefault 显式设置为 true 或 false,连接器将分别使用 RowBinaryWithDefaultsRowBinary

指标

该连接器除 Flink 现有指标外,还会额外暴露以下指标:

Metric描述TypeStatus
numBytesSend请求负载中发送到 ClickHouse 的总字节数。注意:该指标衡量的是通过网络发送的序列化数据大小,可能与 ClickHouse 在 system.query_log 中的 written_bytes 不同;后者反映的是数据经处理后实际写入存储的字节数Counter
numRecordSend发送到 ClickHouse 的记录总数Counter
numRequestSubmitted已发送的请求总数 (即实际执行的 flush 次数)Counter
numOfDroppedBatches因不可重试故障而丢弃的批次总数Counter
numOfDroppedRecords因不可重试故障而丢弃的记录总数Counter
totalBatchRetries因可重试故障而进行的批次重试总数Counter
writeLatencyHistogram成功写入延迟分布直方图 (毫秒)Histogram
writeFailureLatencyHistogram写入失败延迟分布直方图 (毫秒)Histogram
triggeredByMaxBatchSizeCounter因达到 maxBatchSize 而触发的 flush 总数Counter
triggeredByMaxBatchSizeInBytesCounter因达到 maxBatchSizeInBytes 而触发的 flush 总数Counter
triggeredByMaxTimeInBufferMSCounter因达到 maxTimeInBufferMS 而触发的 flush 总数Counter
actualRecordsPerBatch实际每批记录数分布直方图Histogram
actualBytesPerBatch实际每批字节数分布直方图Histogram

限制

  • 该 sink 目前提供至少一次 (at-least-once) 交付保证。对 exactly-once 语义的支持工作正在此处跟进。
  • 该 sink 尚不支持使用死信队列 (DLQ) 缓冲无法处理的记录。目前,连接器会尝试重新插入失败的记录;如果仍然失败,则会将其丢弃。此功能正在此处跟进。
  • 该 sink 尚不支持通过 Flink 的 Table API 或 Flink SQL 创建。此功能正在此处跟进。

ClickHouse 版本兼容性与安全性

  • 该连接器通过每日 CI 工作流对一系列较新的 ClickHouse 版本进行测试,包括 latest 和 head。随着新的 ClickHouse 版本发布并进入活跃支持阶段,测试版本会定期更新。有关该连接器每日测试的版本,请参见此处
  • 有关已知安全漏洞以及如何报告漏洞,请参见 ClickHouse 安全策略
  • 我们建议持续升级该连接器,以免错过安全修复和功能改进。
  • 如果你在迁移过程中遇到问题,请在 GitHub 上创建 issue,我们会尽快回复!
  • 为获得最佳性能,请确保 DataStream 的元素类型不是泛型类型——请参阅此处有关 Flink 类型区分的说明。非泛型元素可避免 Kryo 带来的序列化开销,并提高写入 ClickHouse 的吞吐量。
  • 我们建议将 maxBatchSize 设置为至少 1000,理想情况下在 10,000 到 100,000 之间。更多信息请参阅这篇关于批量插入的指南
  • 如需在 ClickHouse 中执行 OLTP 风格的去重或 upsert,请参阅此文档页面注意:不要将其与重试时发生的批次去重混淆,详见下文

故障排查

CANNOT_READ_ALL_DATA

可能会出现以下报错:

com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)

原因:最常见的情况是,CANNOT_READ_ALL_DATA 错误意味着 ClickHouse 表的 schema 与 Flink 记录的 schema 不一致。当其中任意一方以不向后兼容的方式发生变更时,就可能出现这种情况。

解决方案:更新 ClickHouse 表或连接器输入数据类型中的 schema (或同时更新两者) ,使其相互兼容。如有需要,请参阅type mapping,了解如何将 Java 类型映射为 ClickHouse 类型。注意:如果仍有正在传输中的记录,则在重启连接器时需要重置 Flink 状态。

吞吐量低

向 ClickHouse 写入时,你可能会遇到这样的问题:连接器的吞吐量不会随着作业并行度 (Flink 任务数) 的增加而提升。

原因:ClickHouse 的后台分区片段合并过程可能会拖慢插入速度。当配置的批次大小过小、连接器刷新过于频繁,或两者同时存在时,就可能发生这种情况。

解决方案:监控 numRequestSubmittedactualRecordsPerBatch 指标,以帮助确定如何调整批次大小 (maxBatchSize) 以及刷新频率。另请参见高级与推荐用法中的批次大小建议。

我的 ClickHouse 表中缺少一些行

原因:这些批次之所以被丢弃,要么是因为发生了不可重试的故障,要么是因为在已配置的重试次数内仍无法完成插入(可通过 ClickHouseClientConfig.setNumberOfRetries() 设置)。注意:默认情况下,连接器在丢弃一个批次之前,最多会尝试重新插入 3 次。

解决方案:检查 TaskManager 日志和/或堆栈跟踪,找出根本原因。

贡献与支持

如果您想为该项目做出贡献或报告任何问题,欢迎提出宝贵意见! 请访问我们的 GitHub 仓库 提交 issue、提出改进建议, 或发起拉取请求。

欢迎贡献!开始之前,请先查看仓库中的贡献指南。 感谢您帮助改进 ClickHouse Flink 连接器!