メインコンテンツへスキップ
メインコンテンツへスキップ

Iceberg テーブルエンジン

注意

ClickHouse で Iceberg データを扱う場合は、Iceberg Table Function の使用を推奨します。Iceberg Table Function は現在、Iceberg テーブルに対する読み取り専用の一部機能に限られたインターフェースを提供しますが、現時点では十分な機能を備えています。

Iceberg Table Engine も利用可能ですが、いくつかの制限があります。ClickHouse はもともと外部でスキーマが変更されるテーブルをサポートするように設計されていないため、この特性が Iceberg Table Engine の機能に影響することがあります。その結果、通常のテーブルで動作する一部の機能が利用できなかったり、特に旧アナライザーを使用している場合に正しく動作しなかったりする可能性があります。

可能な限り高い互換性を確保するため、Iceberg Table Engine のサポートを継続的に改善している間は、Iceberg Table Function の使用を推奨します。

このエンジンは、Amazon S3、Azure、HDFS 上およびローカルに保存された既存の Apache Iceberg テーブルとの読み取り専用統合を提供します。

テーブルを作成

Iceberg テーブルはあらかじめストレージ上に存在している必要がある点に注意してください。このコマンドには、新しいテーブルを作成するための DDL パラメータを指定できません。

CREATE TABLE iceberg_table_s3
    ENGINE = IcebergS3(url,  [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression], [,extra_credentials])

CREATE TABLE iceberg_table_azure
    ENGINE = IcebergAzure(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])

CREATE TABLE iceberg_table_hdfs
    ENGINE = IcebergHDFS(path_to_table, [,format] [,compression_method])

CREATE TABLE iceberg_table_local
    ENGINE = IcebergLocal(path_to_table, [,format] [,compression_method])

エンジン引数

引数の説明は、それぞれ S3AzureBlobStorageHDFS および File エンジンにおける引数の説明と同様です。 format は Iceberg テーブル内のデータファイルの形式を表します。

IcebergS3 では、ClickHouse Cloud でロールベースアクセス用の role_arn を渡すために、オプションの extra_credentials パラメータを使用できます。設定手順については、Secure S3 を参照してください。

エンジンパラメータは Named Collections を使用して指定できます。

CREATE TABLE iceberg_table ENGINE=IcebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')

名前付きコレクションの使用:

<clickhouse>
    <named_collections>
        <iceberg_conf>
            <url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
            <access_key_id>test</access_key_id>
            <secret_access_key>test</secret_access_key>
        </iceberg_conf>
    </named_collections>
</clickhouse>
CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table')

エイリアス

テーブルエンジン Iceberg は、現在は IcebergS3 のエイリアスになっています。

データ型

次のテーブルは、スキーマ推論時に Iceberg のデータ型が ClickHouse のデータ型へどのようにマッピングされるか (読み取り時) を示しています。

プリミティブ型

Iceberg 型ClickHouse 型備考
booleanBool
intInt32
long, bigintInt64
floatFloat32
doubleFloat64
dateDate32
timeInt64深夜からのマイクロ秒数
timestampDateTime64(6)マイクロ秒、タイムゾーンなし
timestamptzDateTime64(6, 'UTC')マイクロ秒、UTC タイムゾーン
timestamp_nsDateTime64(9)ナノ秒、タイムゾーンなし (Iceberg v3 以降のみ)
timestamptz_nsDateTime64(9, 'UTC')ナノ秒、UTC タイムゾーン (Iceberg v3 以降のみ)
string, binaryString
uuidUUID
fixed(N)FixedString(N)
decimal(P, S)Decimal(P, S)

複合型

Icebergの型ClickHouseの型
listArray
mapMap
structTuple

スキーマの進化

ClickHouse は、時間の経過とともにスキーマが進化した Iceberg テーブルの読み取りをサポートしています。現在サポートしているのは、カラムの追加・削除が行われたり、その順序が変更されたテーブルの読み取りです。また、値が必須だったカラムを、NULL を許容するカラムに変更することもできます。加えて、単純型に対しては、以下の許可されている型変換をサポートしています:

  • int -> long
  • float -> double
  • decimal(P, S) -> decimal(P', S)(P' > P の場合)

現時点では、ネストされた構造や、配列およびマップ内の要素の型を変更することはできません。

作成後にスキーマが変更されたテーブルを動的スキーマ推論で読み取るには、テーブル作成時に allow_dynamic_metadata_for_data_lakes = true を設定してください。

パーティションプルーニング

ClickHouse は Iceberg テーブルに対する SELECT クエリの実行時にパーティションプルーニングをサポートしており、関係のないデータファイルをスキップすることでクエリパフォーマンスを最適化できます。パーティションプルーニングを有効にするには、use_iceberg_partition_pruning = 1 を設定します。Iceberg におけるパーティションプルーニングの詳細については https://iceberg.apache.org/spec/#partitioning を参照してください。

タイムトラベル

ClickHouse は Iceberg テーブルに対するタイムトラベルをサポートしており、特定のタイムスタンプまたはスナップショット ID を指定して過去のデータをクエリできます。

