メインコンテンツへスキップ
メインコンテンツへスキップ

Spark コネクタ

このコネクタは、高度なパーティショニングや述語プッシュダウンなど、ClickHouse 固有の最適化機能を活用して、 クエリのパフォーマンスとデータ処理を向上させます。 このコネクタは ClickHouse の公式 JDBC コネクタ をベースとしており、 独自のカタログを管理します。

Spark 3.0 以前、Spark には組み込みのカタログという概念がなかったため、ユーザーは通常、 Hive Metastore や AWS Glue などの外部カタログシステムに依存していました。 これらの外部ソリューションでは、Spark でアクセスする前に、ユーザーはデータソーステーブルを手動で登録する必要がありました。 しかし、Spark 3.0 でカタログの概念が導入されて以来、Spark はカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。

Spark のデフォルトのカタログは spark_catalog であり、テーブルは {catalog name}.{database}.{table} という形式で識別されます。 新しいカタログ機能により、1 つの Spark アプリケーション内で複数のカタログを追加して利用できるようになりました。

要件

  • Java 8 または 17(Spark 4.0 では Java 17 以上が必須)
  • Scala 2.12 または 2.13(Spark 4.0 は Scala 2.13 のみをサポート)
  • Apache Spark 3.3、3.4、3.5、または 4.0

互換性マトリックス

バージョン対応 Spark バージョンClickHouse JDBC バージョン
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存なし
0.3.0Spark 3.2, 3.3依存なし
0.2.1Spark 3.2依存なし
0.1.2Spark 3.2依存なし

インストールとセットアップ

Spark と ClickHouse を統合するには、さまざまなプロジェクト構成に対応した複数のインストール方法が用意されています。 ClickHouse Spark コネクタを、プロジェクトのビルドファイル(Maven の pom.xml や SBT の build.sbt など)に 依存関係として直接追加できます。 あるいは、必要な JAR ファイルを $SPARK_HOME/jars/ フォルダーに配置するか、spark-submit コマンドで --jars フラグを使って Spark のオプションとして直接指定することもできます。 いずれの方法を用いても、Spark 環境で ClickHouse コネクタを利用できるようになります。

依存関係としてインポートする

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

SNAPSHOT バージョンを使用したい場合は、次のリポジトリを追加します。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

ライブラリをダウンロードする

バイナリ JAR のファイル名パターンは次のとおりです。

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

利用可能なすべてのリリース済み JAR ファイルは Maven Central Repository から、 すべてのデイリービルド SNAPSHOT JAR ファイルは Sonatype OSS Snapshots Repository から入手できます。

参考文献

コネクタは clickhouse-http および clickhouse-client に依存しており、 どちらも clickhouse-jdbc:all にバンドルされているため、 classifier が "all" の clickhouse-jdbc JAR を 必ず含める必要があります。 代わりに、完全な JDBC パッケージ一式を使用したくない場合は、 clickhouse-client JAR および clickhouse-http を 個別に追加することもできます。

いずれの場合も、 Compatibility Matrix に従ってパッケージのバージョン互換性が取れていることを確認してください。

カタログを登録する(必須)

ClickHouse のテーブルへアクセスするには、以下の設定で新しい Spark カタログを構成する必要があります。

PropertyValueDefault ValueRequired
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

これらの設定は、次のいずれかの方法で指定できます。

  • spark-defaults.conf を編集または作成する。
  • spark-submit コマンド(または spark-shell / spark-sql の CLI コマンド)に設定を渡す。
  • コンテキストを初期化する際に設定を追加する。
参考文献

ClickHouse クラスターで作業する場合は、各インスタンスごとに一意のカタログ名を設定する必要があります。 例えば、次のようにします。

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

このように設定すると、Spark SQL から clickhouse1.<ck_db>.<ck_table> を使用して clickhouse1 のテーブル <ck_db>.<ck_table> にアクセスでき、clickhouse2.<ck_db>.<ck_table> を使用して clickhouse2 のテーブル <ck_db>.<ck_table> にアクセスできるようになります。

