- S3 ClickPipes による初期スナップショット
- Kinesis ClickPipes によるリアルタイム更新
ReplacingMergeTree に取り込まれます。このテーブルエンジンは、更新操作を適用できるようにするため、CDC のユースケースで一般的に使用されます。このパターンの詳細については、以下のブログ記事を参照してください。
- PostgreSQL と ClickHouse を使った変更データキャプチャ (CDC) - 第1部
- PostgreSQL と ClickHouse を使った変更データキャプチャ (CDC) - 第2部
1. Kinesis ストリーム を設定する
2. スナップショットを作成する
3. スナップショットをClickHouse に読み込む
必要なテーブルを作成する
JSONExtract 関数を使って行えます。
作成するテーブルは 3 つあります。
- DynamoDB の生データを格納するためのテーブル
- 最終的なフラット化済みデータを格納するためのテーブル (宛先テーブル)
- データをフラット化するための materialized view
- このテーブルは
ReplacingMergeTreeテーブルである必要があります - テーブルには
versionカラムが必要です- 後続の手順では、Kinesis Stream の
ApproximateCreationDateTimeフィールドをversionカラムにマッピングします。
- 後続の手順では、Kinesis Stream の
- テーブルでは、パーティションキーをソートキー (
ORDER BYで指定) として使用する必要があります- 同じソートキーを持つ行は、
versionカラムに基づいて重複排除されます。
- 同じソートキーを持つ行は、
スナップショット ClickPipe を作成する
- 取り込みパス: S3 内のエクスポート済み JSON ファイルのパスを確認する必要があります。パスは次のようになります。
- フォーマット: JSONEachRow
- テーブル: 使用するスナップショットテーブル (例: 上記の例では
default.snapshot)
4. Kinesis ClickPipe を作成する
- Stream: 手順 1 で使用した Kinesisストリーム
- Table: 宛先テーブル (例: 上記の
default.destination) - Flatten object: true
- Column mappings:
ApproximateCreationDateTime:version- 以下のように、他のフィールドを適切な宛先カラムにマッピングします