メインコンテンツへスキップ
メインコンテンツへスキップ

継続的インジェスト向けの順不同モードの設定

デフォルトでは、S3 ClickPipe は、ファイルがバケットに辞書順で追加されることを前提としています。明確な順序付けがないファイルを取り込むように S3 ClickPipe を設定することもでき、その場合はバケットに接続された Amazon SQS キューを設定し、必要に応じて Amazon EventBridge をイベントルーターとして使用します。これにより、ClickPipes は ObjectCreated:* イベントを監視し、ファイル名の命名規則に関係なく新しいファイルを取り込むことができます。

注記

順不同モードは Amazon S3 でのみサポートされており、公開バケットや S3 互換サービスではサポートされていません。このモードを使用するには、バケットに接続された Amazon SQS キューを設定し、必要に応じて Amazon EventBridge をイベントルーターとして使用する必要があります。

仕組み

このモードでは、S3 ClickPipe は選択したパス内のすべてのファイルを初回ロードし、その後、指定したパスに一致するキュー内の ObjectCreated:* イベントを待ち受けます。すでに処理済みのファイルに対するメッセージ、パスに一致しないファイル、または別の種類のイベントは、いずれも無視されます。ファイルは、max insert bytes または max file count で設定したしきい値に達した時点、または設定可能な間隔の経過後 (デフォルトでは 30 秒) に取り込まれます。特定のファイルまたは時点からインジェストを開始することはできません。ClickPipes は常に、選択したパス内のすべてのファイルをロードします。

データの取り込み時にはさまざまな障害が発生する可能性があり、その結果、部分的な insert や重複データが生じることがあります。オブジェクトストレージ ClickPipes は insert の失敗に対して耐性があり、一時的なステージングテーブルを使用して exactly-once semantics を提供します。データはまずステージングテーブルに insert されます。問題が発生した場合は、ステージングテーブルを切り詰め、クリーンな状態から insert を再試行します。insert が正常に完了した場合にのみ、パーティションが target テーブルに移動されます。

Amazon SQS キューの作成

1. AWSコンソールで、Simple Queue Service > Create queue に移動します。デフォルト設定のまま新しい標準キューを作成します。

ヒント

SQSキューに**Dead-Letter-Queue (DLQ)**を設定することを強くお勧めします。これにより、失敗したメッセージのデバッグと再試行が容易になります。DLQが設定されている場合、失敗したメッセージはDLQのmaxReceiveCountパラメータで設定された回数まで再エンキューおよび再処理されます。

2. 以下の2つのオプションのいずれかを使用して、S3バケットをSQSキューに接続します。EventBridgeはほとんどのユースケースで推奨されます。ファンアウトをサポートし、より柔軟なイベントのフィルタリングが可能であるほか、イベントタイプおよびプレフィックスごとに通知ルールが1つに制限されるS3の制約を受けないためです。

a. S3 バケットのプロパティで Event notifications > Amazon EventBridge に移動し、EventBridge への通知送信を有効にします。Save changes をクリックします。

S3 バケットのプロパティで Amazon EventBridge 通知を有効にする

b. AWS Console で Amazon EventBridge > Rules > Create rule に移動します。ルール名 (例: S3ObjectCreated) を入力し、default イベントバスを選択して Next をクリックします。Build event pattern ステップで、イベントソースとして AWS events or EventBridge partner events を選択し、以下のイベントパターンを手動で入力します。<bucket-name> は実際のバケット名に置き換えてください。

EventBridge ルール名とイベントバスを定義する
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    }
  }
}

必要に応じて、プレフィックスまたはサフィックスでフィルタリングするために、パターンに object.key 条件を追加できます。追加する場合は、ClickPipe に設定したパスと一致していることを確認してください。

c. Select target(s) ステップで、ターゲットタイプとして AWS service を選択し、SQS queue を指定します。前のステップで作成したキューを選択します。EventBridge が必要な IAM ロールを自動作成できるよう、Use execution role (recommended) はチェックしたままにして、Next をクリックし、ウィザードを完了します。

EventBridge ルールのターゲットとして SQS キューを設定する
EventBridge ルールが正常に作成された

d. EventBridge がこのキューにメッセージを送信できるように、SQS キューのアクセスポリシーを編集します。<sqs-queue-arn><eventbridge-rule-arn> を適切な値に置き換えてください。

{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "AllowEventBridgeToSendMessage",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<sqs-queue-arn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<eventbridge-rule-arn>"
        }
      }
    }
  ]
}

IAM ロールの設定

1. ClickHouse Cloud コンソールで、設定 > ネットワークセキュリティ情報に移動し、サービスの IAM ロール ARN をコピーします。

2. AWSコンソールで、IAM > Roles > Create role に移動します。Custom trust policy を選択し、以下の内容を貼り付けてください。その際、<ch-cloud-arn> を前の手順でコピーしたIAM Role ARNに置き換えます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<ch-cloud-arn>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

3. IAMロールに対して、S3からオブジェクトを読み取り、SQSキュー内のメッセージを管理するための必須権限を持つインラインポリシーを作成します。<bucket-arn> および <sqs-queue-arn> を適切な値に置き換えてください。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3BucketMetadataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": "<bucket-arn>"
    },
    {
      "Sid": "AllowGetListObjects",
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "<bucket-arn>/*"
    },
    {
      "Sid": "SQSNotificationsAccess",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "<sqs-queue-arn>"
    }
  ]
}

順序なしモードでClickPipeを作成する

1. ClickHouse Cloud コンソールで、Data Sources > Create ClickPipe に移動し、Amazon S3 を選択します。S3 バケットへの接続情報を入力します。Authentication methodIAM role を選択し、前の手順で作成したロールの ARN を指定します。

2. Incoming data の下で、Continuous ingestion をオンに切り替えます。インジェストモードとして Any order を選択し、バケットに接続されたキューの SQS queue URL を指定します。

3. Parse information で、target テーブルの Sorting key を定義します。マップされたスキーマに必要な調整を行い、ClickPipes データベースユーザーのロールを設定します。

4. 設定を確認し、Create ClickPipe をクリックします。ClickPipes は指定されたパスに一致する既存のファイルをすべて読み込むためにバケットの初期スキャンを実行し、その後、新しい ObjectCreated:* イベントがキューに到着し次第、ファイルの処理を開始します。