ClickHouse Cloud の設定

ClickHouse Cloud に接続する際は、SSL を有効にし、適切な SSL モードを設定してください。例えば、次のように指定します。

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

データの読み込み

public static void main(String[] args) {
        // Spark セッションを作成する
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

データを書き込む

 public static void main(String[] args) throws AnalysisException {

        // Sparkセッションを作成する
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // DataFrameのスキーマを定義する
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // DataFrameを作成する
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

DDL 操作

Spark SQL を使用して ClickHouse インスタンスに対して DDL 操作を実行でき、そこで行ったすべての変更は即座に ClickHouse に永続化されます。 Spark SQL では ClickHouse とまったく同じようにクエリを記述できるため、 たとえば CREATE TABLE や TRUNCATE などのコマンドを変更することなく、そのまま直接実行できます。

注記

Spark SQL を使用する場合、一度に実行できるステートメントは 1 つだけです。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'パーティションキー',
  id          BIGINT    NOT NULL COMMENT 'ソートキー',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上記の例は Spark SQL クエリを示しており、Java や Scala、PySpark、シェルなどの任意の API からアプリケーション内で実行できます。

設定

コネクタで変更可能な設定項目は次のとおりです。


キーデフォルト概要以降
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse は、シャーディングキーやパーティション値として複雑な式(例: cityHash64(col_1, col_2))を使用できますが、これらは現在 Spark ではサポートされていません。true の場合はサポートされていない式を無視し、それ以外の場合は例外をスローして即座にエラー終了します。なお、spark.clickhouse.write.distributed.convertLocal が有効な場合、サポートされていないシャーディングキーを無視するとデータが破損するおそれがあります。0.4.0
spark.clickhouse.read.compression.codeclz4読み取り時にデータを解凍するために使用するコーデック。サポートされるコーデック: none, lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrueDistributed テーブルを読み込む際は、自身ではなくローカルテーブルを読み込みます。true の場合、spark.clickhouse.read.distributed.useClusterNodes は無視されます。0.1.0
spark.clickhouse.read.fixedStringAsバイナリClickHouse の FixedString 型を指定した Spark データ型として読み取ります。サポートされる型:binary、string0.8.0
spark.clickhouse.read.formatjson読み取り用のシリアライズ形式。サポートされる形式: JSON, Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み取り用のランタイムフィルターを有効化します。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrue の場合、パーティション値ではなく仮想カラム _partition_id を使って入力パーティションフィルタを構成します。パーティション値によって SQL の述語を組み立てる場合には、既知の問題があります。この機能には ClickHouse Server v21.6 以降が必要です。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrue の場合、テーブル作成時に CREATE/REPLACE TABLE ... AS SELECT ... を実行すると、クエリスキーマ内のすべてのフィールドを nullable としてマークします。なお、この設定には SPARK-43390(Spark 3.5 で利用可能)が必要であり、このパッチがない場合は設定値に関係なく常に true として動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouse への書き込み時に、1 バッチあたりに含めるレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込み時にデータを圧縮するためのコーデック。サポートされているコーデックは none と lz4 です。0.3.0
spark.clickhouse.write.distributed.convertLocalfalseDistributed テーブルに書き込む際は、自身ではなくローカルテーブルに書き込みます。true の場合、spark.clickhouse.write.distributed.useClusterNodes を無視します。0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueDistributed テーブルへの書き込み時に、クラスタ内のすべてのノードに書き込む。0.1.0
spark.clickhouse.write.format矢印書き込み時のシリアル化形式。サポートされる形式: JSON、Arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrue の場合、書き込み前にソートキーに基づいてローカルでソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition の値true の場合、書き込み前にローカルでパーティションごとにソートを行います。設定されていない場合は、spark.clickhouse.write.repartitionByPartition と同じ値になります。0.3.0
spark.clickhouse.write.maxRetry3再試行可能なエラーコードによって単一バッチ書き込みが失敗した場合に、その書き込みを再試行する最大回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue書き込み前に、ClickHouse テーブルのパーティション分布に合わせて ClickHouse のパーティションキーでデータを再パーティションするかどうか。0.3.0
spark.clickhouse.write.repartitionNum0書き込み前に ClickHouse テーブルのディストリビューションに合うようデータを再パーティションする必要がある場合に、この設定で再パーティション数を指定します。値が 1 未満の場合は、再パーティションを要求しないことを意味します。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrue の場合、Spark は書き込み時にデータソーステーブルへレコードを渡す前に、要求されるデータ分散を満たすよう、入力レコードを厳密にパーティション間へ分配します。true でない場合、Spark はクエリを高速化するために特定の最適化を適用することがありますが、その結果、分散要件が満たされないことがあります。なお、この設定は SPARK-37523(Spark 3.4 で利用可能)の適用が前提であり、このパッチがない場合は常に true として動作します。0.3.0
spark.clickhouse.write.retryInterval10秒書き込み再試行間隔(秒)0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込み処理が失敗した際に ClickHouse サーバーから返される再試行可能なエラーコード。0.1.0

サポートされているデータ型

このセクションでは、Spark と ClickHouse 間のデータ型マッピングについて説明します。以下の表は、ClickHouse から Spark へデータを読み込む場合、および Spark から ClickHouse へデータを挿入する場合のデータ型変換に関するクイックリファレンスです。

ClickHouse から Spark へのデータの読み取り

ClickHouse データ型Spark データ型サポート状況プリミティブ型備考
NothingNullTypeはい
BoolBooleanTypeはい
UInt8, Int16ShortTypeはい
Int8ByteTypeはい
UInt16,Int32IntegerTypeはい
UInt32,Int64, UInt64LongTypeはい
Int128,UInt128, Int256, UInt256DecimalType(38, 0)はい
Float32FloatTypeはい
Float64DoubleTypeはい
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeはい
FixedStringBinaryType, StringTypeはい設定 READ_FIXED_STRING_AS によって制御されます
DecimalDecimalTypeはいDecimal128 までの精度とスケール
Decimal32DecimalType(9, scale)はい
Decimal64DecimalType(18, scale)はい
Decimal128DecimalType(38, scale)はい
Date, Date32DateTypeはい
DateTime, DateTime32, DateTime64TimestampTypeはい
ArrayArrayTypeいいえ配列要素の型も変換されます
MapMapTypeいいえキーは StringType に制限されます
IntervalYearYearMonthIntervalType(Year)はい
IntervalMonthYearMonthIntervalType(Month)はい
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeいいえ対応する Interval 型が使用されます
Object
Nested
TupleStructTypeいいえ名前付きおよび名前なしのタプルをサポートします。名前付きタプルは構造体フィールドに名前で対応付けられ、名前なしタプルは _1_2 などを使用します。入れ子の構造体および Nullable フィールドをサポートします
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Spark から ClickHouse へのデータ挿入

Spark Data TypeClickHouse Data Typeサポート有無プリミティブ型か備考
BooleanTypeBoolはいバージョン 0.9.0 以降、Bool 型(UInt8 ではない)にマッピングされます
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はい精度とスケールは Decimal128 まで対応
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (list, tuple, or array)Arrayいいえ配列要素の型も変換されます
MapTypeMapいいえキーは StringType に制限されます
StructTypeTupleいいえフィールド名付きの Tuple に変換されます
VariantTypeVariantTypeいいえ
Object
Nested

貢献とサポート

プロジェクトへの貢献や問題の報告をご希望の場合は、ぜひご協力ください。 GitHub リポジトリにアクセスして、issue の作成、改善提案、 または Pull Request の送信を行ってください。 コントリビューションは大歓迎です。作業を始める前に、リポジトリ内のコントリビューションガイドラインを確認してください。 ClickHouse Spark コネクタの改善にご協力いただき、ありがとうございます。