メインコンテンツへスキップ
メインコンテンツへスキップ

Spark connector

ClickHouse Supported

このコネクタは、高度なパーティショニングや述語プッシュダウンなどの ClickHouse 固有の最適化を活用して、 クエリ性能とデータ処理を向上させます。 このコネクタは ClickHouse の公式 JDBC コネクタ をベースとしており、 独自のカタログを管理します。

Spark 3.0 以前は、Spark には組み込みのカタログという概念がなく、そのためユーザーは通常、 Hive Metastore や AWS Glue などの外部カタログシステムに依存していました。 これらの外部ソリューションでは、Spark でテーブルにアクセスする前に、ユーザーがデータソーステーブルを手動で 登録する必要がありました。 しかし、Spark 3.0 でカタログの概念が導入されて以来、Spark はカタログプラグインを登録することで テーブルを自動的に検出できるようになりました。

Spark のデフォルトのカタログは spark_catalog であり、テーブルは {catalog name}.{database}.{table} という形式で識別されます。 新しいカタログ機能により、1 つの Spark アプリケーション内で複数のカタログを追加して利用することが可能になりました。

Catalog API と TableProvider API の選択

ClickHouse Spark コネクタは、Catalog APITableProvider API(フォーマットベースのアクセス)の 2 種類のアクセスパターンをサポートしています。両者の違いを理解しておくことで、ユースケースに最適なアプローチを選択できます。

Catalog API と TableProvider API の比較

機能Catalog APITableProvider API
設定Spark の設定で一元管理オプションによる操作単位での設定
テーブル検出Catalog による自動検出テーブルを手動で指定
DDL 操作完全サポート (CREATE, DROP, ALTER)制限あり (自動テーブル作成のみ)
Spark SQL との統合ネイティブ (clickhouse.database.table)format の明示的な指定が必要
ユースケース設定を集中管理する長期的かつ安定した接続向けアドホックかつ動的、または一時的なアクセス向け

要件

  • Java 8 または 17(Spark 4.0 では Java 17 以上が必須)
  • Scala 2.12 または 2.13(Spark 4.0 は Scala 2.13 のみをサポート)
  • Apache Spark 3.3、3.4、3.5、または 4.0

互換性マトリックス

バージョン互換性のある Spark バージョンClickHouse JDBC バージョン
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.10.0Spark 3.3, 3.4, 3.5, 4.00.9.5
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存しない
0.3.0Spark 3.2, 3.3依存しない
0.2.1Spark 3.2依存しない
0.1.2Spark 3.2依存しない

インストールとセットアップ

ClickHouse を Spark と統合するには、プロジェクトの構成に応じて複数のインストール方法があります。 Maven の pom.xml や SBT の build.sbt など、プロジェクトのビルドファイルに ClickHouse Spark connector を依存関係として直接追加できます。 あるいは、必要な JAR ファイルを $SPARK_HOME/jars/ フォルダに配置するか、spark-submit コマンドで --jars フラグを使用して Spark のオプションとして直接指定することもできます。 いずれの方法でも、Spark 環境から ClickHouse connector を利用できるようになります。

依存関係としてインポートする

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

SNAPSHOT バージョンを使用する場合は、次のリポジトリを追加します。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

ライブラリをダウンロードする

バイナリ JAR のファイル名のパターンは次のとおりです。

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

すべてのリリース済み JAR ファイルは Maven Central Repository から入手でき、 すべての日次ビルド SNAPSHOT 版 JAR ファイルは Sonatype OSS Snapshots Repository から入手できます。

情報

コネクタは clickhouse-http および clickhouse-client に依存しており、 これらはどちらも clickhouse-jdbc:all にバンドルされているため、 classifier が「all」の clickhouse-jdbc JAR を含めることが重要です。 代わりに、フルの JDBC パッケージを使用したくない場合は、 clickhouse-client JAR および clickhouse-http を個別に追加することもできます。

いずれの場合でも、パッケージのバージョンが Compatibility Matrix に従って互換性があることを確認してください。

カタログの登録(必須)

ClickHouse のテーブルにアクセスするには、次の設定で新しい Spark カタログを構成する必要があります。

プロパティデフォルト値必須
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/Aはい
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostいいえ
spark.sql.catalog.<catalog_name>.protocolhttphttpいいえ
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123いいえ
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultいいえ
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空文字列)いいえ
spark.sql.catalog.<catalog_name>.database<database>defaultいいえ
spark.<catalog_name>.write.formatjsonarrowいいえ