削除された行を含むテーブルの処理

ClickHouse は、次の削除方式を使用する Iceberg テーブルの読み取りに対応しています。

以下の削除方式はサポートされていません

基本的な使用方法

 SELECT * FROM example_table ORDER BY 1 
 SETTINGS iceberg_timestamp_ms = 1714636800000
 SELECT * FROM example_table ORDER BY 1 
 SETTINGS iceberg_snapshot_id = 3547395809148285433

注意: 同じクエリ内で iceberg_timestamp_ms パラメータと iceberg_snapshot_id パラメータを同時に指定することはできません。

重要な考慮事項

  • スナップショット は通常、次のタイミングで作成されます:

    • 新しいデータがテーブルに書き込まれたとき
    • 何らかのデータコンパクションが実行されたとき
  • スキーマ変更によってスナップショットが作成されることは通常ない — このため、スキーマ進化を行ったテーブルでタイムトラベルを使用する場合に特有の挙動が発生します。

シナリオ例

すべてのシナリオは Spark を用いて記述されています。これは、CH がまだ Iceberg テーブルへの書き込みをサポートしていないためです。

シナリオ 1: 新しいスナップショットを伴わないスキーマ変更

次の一連の操作を考えてみます:

 -- Create a table with two columns
  CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example (
  order_number int, 
  product_code string
  ) 
  USING iceberg 
  OPTIONS ('format-version'='2')

-- Insert data into the table
  INSERT INTO spark_catalog.db.time_travel_example VALUES 
    (1, 'Mars')

  ts1 = now() // A piece of pseudo code

-- Alter table to add a new column
  ALTER TABLE spark_catalog.db.time_travel_example ADD COLUMN (price double)
 
  ts2 = now()

-- Insert data into the table
  INSERT INTO spark_catalog.db.time_travel_example VALUES (2, 'Venus', 100)

   ts3 = now()

-- Query the table at each timestamp
  SELECT * FROM spark_catalog.db.time_travel_example TIMESTAMP AS OF ts1;

+------------+------------+
|order_number|product_code|
+------------+------------+
|           1|        Mars|
+------------+------------+
  SELECT * FROM spark_catalog.db.time_travel_example TIMESTAMP AS OF ts2;

+------------+------------+
|order_number|product_code|
+------------+------------+
|           1|        Mars|
+------------+------------+

  SELECT * FROM spark_catalog.db.time_travel_example TIMESTAMP AS OF ts3;

+------------+------------+-----+
|order_number|product_code|price|
+------------+------------+-----+
|           1|        Mars| NULL|
|           2|       Venus|100.0|
+------------+------------+-----+

異なるタイムスタンプにおけるクエリ結果:

  • ts1 と ts2 の時点: 元の 2 列のみが表示される
  • ts3 の時点: 3 列すべてが表示され、1 行目の price 列は NULL になる

シナリオ 2: 履歴スキーマと現在のスキーマの差異

現在時点を指定したタイムトラベルクエリでは、現在のテーブルとは異なるスキーマが表示される場合があります:

-- Create a table
  CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_2 (
  order_number int, 
  product_code string
  ) 
  USING iceberg 
  OPTIONS ('format-version'='2')

-- Insert initial data into the table
  INSERT INTO spark_catalog.db.time_travel_example_2 VALUES (2, 'Venus');

-- Alter table to add a new column
  ALTER TABLE spark_catalog.db.time_travel_example_2 ADD COLUMN (price double);

  ts = now();

-- Query the table at a current moment but using timestamp syntax

  SELECT * FROM spark_catalog.db.time_travel_example_2 TIMESTAMP AS OF ts;

    +------------+------------+
    |order_number|product_code|
    +------------+------------+
    |           2|       Venus|
    +------------+------------+

-- Query the table at a current moment
  SELECT * FROM spark_catalog.db.time_travel_example_2;
    +------------+------------+-----+
    |order_number|product_code|price|
    +------------+------------+-----+
    |           2|       Venus| NULL|
    +------------+------------+-----+

これは、ALTER TABLE が新しいスナップショットを作成せず、現在のテーブルについては Spark がスナップショットではなく最新のメタデータファイルから schema_id の値を取得するために発生します。

シナリオ 3: 過去と現在のスキーマの差異

2つ目の制約は、タイムトラベルを行っても、テーブルに最初のデータが書き込まれる前の状態は取得できないという点です。

-- Create a table
  CREATE TABLE IF NOT EXISTS spark_catalog.db.time_travel_example_3 (
  order_number int, 
  product_code string
  ) 
  USING iceberg 
  OPTIONS ('format-version'='2');

  ts = now();

-- Query the table at a specific timestamp
  SELECT * FROM spark_catalog.db.time_travel_example_3 TIMESTAMP AS OF ts; -- Finises with error: Cannot find a snapshot older than ts.

ClickHouse における挙動は Spark と同じです。概念的には Spark の SELECT クエリを ClickHouse の SELECT クエリに置き換えて考えれば、同じように動作します。

メタデータファイルの解決

