Spark コネクタ
このコネクタは、高度なパーティショニングや述語プッシュダウンなど、ClickHouse 固有の最適化機能を活用して、 クエリのパフォーマンスとデータ処理を向上させます。 このコネクタは ClickHouse の公式 JDBC コネクタ をベースとしており、 独自のカタログを管理します。
Spark 3.0 以前、Spark には組み込みのカタログという概念がなかったため、ユーザーは通常、 Hive Metastore や AWS Glue などの外部カタログシステムに依存していました。 これらの外部ソリューションでは、Spark でアクセスする前に、ユーザーはデータソーステーブルを手動で登録する必要がありました。 しかし、Spark 3.0 でカタログの概念が導入されて以来、Spark はカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。
Spark のデフォルトのカタログは spark_catalog であり、テーブルは {catalog name}.{database}.{table} という形式で識別されます。
新しいカタログ機能により、1 つの Spark アプリケーション内で複数のカタログを追加して利用できるようになりました。
要件
- Java 8 または 17(Spark 4.0 では Java 17 以上が必須)
- Scala 2.12 または 2.13(Spark 4.0 は Scala 2.13 のみをサポート)
- Apache Spark 3.3、3.4、3.5、または 4.0
互換性マトリックス
| バージョン | 対応 Spark バージョン | ClickHouse JDBC バージョン |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.9.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | 依存なし |
| 0.3.0 | Spark 3.2, 3.3 | 依存なし |
| 0.2.1 | Spark 3.2 | 依存なし |
| 0.1.2 | Spark 3.2 | 依存なし |
インストールとセットアップ
Spark と ClickHouse を統合するには、さまざまなプロジェクト構成に対応した複数のインストール方法が用意されています。
ClickHouse Spark コネクタを、プロジェクトのビルドファイル(Maven の pom.xml や SBT の build.sbt など)に
依存関係として直接追加できます。
あるいは、必要な JAR ファイルを $SPARK_HOME/jars/ フォルダーに配置するか、spark-submit コマンドで
--jars フラグを使って Spark のオプションとして直接指定することもできます。
いずれの方法を用いても、Spark 環境で ClickHouse コネクタを利用できるようになります。
依存関係としてインポートする
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
SNAPSHOT バージョンを使用したい場合は、次のリポジトリを追加します。
SNAPSHOT バージョンを使用したい場合は、次のリポジトリを追加します。
Spark のシェルオプション(Spark SQL CLI、Spark Shell CLI、Spark Submit コマンド)を使用する場合、依存関係は 必要な JAR を引数として渡すことで解決できます。
JAR ファイルを Spark クライアントノードにコピーしたくない場合は、代わりに次のように指定できます。
注: SQL のみのユースケースの場合、本番環境では Apache Kyuubi の使用を推奨します。
ライブラリをダウンロードする
バイナリ JAR のファイル名パターンは次のとおりです。
利用可能なすべてのリリース済み JAR ファイルは Maven Central Repository から、 すべてのデイリービルド SNAPSHOT JAR ファイルは Sonatype OSS Snapshots Repository から入手できます。
コネクタは clickhouse-http および clickhouse-client に依存しており、 どちらも clickhouse-jdbc:all にバンドルされているため、 classifier が "all" の clickhouse-jdbc JAR を 必ず含める必要があります。 代わりに、完全な JDBC パッケージ一式を使用したくない場合は、 clickhouse-client JAR および clickhouse-http を 個別に追加することもできます。
いずれの場合も、 Compatibility Matrix に従ってパッケージのバージョン互換性が取れていることを確認してください。
カタログを登録する(必須)
ClickHouse のテーブルへアクセスするには、以下の設定で新しい Spark カタログを構成する必要があります。
| Property | Value | Default Value | Required |
|---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
spark.sql.catalog.<catalog_name>.protocol | http | http | No |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (empty string) | No |
spark.sql.catalog.<catalog_name>.database | <database> | default | No |
spark.<catalog_name>.write.format | json | arrow | No |
これらの設定は、次のいずれかの方法で指定できます。
spark-defaults.confを編集または作成する。spark-submitコマンド(またはspark-shell/spark-sqlの CLI コマンド)に設定を渡す。- コンテキストを初期化する際に設定を追加する。
ClickHouse クラスターで作業する場合は、各インスタンスごとに一意のカタログ名を設定する必要があります。 例えば、次のようにします。
このように設定すると、Spark SQL から clickhouse1.<ck_db>.<ck_table> を使用して clickhouse1 のテーブル <ck_db>.<ck_table> にアクセスでき、clickhouse2.<ck_db>.<ck_table> を使用して clickhouse2 のテーブル <ck_db>.<ck_table> にアクセスできるようになります。
ClickHouse Cloud の設定
ClickHouse Cloud に接続する際は、SSL を有効にし、適切な SSL モードを設定してください。例えば、次のように指定します。
データの読み込み
- Java
- Scala
- Python
- Spark SQL
データを書き込む
- Java
- Scala
- Python
- Spark SQL
DDL 操作
Spark SQL を使用して ClickHouse インスタンスに対して DDL 操作を実行でき、そこで行ったすべての変更は即座に ClickHouse に永続化されます。 Spark SQL では ClickHouse とまったく同じようにクエリを記述できるため、 たとえば CREATE TABLE や TRUNCATE などのコマンドを変更することなく、そのまま直接実行できます。
Spark SQL を使用する場合、一度に実行できるステートメントは 1 つだけです。
上記の例は Spark SQL クエリを示しており、Java や Scala、PySpark、シェルなどの任意の API からアプリケーション内で実行できます。
設定
コネクタで変更可能な設定項目は次のとおりです。
| キー | デフォルト | 概要 | 以降 |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse は、シャーディングキーやパーティション値として複雑な式(例: cityHash64(col_1, col_2))を使用できますが、これらは現在 Spark ではサポートされていません。true の場合はサポートされていない式を無視し、それ以外の場合は例外をスローして即座にエラー終了します。なお、spark.clickhouse.write.distributed.convertLocal が有効な場合、サポートされていないシャーディングキーを無視するとデータが破損するおそれがあります。 | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | 読み取り時にデータを解凍するために使用するコーデック。サポートされるコーデック: none, lz4。 | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | Distributed テーブルを読み込む際は、自身ではなくローカルテーブルを読み込みます。true の場合、spark.clickhouse.read.distributed.useClusterNodes は無視されます。 | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | バイナリ | ClickHouse の FixedString 型を指定した Spark データ型として読み取ります。サポートされる型:binary、string | 0.8.0 |
| spark.clickhouse.read.format | json | 読み取り用のシリアライズ形式。サポートされる形式: JSON, Binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | 読み取り用のランタイムフィルターを有効化します。 | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | true の場合、パーティション値ではなく仮想カラム _partition_id を使って入力パーティションフィルタを構成します。パーティション値によって SQL の述語を組み立てる場合には、既知の問題があります。この機能には ClickHouse Server v21.6 以降が必要です。 | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | true の場合、テーブル作成時に CREATE/REPLACE TABLE ... AS SELECT ... を実行すると、クエリスキーマ内のすべてのフィールドを nullable としてマークします。なお、この設定には SPARK-43390(Spark 3.5 で利用可能)が必要であり、このパッチがない場合は設定値に関係なく常に true として動作します。 | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | ClickHouse への書き込み時に、1 バッチあたりに含めるレコード数。 | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | 書き込み時にデータを圧縮するためのコーデック。サポートされているコーデックは none と lz4 です。 | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | Distributed テーブルに書き込む際は、自身ではなくローカルテーブルに書き込みます。true の場合、spark.clickhouse.write.distributed.useClusterNodes を無視します。 | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Distributed テーブルへの書き込み時に、クラスタ内のすべてのノードに書き込む。 | 0.1.0 |
| spark.clickhouse.write.format | 矢印 | 書き込み時のシリアル化形式。サポートされる形式: JSON、Arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | true の場合、書き込み前にソートキーに基づいてローカルでソートを行います。 | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | spark.clickhouse.write.repartitionByPartition の値 | true の場合、書き込み前にローカルでパーティションごとにソートを行います。設定されていない場合は、spark.clickhouse.write.repartitionByPartition と同じ値になります。 | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | 再試行可能なエラーコードによって単一バッチ書き込みが失敗した場合に、その書き込みを再試行する最大回数。 | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | 書き込み前に、ClickHouse テーブルのパーティション分布に合わせて ClickHouse のパーティションキーでデータを再パーティションするかどうか。 | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | 書き込み前に ClickHouse テーブルのディストリビューションに合うようデータを再パーティションする必要がある場合に、この設定で再パーティション数を指定します。値が 1 未満の場合は、再パーティションを要求しないことを意味します。 | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | true の場合、Spark は書き込み時にデータソーステーブルへレコードを渡す前に、要求されるデータ分散を満たすよう、入力レコードを厳密にパーティション間へ分配します。true でない場合、Spark はクエリを高速化するために特定の最適化を適用することがありますが、その結果、分散要件が満たされないことがあります。なお、この設定は SPARK-37523(Spark 3.4 で利用可能)の適用が前提であり、このパッチがない場合は常に true として動作します。 | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10秒 | 書き込み再試行間隔(秒) | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | 書き込み処理が失敗した際に ClickHouse サーバーから返される再試行可能なエラーコード。 | 0.1.0 |
サポートされているデータ型
このセクションでは、Spark と ClickHouse 間のデータ型マッピングについて説明します。以下の表は、ClickHouse から Spark へデータを読み込む場合、および Spark から ClickHouse へデータを挿入する場合のデータ型変換に関するクイックリファレンスです。
ClickHouse から Spark へのデータの読み取り
| ClickHouse データ型 | Spark データ型 | サポート状況 | プリミティブ型 | 備考 |
|---|---|---|---|---|
Nothing | NullType | ✅ | はい | |
Bool | BooleanType | ✅ | はい | |
UInt8, Int16 | ShortType | ✅ | はい | |
Int8 | ByteType | ✅ | はい | |
UInt16,Int32 | IntegerType | ✅ | はい | |
UInt32,Int64, UInt64 | LongType | ✅ | はい | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | はい | |
Float32 | FloatType | ✅ | はい | |
Float64 | DoubleType | ✅ | はい | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | はい | |
FixedString | BinaryType, StringType | ✅ | はい | 設定 READ_FIXED_STRING_AS によって制御されます |
Decimal | DecimalType | ✅ | はい | Decimal128 までの精度とスケール |
Decimal32 | DecimalType(9, scale) | ✅ | はい | |
Decimal64 | DecimalType(18, scale) | ✅ | はい | |
Decimal128 | DecimalType(38, scale) | ✅ | はい | |
Date, Date32 | DateType | ✅ | はい | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | はい | |
Array | ArrayType | ✅ | いいえ | 配列要素の型も変換されます |
Map | MapType | ✅ | いいえ | キーは StringType に制限されます |
IntervalYear | YearMonthIntervalType(Year) | ✅ | はい | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | はい | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | いいえ | 対応する Interval 型が使用されます |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | StructType | ✅ | いいえ | 名前付きおよび名前なしのタプルをサポートします。名前付きタプルは構造体フィールドに名前で対応付けられ、名前なしタプルは _1、_2 などを使用します。入れ子の構造体および Nullable フィールドをサポートします |
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
Spark から ClickHouse へのデータ挿入
| Spark Data Type | ClickHouse Data Type | サポート有無 | プリミティブ型か | 備考 |
|---|---|---|---|---|
BooleanType | Bool | ✅ | はい | バージョン 0.9.0 以降、Bool 型(UInt8 ではない)にマッピングされます |
ByteType | Int8 | ✅ | はい | |
ShortType | Int16 | ✅ | はい | |
IntegerType | Int32 | ✅ | はい | |
LongType | Int64 | ✅ | はい | |
FloatType | Float32 | ✅ | はい | |
DoubleType | Float64 | ✅ | はい | |
StringType | String | ✅ | はい | |
VarcharType | String | ✅ | はい | |
CharType | String | ✅ | はい | |
DecimalType | Decimal(p, s) | ✅ | はい | 精度とスケールは Decimal128 まで対応 |
DateType | Date | ✅ | はい | |
TimestampType | DateTime | ✅ | はい | |
ArrayType (list, tuple, or array) | Array | ✅ | いいえ | 配列要素の型も変換されます |
MapType | Map | ✅ | いいえ | キーは StringType に制限されます |
StructType | Tuple | ✅ | いいえ | フィールド名付きの Tuple に変換されます |
VariantType | VariantType | ❌ | いいえ | |
Object | ❌ | |||
Nested | ❌ |
貢献とサポート
プロジェクトへの貢献や問題の報告をご希望の場合は、ぜひご協力ください。 GitHub リポジトリにアクセスして、issue の作成、改善提案、 または Pull Request の送信を行ってください。 コントリビューションは大歓迎です。作業を始める前に、リポジトリ内のコントリビューションガイドラインを確認してください。 ClickHouse Spark コネクタの改善にご協力いただき、ありがとうございます。