跳转到主要内容
本指南介绍处理通过 ClickPipes 从 MongoDB 复制到 ClickHouse 的 JSON 数据的常见方式。 假设我们在 MongoDB 中创建了一个集合 t1,用于跟踪客户订单:
db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})
MongoDB CDC Connector 使用原生 JSON 数据类型将 MongoDB 文档复制到 ClickHouse。ClickHouse 中的复制表 t1 将包含以下行:
Row 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
doc:                {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0

表 schema

这些复制表使用以下标准 schema:
┌─name───────────────┬─type──────────┐
 _id String
 doc JSON
 _peerdb_synced_at DateTime64(9) 
 _peerdb_version Int64
 _peerdb_is_deleted Int8
└────────────────────┴───────────────┘
  • _id:来自 MongoDB 的主键
  • doc:复制为 JSON 数据类型的 MongoDB 文档
  • _peerdb_synced_at:记录该行上次同步的时间
  • _peerdb_version:记录该行的版本;当该行被更新或删除时递增
  • _peerdb_is_deleted:标记该行是否已删除

ReplacingMergeTree 表引擎

ClickPipes 使用 ReplacingMergeTree 表引擎家族将 MongoDB 集合映射到 ClickHouse 中。使用此引擎时,对于给定的主键 (_id) ,更新会被表示为插入该文档的较新版本 (_peerdb_version) ,从而能够以版本化插入的方式高效处理更新、替换和删除操作。 ReplacingMergeTree 会在后台异步去重。若要确保同一行不存在重复项,请使用 FINAL 修饰符。例如:
SELECT * FROM t1 FINAL;

处理删除操作

来自 MongoDB 的删除操作会以新增行的形式传播,并通过 _peerdb_is_deleted 列标记为已删除。通常,你会希望在查询中过滤掉这些行:
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
你也可以创建行级策略,自动过滤已删除的行,而不必在每个查询中单独指定过滤条件:
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;

查询 JSON 数据

可以直接使用点语法查询 JSON 字段:
Query
SELECT
    doc.order_id,
    doc.shipping.method
FROM t1;
Result
┌-─doc.order_id─┬─doc.shipping.method─┐
 ORD-001234 express
└───────────────┴─────────────────────┘
使用点语法查询嵌套对象中的字段时,请务必添加 ^ 运算符:
Query
SELECT doc.^shipping as shipping_info FROM t1;
Result
┌─shipping_info──────────────────────────────────────┐
 {"city":"Seattle","cost":19.99,"method":"express"}
└────────────────────────────────────────────────────┘

Dynamic 类型

在 ClickHouse 中,JSON 里的每个字段都是 Dynamic 类型。Dynamic 类型使 ClickHouse 在无需预先知道具体类型的情况下,也能存储任意类型的值。你可以用 toTypeName 函数验证这一点:
Query
SELECT toTypeName(doc.customer_id) AS type FROM t1;
Result
┌─type────┐
 Dynamic
└─────────┘
要查看某个字段的底层数据类型,可以使用 dynamicType 函数。请注意,同一字段名在不同行中可能对应不同的数据类型:
Query
SELECT dynamicType(doc.customer_id) AS type FROM t1;
Result
┌─type──┐
 Int64
└───────┘
常规函数 适用于 Dynamic 类型,就像适用于常规列一样: 示例 1:日期解析
Query
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
Result
┌─order_date──────────┐
 2025-08-19 20:32:11
└─────────────────────┘
示例 2:条件逻辑
Query
SELECT multiIf(
    doc.total_amount < 100, 'less_than_100',
    doc.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
Result
┌─spendings──────┐
 less_than_1000
└────────────────┘
示例 3:Array 操作
Query
SELECT length(doc.items) AS item_count FROM t1;
Result
┌─item_count─┐
          2
└────────────┘

字段类型转换

ClickHouse 中的聚合函数不能直接作用于 Dynamic 类型。例如,如果你尝试直接对 Dynamic 类型使用 sum 函数,就会得到以下错误:
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: 聚合函数 sum 的参数类型 Dynamic 不合法。(ILLEGAL_TYPE_OF_ARGUMENT)
要使用聚合函数,请使用 CAST 函数或 :: 语法将该字段转换为合适的类型:
Query
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
Result
┌─shipping_cost─┐
         19.99
└───────────────┘
将 Dynamic 类型转换为其底层数据类型 (由 dynamicType 决定) 时,性能非常高,因为 ClickHouse 在内部已按底层类型存储该值。

扁平化 JSON

常规视图

您可以基于 JSON 表创建常规视图,将扁平化、类型转换和转换逻辑封装起来,从而像查询关系型表一样查询数据。常规视图非常轻量,因为它们只存储查询本身,不存储底层数据。例如:
CREATE VIEW v1 AS
SELECT
    CAST(doc._id, 'String') AS object_id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
该视图的 schema 如下:
┌─name────────────┬─type───────────┐
 object_id String
 order_id String
 customer_id Int64
 status String
 total_amount Decimal(18, 2) 
 order_date DateTime64(3)  
 shipping_info JSON
 items Dynamic
└─────────────────┴────────────────┘
现在,您可以像查询扁平化后的表一样查询这个视图:
SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

可刷新materialized view

你可以创建可刷新materialized view,按计划执行查询,对行去重,并将结果存储到扁平化的目标表中。每次按计划刷新时,目标表都会被最新的查询结果替换。 这种方法的主要优势在于,带有 FINAL 关键字的查询只会在刷新时运行一次,因此后续针对目标表的查询都不再需要使用 FINAL 缺点是,目标表中的数据只能更新到最近一次刷新时的状态。对于许多使用场景,将刷新间隔设为几分钟到几小时,通常能在数据新鲜度和查询性能之间取得较好的平衡。
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
现在,你可以直接查询表 flattened_t1,无需使用 FINAL 修饰符:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

增量materialized view

如果你想实时访问扁平化后的列,可以创建增量materialized view。如果你的表更新频繁,不建议在 materialized view 中使用 FINAL 修饰符,因为每次更新都会触发合并。相反,你可以在 materialized view 之上构建一个常规视图,在查询时对数据去重。
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic,
    `_peerdb_version` Int64,
    `_peerdb_synced_at` DateTime64(9),
    `_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items,
    _peerdb_version,
    _peerdb_synced_at,   
    _peerdb_is_deleted
FROM t1;

CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
现在,你可以按如下方式查询视图 flattened_t1_final
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
最后修改于 2026年6月10日