メインコンテンツへスキップ
メインコンテンツへスキップ

Flink コネクタ

ClickHouse Supported

これは、ClickHouse がサポートする公式の Apache Flink Sink Connector です。Flink の AsyncSinkBase と、公式の ClickHouse Java client を使用して構築されています。

このコネクタは Apache Flink の DataStream API をサポートしています。Table API のサポートは、今後のリリースで予定されています

要件

  • Java 11 以降 (Flink 1.17 以降の場合) または 17 以降 (Flink 2.0 以降の場合)
  • Apache Flink 1.17 以降

このコネクタは、Flink 1.17+ と Flink 2.0+ の両方をサポートするため、2 つのアーティファクトに分かれています。使用する Flink バージョンに対応するアーティファクトを選択してください。

Flink バージョンアーティファクトClickHouse Java Client バージョン必要な Java
最新flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
注記

このコネクタは、Flink 1.17.2 より前のバージョンではテストされていません。

インストールとセットアップ

依存関係として追加

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

バイナリをダウンロードする

バイナリ JAR の名前のパターンは次のとおりです。

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

ここで:

利用可能なリリース済みの JAR ファイルはすべて、Maven Central Repository で確認できます。

DataStream APIの使用

スニペット

生の CSV データを ClickHouse に挿入する場合は、次のようになります。

public static void main(String[] args) {
    // ClickHouseClient を設定
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // ElementConverter を作成
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // シンクを作成し、`setClickHouseFormat` を使用してフォーマットを設定
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 最後に、DataStream をシンクに接続します。
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}

その他の例やスニペットは、テストコードで確認できます。

クイックスタート例

ClickHouse Sink を手軽に始められるよう、Maven ベースの例を用意しています。

より詳細な手順については、例ガイドを参照してください

DataStream API の接続オプション

ClickHouse クライアントオプション

Parameters説明Default ValueRequired
url完全修飾された ClickHouse の URLN/AYes
usernameClickHouse データベースのユーザー名N/AYes
passwordClickHouse データベースのパスワードN/AYes
databaseClickHouse データベース名N/AYes
tableClickHouse テーブル名N/AYes
optionsJava クライアントの設定オプションのマップEmpty mapNo
serverSettingsClickHouse サーバーのセッション設定のマップEmpty mapNo
enableJsonSupportAsStringJSON data type に対して JSON 形式の文字列を受け取ることを想定する ClickHouse サーバー設定trueNo

optionsserverSettings は、Map<String, String> としてクライアントに渡してください。どちらかに空のマップを指定した場合は、それぞれクライアントまたはサーバーのデフォルト設定が使用されます。

注記

利用可能なすべての Java クライアントオプションは、ClientConfigProperties.java および このドキュメントページ に一覧があります。

利用可能なすべてのサーバーセッション設定は、このドキュメントページ に一覧があります。

例:

Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

シンクオプション

以下のオプションは、Flink の AsyncSinkBase に直接由来するものです。

パラメータ説明デフォルト値必須
maxBatchSize1 回のバッチで挿入できるレコードの最大数N/Aはい
maxInFlightRequestsシンクがバックプレッシャーを適用するまでに許可される、進行中リクエストの最大数N/Aはい
maxBufferedRequestsバックプレッシャーが適用されるまでに、シンク内でバッファできるレコードの最大数N/Aはい
maxBatchSizeInBytesバッチの最大サイズ (バイト単位) 。送信されるすべてのバッチは、このサイズ以下になりますN/Aはい
maxTimeInBufferMSフラッシュされるまでにレコードをシンク内に保持できる最大時間N/Aはい
maxRecordSizeInBytesシンクが受け入れるレコードの最大サイズ。これを超えるレコードは自動的に拒否されますN/Aはい

サポートされているデータ型

以下の表は、Flink から ClickHouse にデータを挿入する際のデータ型変換について、簡単に参照できる一覧です。

Java型ClickHouse型サポート状況シリアライズ方式
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTime該当なし
long/LongTime64該当なし
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariant該当なし

注意:

  • 日付操作を行う際は、ZoneId を指定する必要があります。
  • 10 進数の操作を行う際は、精度とスケールを指定する必要があります。
  • ClickHouse が Java の String を JSON として解析できるようにするには、ClickHouseClientConfigenableJsonSupportAsString を有効にする必要があります。
  • このコネクタでは、入力 DataStream 内の要素を ClickHouse のペイロードにマッピングするために ElementConvertor が必要です。そのため、このコネクタは ClickHouseConvertorPOJOConvertor を提供しており、上記の DataWriter のシリアライズメソッドを使用してこのマッピングを実装できます。

サポートされている入力形式

利用可能な ClickHouse の入力形式の一覧は、こちらのドキュメントページClickHouseFormat.java で確認できます。

DataStream を ClickHouse のペイロードにシリアライズする際にコネクタが使用する形式を指定するには、setClickHouseFormat 関数を使用します。例:

ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
注記

デフォルトでは、ClickHouseClientConfigsetSupportDefault が明示的に true に設定されている場合は RowBinaryWithDefaults、false に設定されている場合は RowBinary を使用します。

メトリクス

このコネクタは、Flink の既存のメトリクスに加えて、以下の追加メトリクスを公開します。

