メインコンテンツへスキップ
このページでは、ClickPipes を使用して DynamoDB から ClickHouse への CDC (変更データキャプチャ) を設定する方法を説明します。このインテグレーションは、次の 2 つのコンポーネントで構成されています。
  1. S3 ClickPipes による初期スナップショット
  2. Kinesis ClickPipes によるリアルタイム更新
データは ReplacingMergeTree に取り込まれます。このテーブルエンジンは、更新操作を適用できるようにするため、CDC のユースケースで一般的に使用されます。このパターンの詳細については、以下のブログ記事を参照してください。

1. Kinesis ストリーム を設定する

まず、DynamoDB テーブルで Kinesis ストリーム を有効にし、変更をリアルタイムで取得できるようにします。データの取りこぼしを防ぐため、スナップショットを作成する前にこの設定を行います。 AWS のガイドはこちらを参照してください。

2. スナップショットを作成する

次に、DynamoDBテーブルのスナップショットを作成します。これは、AWS から S3 へのエクスポートで行えます。AWS のガイドはこちらを参照してください。 DynamoDB JSON フォーマットで「Full export」を実行してください。

3. スナップショットをClickHouse に読み込む

必要なテーブルを作成する

DynamoDB から取得したスナップショットデータは、次のようになります:
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
データがネストされた形式になっていることに注目してください。このデータを ClickHouse に読み込む前に、フラット化する必要があります。これは、ClickHouse の materialized view で JSONExtract 関数を使って行えます。 作成するテーブルは 3 つあります。
  1. DynamoDB の生データを格納するためのテーブル
  2. 最終的なフラット化済みデータを格納するためのテーブル (宛先テーブル)
  3. データをフラット化するための materialized view
上記の DynamoDB データ例に対しては、ClickHouse のテーブルは次のようになります。
/* スナップショットテーブル */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* フラット化済みデータの最終テーブル */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* フラット化済みデータの最終テーブル */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
宛先テーブルには、いくつかの要件があります。
  • このテーブルは ReplacingMergeTree テーブルである必要があります
  • テーブルには version カラムが必要です
    • 後続の手順では、Kinesis Stream の ApproximateCreationDateTime フィールドを version カラムにマッピングします。
  • テーブルでは、パーティションキーをソートキー (ORDER BY で指定) として使用する必要があります
    • 同じソートキーを持つ行は、version カラムに基づいて重複排除されます。

スナップショット ClickPipe を作成する

これで、S3 から ClickHouse にスナップショットデータを読み込むための ClickPipe を作成できます。こちらの S3 ClickPipe ガイドに従ってください。ただし、以下の設定を使用します。
  • 取り込みパス: S3 内のエクスポート済み JSON ファイルのパスを確認する必要があります。パスは次のようになります。
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • フォーマット: JSONEachRow
  • テーブル: 使用するスナップショットテーブル (例: 上記の例では default.snapshot)
作成されると、データはスナップショットテーブルと宛先テーブルに取り込まれ始めます。次のステップに進む前に、スナップショットの読み込みが完了するのを待つ必要はありません。

4. Kinesis ClickPipe を作成する

これで、Kinesisストリームからリアルタイムの変更を取り込む Kinesis ClickPipe を設定できます。こちらの Kinesis ClickPipe ガイドに従って、次の設定を使用してください。
  • Stream: 手順 1 で使用した Kinesisストリーム
  • Table: 宛先テーブル (例: 上記の default.destination)
  • Flatten object: true
  • Column mappings:
    • ApproximateCreationDateTime: version
    • 以下のように、他のフィールドを適切な宛先カラムにマッピングします

5. クリーンアップ (任意)

スナップショット用の ClickPipe が完了したら、スナップショットテーブルと materialized view を削除できます。
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
最終更新日 2026年6月10日