メインコンテンツへスキップ
このガイドでは、ClickPipes 経由で MongoDB から ClickHouse にレプリケートされた JSON データを扱う際の一般的なパターンを紹介します。 たとえば、顧客の注文を追跡するためのコレクション t1 を MongoDB に作成したとします。
db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})
MongoDB CDC コネクタは、ネイティブの JSON データ型を使用して、MongoDB のドキュメントを ClickHouse にレプリケートします。ClickHouse のレプリケートテーブル t1 には、次の行が含まれます。
 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
doc:                {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0

テーブルスキーマ

レプリケートテーブルでは、以下の標準スキーマを使用します。
┌─name───────────────┬─type──────────┐
 _id String
 doc JSON
 _peerdb_synced_at DateTime64(9) 
 _peerdb_version Int64
 _peerdb_is_deleted Int8
└────────────────────┴───────────────┘
  • _id: MongoDB の主キー
  • doc: JSON データ型としてレプリケートされた MongoDB ドキュメント
  • _peerdb_synced_at: 行が最後に同期された時刻を記録します
  • _peerdb_version: 行のバージョンを追跡します。行が更新または削除されるとインクリメントされます
  • _peerdb_is_deleted: 行が削除済みかどうかを示します

ReplacingMergeTree テーブルエンジン

ClickPipes は、ReplacingMergeTree テーブルエンジンファミリーを使用して、MongoDB のコレクションを ClickHouse にマッピングします。このエンジンでは、更新は、特定の主キー (_id) に対応する、より新しいバージョン (_peerdb_version) のドキュメントを insert する形で表現されます。これにより、更新、置換、削除をバージョン付き insert として効率的に処理できます。 ReplacingMergeTree は、バックグラウンドで非同期に重複を削除します。同じ行の重複がないことを保証するには、FINAL 修飾子 を使用します。例:
SELECT * FROM t1 FINAL;

削除の扱い

MongoDB で削除されたデータは、_peerdb_is_deleted カラムに削除済みの印が付いた新しい行として伝播されます。通常、クエリではこれらを除外します。
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
各クエリで毎回フィルタを指定する代わりに、削除済みの行を自動的に除外する行レベルポリシーを作成することもできます。
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;

JSONデータのクエリ

ドット記法を使うと、JSONフィールドを直接クエリできます。
Query
SELECT
    doc.order_id,
    doc.shipping.method
FROM t1;
Result
┌-─doc.order_id─┬─doc.shipping.method─┐
 ORD-001234 express
└───────────────┴─────────────────────┘
ドット構文を使ってネストされたオブジェクトのフィールドをクエリする場合は、^ 演算子を必ず追加してください。
Query
SELECT doc.^shipping as shipping_info FROM t1;
Result
┌─shipping_info──────────────────────────────────────┐
 {"city":"Seattle","cost":19.99,"method":"express"}
└────────────────────────────────────────────────────┘

Dynamic 型

ClickHouse では、JSON の各フィールドは Dynamic 型になります。Dynamic 型を使うと、ClickHouse は型を事前に把握していなくても、どのような型の値でも格納できます。これは toTypeName 関数で確認できます。
Query
SELECT toTypeName(doc.customer_id) AS type FROM t1;
Result
┌─type────┐
 Dynamic
└─────────┘
フィールドの実際のデータ型を確認するには、dynamicType 関数を使います。同じフィールド名でも、行によってデータ型が異なる場合がある点に注意してください。
Query
SELECT dynamicType(doc.customer_id) AS type FROM t1;
Result
┌─type──┐
 Int64
└───────┘
通常の関数は、通常のカラムと同様に Dynamic 型でも使用できます。 例 1: 日付のパース
Query
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
Result
┌─order_date──────────┐
 2025-08-19 20:32:11
└─────────────────────┘
例 2: 条件分岐
Query
SELECT multiIf(
    doc.total_amount < 100, 'less_than_100',
    doc.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
Result
┌─spendings──────┐
 less_than_1000
└────────────────┘
例 3: Arrayの操作
Query
SELECT length(doc.items) AS item_count FROM t1;
Result
┌─item_count─┐
          2
└────────────┘

フィールドの型変換

ClickHouse の集約関数は、Dynamic 型を直接扱えません。たとえば、Dynamic 型に対して sum 関数を直接使用しようとすると、次のエラーが発生します。
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: 集計関数 sum の引数の型 Dynamic は使用できません。 (ILLEGAL_TYPE_OF_ARGUMENT)
集約関数を使用するには、CAST 関数または :: 構文を使って、フィールドを適切な型にキャストします。
Query
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
Result
┌─shipping_cost─┐
         19.99
└───────────────┘
Dynamic型から基になるデータ型 (dynamicType によって決まる型) へのキャストは非常に高速です。これは、ClickHouse が値を内部的にすでにその基の型で保持しているためです。

JSONのフラット化

通常のビュー

JSONテーブルの上に通常のビューを作成すると、フラット化、CAST、変換のロジックをカプセル化し、リレーショナルテーブルのようにデータをクエリできるようになります。通常のビューは、基になるデータではなくクエリ自体のみを保存するため、軽量です。たとえば、次のようになります。
CREATE VIEW v1 AS
SELECT
    CAST(doc._id, 'String') AS object_id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
このビューのスキーマは次のとおりです:
┌─name────────────┬─type───────────┐
 object_id String
 order_id String
 customer_id Int64
 status String
 total_amount Decimal(18, 2) 
 order_date DateTime64(3)  
 shipping_info JSON
 items Dynamic
└─────────────────┴────────────────┘
これで、フラット化されたテーブルに対するのと同じように、このビューにクエリを実行できます。
SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

リフレッシャブルmaterialized view

リフレッシュ可能なマテリアライズドビュー を作成できます。これにより、行の重複を排除し、その結果をフラット化した宛先テーブルに保存するクエリの実行をスケジュールできます。スケジュールされた更新が実行されるたびに、宛先テーブルは最新のクエリ結果で置き換えられます。 この方法の主な利点は、FINAL キーワードを使用するクエリが更新時に一度だけ実行されることです。これにより、その後に宛先テーブルに対して実行するクエリで FINAL を使う必要がなくなります。 一方で、宛先テーブル内のデータは直近の更新時点の内容に限られるという欠点があります。多くのユースケースでは、更新間隔を数分から数時間に設定することで、データ鮮度とクエリ性能のバランスを適切に取ることができます。
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
これで、FINAL修飾子を使わずに、テーブル flattened_t1 に直接クエリできるようになりました:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

インクリメンタルmaterialized view

フラット化されたカラムにリアルタイムでアクセスしたい場合は、インクリメンタルmaterialized viewを作成できます。テーブルが頻繁に更新される場合は、更新のたびにマージがトリガーされるため、materialized view で FINAL modifier を使用することは推奨されません。代わりに、materialized view の上に通常のビューを作成することで、クエリ時にデータを重複排除できます。
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic,
    `_peerdb_version` Int64,
    `_peerdb_synced_at` DateTime64(9),
    `_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items,
    _peerdb_version,
    _peerdb_synced_at,   
    _peerdb_is_deleted
FROM t1;

CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
これで、次のようにビュー flattened_t1_final に対してクエリを実行できます:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
最終更新日 2026年6月10日