これらの設定は、次のいずれかの方法で指定できます。

  • spark-defaults.conf を編集または作成します。
  • 設定を spark-submit コマンド(または spark-shell / spark-sql の CLI コマンド)に渡します。
  • コンテキストを初期化するときに設定を追加します。
情報

ClickHouse クラスターを扱う場合は、インスタンスごとに一意のカタログ名を設定する必要があります。 例:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

このようにすることで、Spark SQL からは clickhouse1 のテーブル <ck_db>.<ck_table> には clickhouse1.<ck_db>.<ck_table> としてアクセスでき、clickhouse2 のテーブル <ck_db>.<ck_table> には clickhouse2.<ck_db>.<ck_table> としてアクセスできるようになります。

TableProvider API の使用 (フォーマットベースアクセス)

カタログベースのアプローチに加えて、ClickHouse Spark コネクタは TableProvider API を利用したフォーマットベースのアクセスパターンもサポートしています。

フォーマットベースの読み込み例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# フォーマット API を使用して ClickHouse から読み込む
df = spark.read \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .load()

df.show()

フォーマットベースの書き込み例

# フォーマット API を使用して ClickHouse に書き込む
df.write \
    .format("clickhouse") \
    .option("host", "your-clickhouse-host") \
    .option("protocol", "https") \
    .option("http_port", "8443") \
    .option("database", "default") \
    .option("table", "your_table") \
    .option("user", "default") \
    .option("password", "your_password") \
    .option("ssl", "true") \
    .mode("append") \
    .save()

TableProvider の機能

TableProvider API は、いくつかの強力な機能を提供します。

自動テーブル作成

存在しないテーブルに対して書き込みを行う場合、コネクタは適切なスキーマでテーブルを自動作成します。コネクタは次のようなインテリジェントなデフォルトを提供します:

  • Engine: 指定がない場合は MergeTree() がデフォルトになります。engine オプションを使用して、別のエンジン(例: ReplacingMergeTree(), SummingMergeTree() など)を指定できます。
  • ORDER BY: 必須 - 新しいテーブルを作成する際には、order_by オプションを明示的に指定する必要があります。コネクタは、指定されたすべてのカラムがスキーマ内に存在することを検証します。
  • Nullable キーのサポート: ORDER BY に Nullable なカラムが含まれている場合、自動的に settings.allow_nullable_key=1 を追加します。
# ORDER BY を明示的に指定しているため、テーブルは自動的に作成される(必須)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id") \
    .mode("append") \
    .save()

# カスタムエンジンを含むテーブル作成オプションを指定
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "new_table") \
    .option("order_by", "id, timestamp") \
    .option("engine", "ReplacingMergeTree()") \
    .option("settings.allow_nullable_key", "1") \
    .mode("append") \
    .save()
情報

ORDER BY は必須: TableProvider API 経由で新しいテーブルを作成する際、order_by オプションは 必須 です。ORDER BY 句に使用するカラムを明示的に指定する必要があります。コネクタは、指定されたすべてのカラムがスキーマ内に存在することを検証し、いずれかのカラムが存在しない場合はエラーをスローします。

エンジンの選択: デフォルトのエンジンは MergeTree() ですが、engine オプションを使用して任意の ClickHouse テーブルエンジン(例: ReplacingMergeTree(), SummingMergeTree(), AggregatingMergeTree() など)を指定できます。

TableProvider Connection Options

フォーマットベースの API を使用する場合、次の接続オプションを使用できます。

接続オプション