Metric説明TypeStatus
numBytesSendリクエストペイロード内で ClickHouse に送信された総バイト数。注: このメトリクスは、ネットワーク経由で送信されたシリアライズ済みデータのサイズを測定するため、system.query_log 内の ClickHouse の written_bytes と異なる場合があります。written_bytes は、処理後にストレージへ実際に書き込まれたバイト数を反映しますカウンタ
numRecordSendClickHouse に送信されたレコードの総数カウンタ
numRequestSubmitted送信されたリクエストの総数 (実際に実行されたフラッシュ回数)カウンタ
numOfDroppedBatches再試行できない障害により破棄されたバッチの総数カウンタ
numOfDroppedRecords再試行できない障害により破棄されたレコードの総数カウンタ
totalBatchRetries再試行可能な障害により実行されたバッチ再試行の総数カウンタ
writeLatencyHistogram書き込み成功時のレイテンシ分布のヒストグラム (ms)ヒストグラム
writeFailureLatencyHistogram書き込み失敗時のレイテンシ分布のヒストグラム (ms)ヒストグラム
triggeredByMaxBatchSizeCountermaxBatchSize への到達によりトリガーされたフラッシュの総数カウンタ
triggeredByMaxBatchSizeInBytesCountermaxBatchSizeInBytes への到達によりトリガーされたフラッシュの総数カウンタ
triggeredByMaxTimeInBufferMSCountermaxTimeInBufferMS への到達によりトリガーされたフラッシュの総数カウンタ
actualRecordsPerBatch実際のバッチサイズ分布のヒストグラムヒストグラム
actualBytesPerBatchバッチあたりの実際のバイト数分布のヒストグラムヒストグラム

制限事項

  • このシンクは現在、at-least-once の配信保証を提供します。exactly-once セマンティクスに向けた作業はこちらで追跡されています。
  • このシンクは、処理できないレコードをバッファリングするためのデッドレターキュー (DLQ) をまだサポートしていません。現時点では、コネクタは挿入に失敗したレコードの再挿入を試み、それでも成功しない場合はそれらを破棄します。この機能はこちらで追跡されています。
  • このシンクは、Flink の Table API または Flink SQL を介した作成をまだサポートしていません。この機能はこちらで追跡されています。

ClickHouse のバージョン互換性とセキュリティ

  • このコネクタは、最新版や head を含む複数の最近の ClickHouse バージョンに対して、日次の CI ワークフローでテストされています。テスト対象のバージョンは、新しい ClickHouse リリースが有効になるのに合わせて定期的に更新されます。コネクタが日次でテストしているバージョンについては、こちらを参照してください。
  • 既知のセキュリティ脆弱性と脆弱性の報告方法については、ClickHouse のセキュリティポリシーを参照してください。
  • セキュリティ修正や新しい改善を見逃さないよう、コネクタは継続的にアップグレードすることを推奨します。
  • 移行で問題が発生した場合は、GitHub の issue を作成してください。対応します。
  • 最適なパフォーマンスを得るには、DataStream の要素型が汎用型でないことを確認してください。詳しくは、Flink の型の区別に関するこちらの説明を参照してください。汎用型でない要素を使用すると、Kryo によるシリアライズのオーバーヘッドを回避でき、ClickHouse へのスループットが向上します。
  • maxBatchSize は少なくとも 1000、理想的には 10,000 ~ 100,000 に設定することを推奨します。詳しくは、バルク挿入に関するこのガイドを参照してください。
  • OLTP スタイルの重複排除や ClickHouse への upsert を行う場合は、このドキュメントページを参照してください。注: これは、以下で詳しく説明する、再試行時に発生するバッチ重複排除とは異なります。

トラブルシューティング

CANNOT_READ_ALL_DATA

以下のエラーが発生することがあります:

com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)

原因: 一般的に、CANNOT_READ_ALL_DATA エラーは、ClickHouse テーブルのスキーマと Flink レコードのスキーマに不整合が生じていることを意味します。これは、いずれか一方または両方が後方互換性のない形で変更された場合に発生します。

解決策: ClickHouse テーブルまたはコネクタの入力データ型、あるいはその両方のスキーマを更新し、互換性を持たせてください。必要に応じて、Java 型を ClickHouse 型にどのようにマッピングするかについては、型マッピングを参照してください。注: まだ処理中のレコードがある場合は、コネクタの再起動時に Flink の状態をリセットする必要があります。

スループットが低い

ClickHouse への書き込み時に、コネクタのスループットがジョブの並列度 (Flink のタスク数) に応じて向上しないことがあります。

原因: ClickHouse のバックグラウンドで実行されるパートのマージ処理によって、挿入が遅くなっている可能性があります。これは、設定したバッチサイズが小さすぎる場合、コネクタのフラッシュ頻度が高すぎる場合、またはその両方が重なった場合に発生することがあります。

解決策: numRequestSubmittedactualRecordsPerBatch のメトリクスを監視し、バッチサイズ (maxBatchSize) とフラッシュ頻度をどのように調整すべきか判断してください。また、バッチサイズに関する推奨事項については、高度な使用方法と推奨される使用方法も参照してください。

ClickHouse テーブルで行が欠落する

原因: バッチは、再試行不可能な障害が発生したか、設定された再試行回数 (ClickHouseClientConfig.setNumberOfRetries()で設定可能) 以内に挿入できなかったため、破棄されました。注: デフォルトでは、コネクタはバッチを破棄する前に、最大 3 回まで再挿入を試行します。

解決策: 根本原因を特定するため、TaskManager のログやスタックトレースを確認してください。

コントリビューションとサポート

プロジェクトへのコントリビューションや問題の報告をご希望の場合は、ぜひご意見をお寄せください。 issue の作成、改善の提案、または pull request の送信については、GitHub リポジトリをご覧ください。

コントリビューションを歓迎します。開始する前に、リポジトリ内のコントリビューションガイドを確認してください。 ClickHouse Flink コネクタの改善にご協力いただきありがとうございます。