メインコンテンツへスキップ
メインコンテンツへスキップ

Apache NiFiをClickHouseに接続する

Community Maintained

Apache NiFi は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフロー管理ソフトウェアです。ETLデータパイプラインの作成が可能で、300以上のデータプロセッサが同梱されています。このステップバイステップのチュートリアルでは、Apache NiFiをソースと宛先の両方としてClickHouseに接続し、サンプルデータセットをロードする方法を説明します。

接続情報を収集する

HTTP(S) で ClickHouse に接続するには、次の情報が必要です。

Parameter(s)Description
HOST and PORT通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。
DATABASE NAME既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。
USERNAME and PASSWORD既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。

ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

ClickHouse Cloud サービスの Connect ボタン

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

ClickHouse Cloud HTTPS 接続詳細

自己管理型の ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。

Apache NiFi をダウンロードして実行する

新規セットアップの場合は、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh start を実行して起動します

ClickHouse JDBC ドライバをダウンロードする

  1. GitHub 上の ClickHouse JDBC ドライバのリリースページ にアクセスし、最新の JDBC リリースバージョンを探します
  2. リリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」というキーワードを含む JAR ファイル(例: clickhouse-jdbc-0.5.0-all.jar)を探します。
  3. JAR ファイルを Apache NiFi からアクセス可能なフォルダに配置し、その絶対パスを記録しておきます

DBCPConnectionPool コントローラサービスを追加し、プロパティを設定する

  1. Apache NiFi でコントローラサービスを設定するには、歯車アイコン("gear" ボタン)をクリックして NiFi Flow Configuration ページを開きます

    歯車ボタンが強調表示された NiFi Flow Configuration ページ
  2. Controller Services タブを選択し、右上の + ボタンをクリックして新しいコントローラサービスを追加します

    追加ボタンが強調表示された Controller Services タブ
  3. DBCPConnectionPool を検索し、「Add」ボタンをクリックします

    DBCPConnectionPool が強調表示された Controller Service 選択ダイアログ
  4. 追加したばかりの DBCPConnectionPool は、デフォルトでは無効 (Invalid) 状態になっています。歯車アイコン("gear" ボタン)をクリックして設定を開始します

    Invalid 状態の DBCPConnectionPool と gear ボタンが強調表示された Controller Services 一覧
  5. 「Properties」セクションで、次の値を入力します

PropertyValueRemark
Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続 URL 中の HOSTNAME を環境に合わせて置き換えます
Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC ドライバ JAR ファイルへの絶対パス
Database UserdefaultClickHouse ユーザー名
PasswordpasswordClickHouse パスワード
  1. Settings セクションで、コントローラサービスの名前を分かりやすくするために「ClickHouse JDBC」に変更します

    プロパティが入力された DBCPConnectionPool 設定ダイアログ
  2. 「lightning」ボタンをクリックし、続いて「Enable」ボタンをクリックして DBCPConnectionPool コントローラサービスを有効化します

    lightning ボタンが強調表示された Controller Services 一覧

    Controller Service を有効化する確認ダイアログ
  3. Controller Services タブを開き、コントローラサービスが有効化されていることを確認します

    有効化された ClickHouse JDBC サービスが表示されている Controller Services 一覧

ExecuteSQL プロセッサを使用してテーブルから読み取る

  1. 適切なアップストリームおよびダウンストリームのプロセッサと共に ExecuteSQL プロセッサを追加します

    ExecuteSQL プロセッサを含むワークフローが表示された NiFi キャンバス
  2. ExecuteSQL プロセッサの「Properties」セクションで次の値を入力します

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBCClickHouse 用に設定した Controller Service を選択します
    SQL select querySELECT * FROM system.metricsここにクエリを入力します
  3. ExecuteSQL プロセッサを起動する

    プロパティが入力された ExecuteSQL プロセッサ設定ダイアログ
  4. クエリが正常に処理されたことを確認するには、出力キュー内の FlowFile のいずれかを確認してください

    検査対象の FlowFile が表示されている List queue ダイアログ
  5. 出力された FlowFile の結果を確認するには、ビューを「formatted」表示に切り替えます

    整形表示でクエリ結果を表示している FlowFile コンテンツビューア

MergeRecord および PutDatabaseRecord プロセッサを使用してテーブルに書き込む

  1. 1 回の insert で複数の行を書き込むには、まず複数のレコードを 1 つのレコードに結合する必要があります。これは MergeRecord プロセッサを使用することで実現できます。

  2. MergeRecord プロセッサの「Properties」セクションで、次の値を入力します

    PropertyValueRemark
    Record ReaderJSONTreeReader適切な Record Reader を選択します
    Record WriterJSONReadSetWriter適切な Record Writer を選択します
    Minimum Number of Records10001 つのレコードを構成するためにマージされる行の最小数を増やすよう、この値をより大きな値に設定します。デフォルトは 1 行です
    Maximum Number of Records10000"Minimum Number of Records" より大きな値に設定します。デフォルトは 1,000 行です
  3. 複数のレコードが 1 つにマージされていることを確認するには、MergeRecord プロセッサの入力と出力を確認してください。出力は複数の入力レコードから成る配列となる点に注意してください

    入力

    単一レコードの入力が表示されている MergeRecord プロセッサ

    出力

    マージされたレコード配列を表示している MergeRecord プロセッサの出力
  4. PutDatabaseRecord プロセッサの「Properties」セクションで、次の値を入力します

    PropertyValueRemark
    Record ReaderJSONTreeReader適切なレコードリーダーを選択します
    Database TypeGenericデフォルトのままにします
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCClickHouse コントローラサービスを選択します
    Table Nametblここにテーブル名を入力します
    Translate Field Namesfalse挿入されるフィールド名がカラム名と一致するように、"false" に設定します
    Maximum Batch Size10001 回の INSERT あたりの最大行数。この値は、MergeRecord プロセッサの "Minimum Number of Records" の値より小さくしないでください
  5. 各挿入に複数の行が含まれていることを確認するには、テーブルの行数が MergeRecord で定義されている「Minimum Number of Records」の値以上ずつ増加していることを確認します。

    宛先テーブルの行数を表示しているクエリ結果
  6. おめでとうございます。Apache NiFi を使用して ClickHouse へのデータ取り込みに成功しました!