オプション説明デフォルト値必須
hostClickHouse サーバーのホスト名localhostYes
protocol接続プロトコル(http または httpshttpNo
http_portHTTP/HTTPS ポート8123No
databaseデータベース名defaultYes
tableテーブル名N/AYes
user認証に使用するユーザー名defaultNo
password認証に使用するパスワード(空文字列)No
sslSSL 接続を有効にするかどうかfalseNo
ssl_modeSSL モード(NONESTRICT など)STRICTNo
timezone日付/時刻処理に使用するタイムゾーンserverNo

テーブル作成オプション

これらのオプションは、テーブルがまだ存在せず、新規作成が必要な場合に使用します。

OptionDescriptionDefault ValueRequired
order_byORDER BY 句に使用するカラム。複数カラムの場合はカンマ区切りN/AYes
engineClickHouse のテーブルエンジン(例: MergeTree(), ReplacingMergeTree(), SummingMergeTree() など)MergeTree()No
settings.allow_nullable_keyORDER BY で Nullable なキーを許可(ClickHouse Cloud 向け)自動検出**No
settings.<key>任意の ClickHouse テーブルの settingN/ANo
cluster分散テーブル用のクラスタ名N/ANo
clickhouse.column.<name>.variant_typesVariant カラム用の ClickHouse 型のカンマ区切りリスト(例: String, Int64, Bool, JSON)。型名は大文字小文字を区別します。カンマ後のスペースは任意です。N/ANo

* 新しいテーブルを作成する場合、order_by オプションは必須です。指定したすべてのカラムはスキーマ内に存在している必要があります。
** ORDER BY に Nullable カラムが含まれており、かつ明示的に指定されていない場合は、自動的に 1 に設定されます。

ヒント

ベストプラクティス: ClickHouse Cloud では、ORDER BY のカラムが Nullable になる可能性がある場合、ClickHouse Cloud がこの setting を必須としているため、settings.allow_nullable_key=1 を明示的に設定することを推奨します。

書き込みモード

Spark コネクタ(TableProvider API と Catalog API の両方)は、次の Spark の書き込みモードをサポートします。

  • append: 既存のテーブルにデータを追加
  • overwrite: テーブル内のすべてのデータを置き換え(テーブルを先に空にする)
情報

パーティションの上書きは未サポート: コネクタは現在、パーティション単位の上書き操作(例: partitionBy を指定した overwrite モード)をサポートしていません。この機能は開発中です。この機能の進捗は GitHub issue #34 を参照してください。

# Overwrite モード(最初にテーブルを空にする)
df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .mode("overwrite") \
    .save()

ClickHouse オプションの設定

Catalog API と TableProvider API の両方で、ClickHouse 固有のオプション(コネクタのオプションではない)を設定できます。これらはテーブル作成時やクエリ実行時に、ClickHouse にそのまま渡されます。

ClickHouse オプションを使用すると、allow_nullable_keyindex_granularity などの ClickHouse 固有の設定や、その他のテーブルレベル/クエリレベルの設定を指定できます。これらは、ClickHouse への接続方法を制御するコネクタオプション(hostdatabasetable など)とは異なります。

TableProvider API の使用

TableProvider API を使用する場合は、settings.<key> 形式のオプションを指定します。

df.write \
    .format("clickhouse") \
    .option("host", "your-host") \
    .option("database", "default") \
    .option("table", "my_table") \
    .option("order_by", "id") \
    .option("settings.allow_nullable_key", "1") \
    .option("settings.index_granularity", "8192") \
    .mode("append") \
    .save()

Catalog API の使用

Catalog API を使用する場合は、Spark の設定で spark.sql.catalog.<catalog_name>.option.<key> という形式を使用してください。

spark.sql.catalog.clickhouse.option.allow_nullable_key 1
spark.sql.catalog.clickhouse.option.index_granularity 8192

または、Spark SQL でテーブル作成時に設定します:

CREATE TABLE clickhouse.default.my_table (
  id INT,
  name STRING
) USING ClickHouse
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  'settings.allow_nullable_key' = '1',
  'settings.index_granularity' = '8192'
)

ClickHouse Cloud の設定

ClickHouse Cloud に接続する場合は、SSL を有効にし、適切な SSL モードを設定してください。例えば、次のように指定します。

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

データの読み込み