ClickHouse で Iceberg テーブルエンジンを使用する場合、システムは Iceberg テーブル構造を記述する適切な metadata.json ファイルを特定する必要があります。以下は、この解決プロセスの概要です。

  1. パスの直接指定:
  • iceberg_metadata_file_path を設定した場合、システムは Iceberg テーブルディレクトリパスにこれを結合したパスをそのまま使用します。
  • この設定が指定されている場合、他の解決用設定はすべて無視されます。
  1. テーブル UUID の一致:
  • iceberg_metadata_table_uuid が指定されている場合、システムは次のように動作します:
    • metadata ディレクトリ内の .metadata.json ファイルのみを参照する
    • 指定された UUID と一致する table-uuid フィールドを含むファイルをフィルタリングする(大文字小文字は区別しない)
  1. デフォルト検索:
  • 上記いずれの設定も指定されていない場合、metadata ディレクトリ内のすべての .metadata.json ファイルが候補になります

最新のファイルの選択

上記のルールを使用して候補ファイルを特定した後、システムはどれが最新であるかを判定します:

  • iceberg_recent_metadata_file_by_last_updated_ms_field が有効な場合:

    • last-updated-ms の値が最大のファイルが選択されます
  • それ以外の場合:

    • バージョン番号が最も高いファイルが選択されます
    • (V.metadata.json または V-uuid.metadata.json 形式のファイル名では、バージョンは V として表されます)

注記: 上記で言及した設定はすべて(特に明示されていない限り)エンジンレベルの設定であり、以下に示すようにテーブル作成時に指定する必要があります:

CREATE TABLE example_table ENGINE = Iceberg(
    's3://bucket/path/to/iceberg_table'
) SETTINGS iceberg_metadata_table_uuid = '6f6f6407-c6a5-465f-a808-ea8900e35a38';

注記: 通常、Iceberg Catalog がメタデータの解決を処理しますが、ClickHouse の Iceberg テーブルエンジンは S3 に保存されたファイルを Iceberg テーブルとして直接解釈するため、これらの解決ルールを理解することが重要です。

データキャッシュ

Iceberg テーブルエンジンおよびテーブル関数は、S3AzureBlobStorageHDFS ストレージと同様にデータキャッシュをサポートします。詳細はこちらを参照してください。

メタデータキャッシュ

Iceberg テーブルエンジンおよびテーブル関数は、マニフェストファイル、マニフェストリスト、メタデータ JSON の情報を格納するメタデータキャッシュをサポートしています。キャッシュはメモリ内に保存されます。この機能は use_iceberg_metadata_files_cache 設定によって制御され、デフォルトで有効になっています。

非同期メタデータ事前フェッチ

非同期メタデータ事前フェッチは、iceberg_metadata_async_prefetch_period_ms を設定することで、Iceberg テーブルの作成時に有効にできます。0 (デフォルト) に設定されている場合、またはメタデータキャッシュが有効でない場合、非同期事前フェッチは無効になります。 この機能を有効にするには、0 以外のミリ秒単位の値を指定する必要があります。これは、事前フェッチサイクルの間隔を表します。

有効にすると、サーバーはリモートカタログを一覧し、新しいメタデータバージョンを検出するための定期的なバックグラウンド処理を実行します。次に、それを解析し、スナップショットを再帰的にたどって、アクティブな manifest list ファイルおよび manifest ファイルを取得します。 メタデータキャッシュですでに利用可能なファイルは、再度ダウンロードされません。各事前フェッチサイクルの終了時には、最新のメタデータスナップショットがメタデータキャッシュで利用可能になります。

CREATE TABLE example_table ENGINE = Iceberg(
    's3://bucket/path/to/iceberg_table'
) SETTINGS
    iceberg_metadata_async_prefetch_period_ms = 60000;

読み取り時の非同期メタデータプリフェッチを最大限に活用するには、iceberg_metadata_staleness_ms パラメータをクエリまたはセッションのパラメータとして指定する必要があります。デフォルトでは (0 - 未指定) 、各クエリの Context で、サーバーはリモート catalog から最新のメタデータを取得します。 メタデータの古さに対する許容範囲を指定すると、サーバーはリモート catalog を呼び出さずに、メタデータスナップショットのキャッシュされたバージョンを使用できます。キャッシュ内にメタデータバージョンがあり、指定された古さの範囲内にダウンロードされていれば、それを使用してクエリを処理します。 それ以外の場合は、リモート catalog から最新バージョンが取得されます。

SELECT count() FROM icebench_table WHERE ...
SETTINGS iceberg_metadata_staleness_ms=120000

: 非同期メタデータの prefetchICEBERG_SCEDULE_POOL で実行されます。これは、アクティブな Iceberg テーブルに対するバックグラウンド処理用のサーバー側スレッドプールです。このスレッドプールのサイズは、サーバー設定パラメータ iceberg_background_schedule_pool_size (デフォルトは 10) で制御されます。

: 現時点では、非同期 prefetch が有効な場合、メタデータキャッシュのサイズは、すべてのアクティブなテーブルについて最新のメタデータ snapshot 全体を保持するのに十分であることが前提とされています。

関連項目