Flink コネクタ
これは、ClickHouse がサポートする公式の Apache Flink Sink Connector です。Flink の AsyncSinkBase と、公式の ClickHouse Java client を使用して構築されています。
このコネクタは Apache Flink の DataStream API をサポートしています。Table API のサポートは、今後のリリースで予定されています。
要件
- Java 11 以降 (Flink 1.17 以降の場合) または 17 以降 (Flink 2.0 以降の場合)
- Apache Flink 1.17 以降
Flink バージョン互換性マトリクス
このコネクタは、Flink 1.17+ と Flink 2.0+ の両方をサポートするため、2 つのアーティファクトに分かれています。使用する Flink バージョンに対応するアーティファクトを選択してください。
| Flink バージョン | アーティファクト | ClickHouse Java Client バージョン | 必要な Java |
|---|---|---|---|
| 最新 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
このコネクタは、Flink 1.17.2 より前のバージョンではテストされていません。
インストールとセットアップ
依存関係として追加
Flink 2.0 以降の場合
- Maven
- Gradle
- SBT
Flink 1.17以降の場合
- Maven
- Gradle
- SBT
バイナリをダウンロードする
バイナリ JAR の名前のパターンは次のとおりです。
ここで:
flink_versionは2.0.0または1.17のいずれかですstable_versionは安定版アーティファクトのリリースバージョンです
利用可能なリリース済みの JAR ファイルはすべて、Maven Central Repository で確認できます。
DataStream APIの使用
スニペット
生の CSV データを ClickHouse に挿入する場合は、次のようになります。
- Java
その他の例やスニペットは、テストコードで確認できます。
クイックスタート例
ClickHouse Sink を手軽に始められるよう、Maven ベースの例を用意しています。
より詳細な手順については、例ガイドを参照してください
DataStream API の接続オプション
ClickHouse クライアントオプション
| Parameters | 説明 | Default Value | Required |
|---|---|---|---|
url | 完全修飾された ClickHouse の URL | N/A | Yes |
username | ClickHouse データベースのユーザー名 | N/A | Yes |
password | ClickHouse データベースのパスワード | N/A | Yes |
database | ClickHouse データベース名 | N/A | Yes |
table | ClickHouse テーブル名 | N/A | Yes |
options | Java クライアントの設定オプションのマップ | Empty map | No |
serverSettings | ClickHouse サーバーのセッション設定のマップ | Empty map | No |
enableJsonSupportAsString | JSON data type に対して JSON 形式の文字列を受け取ることを想定する ClickHouse サーバー設定 | true | No |
options と serverSettings は、Map<String, String> としてクライアントに渡してください。どちらかに空のマップを指定した場合は、それぞれクライアントまたはサーバーのデフォルト設定が使用されます。
利用可能なすべての Java クライアントオプションは、ClientConfigProperties.java および このドキュメントページ に一覧があります。
利用可能なすべてのサーバーセッション設定は、このドキュメントページ に一覧があります。
例:
- Java
シンクオプション
以下のオプションは、Flink の AsyncSinkBase に直接由来するものです。
| パラメータ | 説明 | デフォルト値 | 必須 |
|---|---|---|---|
maxBatchSize | 1 回のバッチで挿入できるレコードの最大数 | N/A | はい |
maxInFlightRequests | シンクがバックプレッシャーを適用するまでに許可される、進行中リクエストの最大数 | N/A | はい |
maxBufferedRequests | バックプレッシャーが適用されるまでに、シンク内でバッファできるレコードの最大数 | N/A | はい |
maxBatchSizeInBytes | バッチの最大サイズ (バイト単位) 。送信されるすべてのバッチは、このサイズ以下になります | N/A | はい |
maxTimeInBufferMS | フラッシュされるまでにレコードをシンク内に保持できる最大時間 | N/A | はい |
maxRecordSizeInBytes | シンクが受け入れるレコードの最大サイズ。これを超えるレコードは自動的に拒否されます | N/A | はい |
サポートされているデータ型
以下の表は、Flink から ClickHouse にデータを挿入する際のデータ型変換について、簡単に参照できる一覧です。
Flink から ClickHouse へのデータ挿入
| Java型 | ClickHouse型 | サポート状況 | シリアライズ方式 |
|---|---|---|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | 該当なし |
long/Long | Time64 | ❌ | 該当なし |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | 該当なし |
注意:
- 日付操作を行う際は、
ZoneIdを指定する必要があります。 - 10 進数の操作を行う際は、精度とスケールを指定する必要があります。
- ClickHouse が Java の String を JSON として解析できるようにするには、
ClickHouseClientConfigでenableJsonSupportAsStringを有効にする必要があります。 - このコネクタでは、入力 DataStream 内の要素を ClickHouse のペイロードにマッピングするために
ElementConvertorが必要です。そのため、このコネクタはClickHouseConvertorとPOJOConvertorを提供しており、上記のDataWriterのシリアライズメソッドを使用してこのマッピングを実装できます。
サポートされている入力形式
利用可能な ClickHouse の入力形式の一覧は、こちらのドキュメントページと ClickHouseFormat.java で確認できます。
DataStream を ClickHouse のペイロードにシリアライズする際にコネクタが使用する形式を指定するには、setClickHouseFormat 関数を使用します。例:
デフォルトでは、ClickHouseClientConfig の setSupportDefault が明示的に true に設定されている場合は RowBinaryWithDefaults、false に設定されている場合は RowBinary を使用します。
メトリクス
このコネクタは、Flink の既存のメトリクスに加えて、以下の追加メトリクスを公開します。
| Metric | 説明 | Type | Status |
|---|---|---|---|
numBytesSend | リクエストペイロード内で ClickHouse に送信された総バイト数。注: このメトリクスは、ネットワーク経由で送信されたシリアライズ済みデータのサイズを測定するため、system.query_log 内の ClickHouse の written_bytes と異なる場合があります。written_bytes は、処理後にストレージへ実際に書き込まれたバイト数を反映します | カウンタ | ✅ |
numRecordSend | ClickHouse に送信されたレコードの総数 | カウンタ | ✅ |
numRequestSubmitted | 送信されたリクエストの総数 (実際に実行されたフラッシュ回数) | カウンタ | ✅ |
numOfDroppedBatches | 再試行できない障害により破棄されたバッチの総数 | カウンタ | ✅ |
numOfDroppedRecords | 再試行できない障害により破棄されたレコードの総数 | カウンタ | ✅ |
totalBatchRetries | 再試行可能な障害により実行されたバッチ再試行の総数 | カウンタ | ✅ |
writeLatencyHistogram | 書き込み成功時のレイテンシ分布のヒストグラム (ms) | ヒストグラム | ✅ |
writeFailureLatencyHistogram | 書き込み失敗時のレイテンシ分布のヒストグラム (ms) | ヒストグラム | ✅ |
triggeredByMaxBatchSizeCounter | maxBatchSize への到達によりトリガーされたフラッシュの総数 | カウンタ | ✅ |
triggeredByMaxBatchSizeInBytesCounter | maxBatchSizeInBytes への到達によりトリガーされたフラッシュの総数 | カウンタ | ✅ |
triggeredByMaxTimeInBufferMSCounter | maxTimeInBufferMS への到達によりトリガーされたフラッシュの総数 | カウンタ | ✅ |
actualRecordsPerBatch | 実際のバッチサイズ分布のヒストグラム | ヒストグラム | ✅ |
actualBytesPerBatch | バッチあたりの実際のバイト数分布のヒストグラム | ヒストグラム | ✅ |
制限事項
- このシンクは現在、at-least-once の配信保証を提供します。exactly-once セマンティクスに向けた作業はこちらで追跡されています。
- このシンクは、処理できないレコードをバッファリングするためのデッドレターキュー (DLQ) をまだサポートしていません。現時点では、コネクタは挿入に失敗したレコードの再挿入を試み、それでも成功しない場合はそれらを破棄します。この機能はこちらで追跡されています。
- このシンクは、Flink の Table API または Flink SQL を介した作成をまだサポートしていません。この機能はこちらで追跡されています。
ClickHouse のバージョン互換性とセキュリティ
- このコネクタは、最新版や head を含む複数の最近の ClickHouse バージョンに対して、日次の CI ワークフローでテストされています。テスト対象のバージョンは、新しい ClickHouse リリースが有効になるのに合わせて定期的に更新されます。コネクタが日次でテストしているバージョンについては、こちらを参照してください。
- 既知のセキュリティ脆弱性と脆弱性の報告方法については、ClickHouse のセキュリティポリシーを参照してください。
- セキュリティ修正や新しい改善を見逃さないよう、コネクタは継続的にアップグレードすることを推奨します。
- 移行で問題が発生した場合は、GitHub の issue を作成してください。対応します。
高度な使用方法と推奨事項
- 最適なパフォーマンスを得るには、DataStream の要素型が汎用型でないことを確認してください。詳しくは、Flink の型の区別に関するこちらの説明を参照してください。汎用型でない要素を使用すると、Kryo によるシリアライズのオーバーヘッドを回避でき、ClickHouse へのスループットが向上します。
maxBatchSizeは少なくとも 1000、理想的には 10,000 ~ 100,000 に設定することを推奨します。詳しくは、バルク挿入に関するこのガイドを参照してください。- OLTP スタイルの重複排除や ClickHouse への upsert を行う場合は、このドキュメントページを参照してください。注: これは、以下で詳しく説明する、再試行時に発生するバッチ重複排除とは異なります。
トラブルシューティング
CANNOT_READ_ALL_DATA
以下のエラーが発生することがあります:
原因: 一般的に、CANNOT_READ_ALL_DATA エラーは、ClickHouse テーブルのスキーマと Flink レコードのスキーマに不整合が生じていることを意味します。これは、いずれか一方または両方が後方互換性のない形で変更された場合に発生します。
解決策: ClickHouse テーブルまたはコネクタの入力データ型、あるいはその両方のスキーマを更新し、互換性を持たせてください。必要に応じて、Java 型を ClickHouse 型にどのようにマッピングするかについては、型マッピングを参照してください。注: まだ処理中のレコードがある場合は、コネクタの再起動時に Flink の状態をリセットする必要があります。
スループットが低い
ClickHouse への書き込み時に、コネクタのスループットがジョブの並列度 (Flink のタスク数) に応じて向上しないことがあります。
原因: ClickHouse のバックグラウンドで実行されるパートのマージ処理によって、挿入が遅くなっている可能性があります。これは、設定したバッチサイズが小さすぎる場合、コネクタのフラッシュ頻度が高すぎる場合、またはその両方が重なった場合に発生することがあります。
解決策: numRequestSubmitted と actualRecordsPerBatch のメトリクスを監視し、バッチサイズ (maxBatchSize) とフラッシュ頻度をどのように調整すべきか判断してください。また、バッチサイズに関する推奨事項については、高度な使用方法と推奨される使用方法も参照してください。
ClickHouse テーブルで行が欠落する
原因: バッチは、再試行不可能な障害が発生したか、設定された再試行回数 (ClickHouseClientConfig.setNumberOfRetries()で設定可能) 以内に挿入できなかったため、破棄されました。注: デフォルトでは、コネクタはバッチを破棄する前に、最大 3 回まで再挿入を試行します。
解決策: 根本原因を特定するため、TaskManager のログやスタックトレースを確認してください。
コントリビューションとサポート
プロジェクトへのコントリビューションや問題の報告をご希望の場合は、ぜひご意見をお寄せください。 issue の作成、改善の提案、または pull request の送信については、GitHub リポジトリをご覧ください。
コントリビューションを歓迎します。開始する前に、リポジトリ内のコントリビューションガイドを確認してください。 ClickHouse Flink コネクタの改善にご協力いただきありがとうございます。