public static void main(String[] args) {
        // Spark セッションを作成する
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

データの書き込み

情報

パーティションの上書きは未サポート: Catalog API は現在、パーティションレベルの上書き操作(例: partitionBy を指定した overwrite モード)をサポートしていません。この機能は開発中です。進捗状況については GitHub issue #34 を参照してください。

 public static void main(String[] args) throws AnalysisException {

        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Define the schema for the DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Create a DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

DDL 操作

Spark SQL を使用して ClickHouse インスタンスに対して DDL 操作を実行でき、その結果生じるすべての変更は即座に ClickHouse に永続化されます。 Spark SQL では、ClickHouse とまったく同じようにクエリを記述できるため、 CREATE TABLE や TRUNCATE などのコマンドを変更なしでそのまま直接実行できます。たとえば、次のようにします:

注記

Spark SQL を使用する場合、一度に実行できるステートメントは 1 つだけです。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上記の例に示した Spark SQL クエリは、Java、Scala、PySpark、あるいはシェルなど任意の API を用いて、アプリケーション内で実行できます。

VariantType の利用

注記

VariantType のサポートは Spark 4.0 以降で利用可能であり、実験的な JSON/Variant 型が有効化された ClickHouse 25.3 以降が必要です。

このコネクタは、半構造化データを扱うために Spark の VariantType をサポートします。VariantType は ClickHouse の JSON および Variant 型に対応しており、柔軟なスキーマを持つデータを効率的に保存およびクエリできます。

注記

このセクションでは、VariantType のマッピングと利用方法に特化して説明します。サポートされているすべてのデータ型の完全な概要については、Supported data types セクションを参照してください。

ClickHouse 型のマッピング

ClickHouse 型Spark 型説明
JSONVariantTypeJSON オブジェクトのみを保存({ で始まる必要があります)
Variant(T1, T2, ...)VariantTypeプリミティブ型、配列、JSON を含む複数の型を保存

VariantType データの読み取り

ClickHouse からデータを読み出す場合、JSON および Variant カラムは自動的に Spark の VariantType にマッピングされます:

// JSON カラムを VariantType として読み取る
val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")

// Variant データにアクセスする
df.show()

// 確認のために Variant を JSON 文字列に変換する
import org.apache.spark.sql.functions._
df.select(
  col("id"),
  to_json(col("data")).as("data_json")
).show()

VariantType データの書き込み

JSON あるいは Variant 型カラムのいずれかを使用して、VariantType データを ClickHouse に書き込むことができます。

import org.apache.spark.sql.functions._

// JSON データを持つ DataFrame を作成
val jsonData = Seq(
  (1, """{"name": "Alice", "age": 30}"""),
  (2, """{"name": "Bob", "age": 25}"""),
  (3, """{"name": "Charlie", "city": "NYC"}""")
).toDF("id", "json_string")

// JSON 文字列を VariantType に変換
val variantDF = jsonData.select(
  col("id"),
  parse_json(col("json_string")).as("data")
)

// JSON 型で ClickHouse に書き込み (JSON オブジェクトのみ)
variantDF.writeTo("clickhouse.default.user_data").create()

// もしくは複数の型を持つ Variant を指定
spark.sql("""
  CREATE TABLE clickhouse.default.mixed_data (
    id INT,
    data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'id'
  )
""")

Spark SQL での VariantType テーブルの作成

Spark SQL の DDL 文を使用して VariantType テーブルを作成できます。

-- Create table with JSON type (default)
CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)
-- Create table with Variant type supporting multiple types
CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

VariantType の設定

VariantType カラムを持つテーブルを作成する際、どの ClickHouse の型を使用するかを指定できます。

JSON 型 (デフォルト)

variant_types プロパティが指定されていない場合、そのカラムの型はデフォルトで ClickHouse の JSON 型となり、JSON オブジェクトのみを受け付けます。

CREATE TABLE clickhouse.default.json_table (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

これにより、次の ClickHouse クエリが作成されます:

CREATE TABLE json_table (id Int32, data JSON) ENGINE = MergeTree() ORDER BY id

複数の型を持つ VariantType

プリミティブ型、配列、および JSON オブジェクトをサポートするには、variant_types プロパティで型を指定します。

CREATE TABLE clickhouse.default.flexible_data (
  id INT,
  data VARIANT
) USING clickhouse
TBLPROPERTIES (
  'clickhouse.column.data.variant_types' = 'String, Int64, Float64, Bool, Array(String), JSON',
  'engine' = 'MergeTree()',
  'order_by' = 'id'
)

これにより、以下の ClickHouse クエリが作成されます。

CREATE TABLE flexible_data (
  id Int32, 
  data Variant(String, Int64, Float64, Bool, Array(String), JSON)
) ENGINE = MergeTree() ORDER BY id

サポートされる Variant 型

Variant() で使用できる ClickHouse の型は次のとおりです:

  • プリミティブ: String, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Bool
  • 配列: Array(T)。ここでの T は、ネストされた配列を含むサポート対象の任意の型です
  • JSON: JSON(JSON オブジェクトを格納するための型)

読み取りフォーマットの設定

デフォルトでは、JSON および Variant カラムは VariantType として読み取られます。この挙動は、次のように設定することで文字列として読み取るように変更できます:

// JSON/Variant を VariantType ではなく文字列として読み取る
spark.conf.set("spark.clickhouse.read.jsonAs", "string")

val df = spark.sql("SELECT id, data FROM clickhouse.default.json_table")
// data カラムは JSON 文字列を格納する StringType になります

書き込みフォーマットのサポート

VariantType の書き込みサポートはフォーマットごとに異なります。

FormatSupportNotes
JSON✅ FullJSON 型と Variant 型の両方をサポートします。VariantType データにはこのフォーマットの利用を推奨します
Arrow⚠️ PartialClickHouse の JSON 型への書き込みをサポートします。ClickHouse の Variant 型はサポートしません。完全なサポートは https://github.com/ClickHouse/ClickHouse/issues/92752 の解決待ちです

書き込みフォーマットを設定します:

spark.conf.set("spark.clickhouse.write.format", "json")  // Recommended for Variant types
ヒント

ClickHouse の Variant 型に書き込む必要がある場合は、JSON フォーマットを使用してください。Arrow フォーマットでは JSON 型への書き込みのみがサポートされています。

ベストプラクティス

  1. JSON のみのデータには JSON 型を使用する: JSON オブジェクトのみを保存する場合は、variant_types プロパティを指定しないデフォルトの JSON 型を使用します
  2. 型を明示的に指定する: Variant() を使用する場合、保存する予定のすべての型を明示的に列挙します
  3. 実験的機能を有効にする: ClickHouse で allow_experimental_json_type = 1 が有効になっていることを確認します
  4. 書き込みには JSON フォーマットを使用する: 互換性を高めるため、VariantType データには JSON フォーマットでの書き込みを推奨します
  5. クエリパターンを考慮する: JSON/Variant 型は、効率的なフィルタリングのために ClickHouse の JSON パス クエリをサポートしています
  6. パフォーマンス向上のためのカラムヒント: ClickHouse で JSON フィールドを使用する場合、カラムヒントを追加することでクエリパフォーマンスが向上します。現在、Spark 経由でのカラムヒント追加はサポートされていません。この機能の進捗については GitHub issue #497 を参照してください。

ワークフロー全体の例

import org.apache.spark.sql.functions._

// Enable experimental JSON type in ClickHouse
spark.sql("SET allow_experimental_json_type = 1")

// Create table with Variant column
spark.sql("""
  CREATE TABLE clickhouse.default.events (
    event_id BIGINT,
    event_time TIMESTAMP,
    event_data VARIANT
  ) USING clickhouse
  TBLPROPERTIES (
    'clickhouse.column.event_data.variant_types' = 'String, Int64, Bool, JSON',
    'engine' = 'MergeTree()',
    'order_by' = 'event_time'
  )
""")

// Prepare data with mixed types
val events = Seq(
  (1L, "2024-01-01 10:00:00", """{"action": "login", "user_id": 123}"""),
  (2L, "2024-01-01 10:05:00", """{"action": "purchase", "amount": 99.99}"""),
  (3L, "2024-01-01 10:10:00", """{"action": "logout", "duration": 600}""")
).toDF("event_id", "event_time", "json_data")

// Convert to VariantType and write
val variantEvents = events.select(
  col("event_id"),
  to_timestamp(col("event_time")).as("event_time"),
  parse_json(col("json_data")).as("event_data")
)

variantEvents.writeTo("clickhouse.default.events").append()

// Read and query
val result = spark.sql("""
  SELECT event_id, event_time, event_data
  FROM clickhouse.default.events
  WHERE event_time >= '2024-01-01'
  ORDER BY event_time
""")

result.show(false)

設定

以下は、コネクタで利用可能な設定可能な項目です。

注記

設定の使用方法: これらは Catalog API と TableProvider API の両方に適用される、Spark レベルの設定オプションです。次の 2 つの方法で設定できます:

  1. グローバルな Spark 設定(すべての操作に適用):

    spark.conf.set("spark.clickhouse.write.batchSize", "20000")
    spark.conf.set("spark.clickhouse.write.compression.codec", "lz4")
    
  2. 操作ごとの上書き(TableProvider API のみ - グローバル設定を上書き可能):

    df.write \
        .format("clickhouse") \
        .option("host", "your-host") \
        .option("database", "default") \
        .option("table", "my_table") \
        .option("spark.clickhouse.write.batchSize", "20000") \
        .option("spark.clickhouse.write.compression.codec", "lz4") \
        .mode("append") \
        .save()
    

あるいは、spark-defaults.conf に設定するか、Spark セッションを作成するときに指定します。


キーデフォルト値説明導入バージョン
spark.clickhouse.ignoreUnsupportedTransformtrueClickHouse は、cityHash64(col_1, col_2) のような複雑な式を分片キーやパーティション値として使用することをサポートしますが、これらは現時点では Spark ではサポートされていません。true の場合、サポートされていない式を無視して警告をログに記録し、それ以外の場合は例外をスローして即座に失敗させます。警告: spark.clickhouse.write.distributed.convertLocal=true のときにサポートされていない分片キーを無視すると、データが破損する可能性があります。コネクタはこれを検証し、デフォルトではエラーをスローします。これを許可するには、明示的に spark.clickhouse.write.distributed.convertLocal.allowUnsupportedSharding=true を設定してください。0.4.0
spark.clickhouse.read.compression.codeclz4読み込み時にデータを伸長する際に使用するコーデック。サポートされているコーデック: none, lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrueDistributed テーブルを読み込む際、そのテーブル自体ではなくローカルテーブルを読み込みます。true の場合、spark.clickhouse.read.distributed.useClusterNodes を無視します。0.1.0
spark.clickhouse.read.fixedStringAsbinary指定した Spark データ型として ClickHouse の FixedString 型を読み込みます。サポートされる型: binary, string0.8.0
spark.clickhouse.read.formatjson読み込み時のシリアル化フォーマット。サポートされるフォーマット: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み取り用のランタイムフィルタを有効にします。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrue の場合、パーティション値ではなく仮想カラム _partition_id によって入力用パーティションフィルターを構築します。パーティション値を使って SQL 述語を組み立てる際には、既知の問題があります。この機能を利用するには ClickHouse Server v21.6 以降が必要です。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrue の場合、テーブル作成時に CREATE/REPLACE TABLE ... AS SELECT ... を実行するとき、クエリスキーマ内のすべてのフィールドを Nullable としてマークします。なお、この設定には SPARK-43390(Spark 3.5 で利用可能)が必要です。このパッチがない場合は、常に true として動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouse への書き込み時における、バッチごとのレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込み時にデータを圧縮するために使用されるコーデック。サポートされるコーデック: none, lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalseDistributed テーブルへの書き込み時には、そのテーブル自体ではなくローカルテーブルに書き込みます。true の場合、spark.clickhouse.write.distributed.useClusterNodes は無視されます。これにより ClickHouse のネイティブルーティングがバイパスされるため、Spark がシャーディングキーを評価する必要があります。サポートされていないシャーディング式を使用する場合、暗黙のデータ分散エラーを防ぐために spark.clickhouse.ignoreUnsupportedTransformfalse に設定してください。0.1.0
spark.clickhouse.write.distributed.convertLocal.allowUnsupportedShardingfalseシャーディングキーがサポートされていない場合に、convertLocal=true かつ ignoreUnsupportedTransform=true の組み合わせで分散テーブルへの書き込みを許可します。これは危険であり、不正なシャーディングによってデータ破損を引き起こす可能性があります。true に設定する場合、Spark はサポートされていないシャーディング式を評価できないため、書き込み前にデータが正しくソート/分散(シャーディング)されていることを必ず保証する必要があります。リスクを理解し、データ分布を検証済みである場合にのみ true に設定してください。デフォルトでは、この組み合わせではサイレントなデータ破損を防ぐためにエラーをスローします。0.10.0
spark.clickhouse.write.distributed.useClusterNodestrue分散テーブルに書き込む際、クラスター内のすべてのノードに書き込みます。0.1.0
spark.clickhouse.write.formatarrow書き込み時に使用するシリアライズ形式。サポートされている形式: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrue の場合、書き込み前にソートキーでローカルソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition の値true の場合、書き込み前にパーティションごとにローカルソートを行います。未設定の場合は、spark.clickhouse.write.repartitionByPartition と同じ値になります。0.3.0
spark.clickhouse.write.maxRetry3リトライ可能なエラーコードで失敗した単一バッチ書き込みに対して再試行する最大回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue書き込み前に、ClickHouse のパーティションキーでデータを再パーティショニングして、ClickHouse テーブルのデータ分布に合わせるかどうか。0.3.0
spark.clickhouse.write.repartitionNum0ClickHouse テーブルの分布要件を満たすために書き込み前にデータの再パーティションが必要な場合、この設定で再パーティション数を指定します。1 未満の値は、再パーティション数に関する要件なしを意味します。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrue の場合、Spark は書き込み時にレコードをデータソーステーブルへ渡す前に、要求される分散要件を満たすように、入力レコードをパーティション間で厳密に分配します。そうでない場合、Spark はクエリを高速化するためにいくつかの最適化を適用することがありますが、その結果、分散要件が満たされなくなる可能性があります。なお、この設定には SPARK-37523(Spark 3.4 で利用可能)が必要であり、このパッチがない場合は常に true として扱われます。0.3.0
spark.clickhouse.write.retryInterval10s書き込み再試行の間隔(秒)。0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込みが失敗したときに ClickHouse サーバーから返される再試行可能なエラーコード。0.1.0

サポートされているデータ型

このセクションでは、Spark と ClickHouse の間のデータ型の対応関係について説明します。以下の表は、ClickHouse から Spark へデータを読み込む場合と、Spark から ClickHouse へデータを挿入する場合のデータ型変換に関するクイックリファレンスです。

ClickHouse から Spark へのデータ読み込み

ClickHouse Data TypeSpark Data TypeSupportedIs PrimitiveNotes
NothingNullTypeYes
BoolBooleanTypeYes
UInt8, Int16ShortTypeYes
Int8ByteTypeYes
UInt16,Int32IntegerTypeYes
UInt32,Int64, UInt64LongTypeYes
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Yes
Float32FloatTypeYes
Float64DoubleTypeYes
String, UUID, Enum8, Enum16, IPv4, IPv6StringTypeYes
FixedStringBinaryType, StringTypeYes設定 READ_FIXED_STRING_AS によって制御されます
DecimalDecimalTypeYesDecimal128 までの精度とスケール
Decimal32DecimalType(9, scale)Yes
Decimal64DecimalType(18, scale)Yes
Decimal128DecimalType(38, scale)Yes
Date, Date32DateTypeYes
DateTime, DateTime32, DateTime64TimestampTypeYes
ArrayArrayTypeNo配列要素の型も変換されます
MapMapTypeNoキーは StringType に限定されます
IntervalYearYearMonthIntervalType(Year)Yes
IntervalMonthYearMonthIntervalType(Month)Yes
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeNo対応する Interval 型が使用されます
JSON, VariantVariantTypeNoSpark 4.0+ および ClickHouse 25.3+ が必要です。spark.clickhouse.read.jsonAs=string を指定すると StringType として読み取れます
Object
Nested
TupleStructTypeNo名前付きおよび名前なしの両方のタプルをサポートします。名前付きタプルはフィールド名で struct フィールドにマッピングされ、名前なしタプルは _1_2 などを使用します。ネストした struct と Nullable フィールドをサポートします
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Spark から ClickHouse へのデータ挿入

Spark Data TypeClickHouse Data Typeサポートプリミティブ型か備考
BooleanTypeBoolはいバージョン 0.9.0 以降、UInt8 ではなく Bool 型にマッピングされます
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はいDecimal128 までの精度およびスケール
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (list, tuple, or array)Arrayいいえ配列要素の型も変換されます
MapTypeMapいいえキーは StringType に制限されます
StructTypeTupleいいえフィールド名付きの Tuple に変換されます
VariantTypeJSON or VariantいいえSpark 4.0+ と ClickHouse 25.3+ が必要です。デフォルトでは JSON 型になります。複数の型を持つ Variant を指定するには clickhouse.column.<name>.variant_types プロパティを使用してください。
Object
Nested

貢献とサポート

プロジェクトへの貢献や問題の報告をご希望の場合は、ぜひお寄せください。 GitHub リポジトリを参照し、Issue の作成、改善提案、 または Pull Request の送信を行ってください。 貢献は大歓迎です。作業を始める前に、リポジトリ内のコントリビューションガイドラインを確認してください。 ClickHouse Spark コネクタの改善にご協力いただきありがとうございます。