Saltar al contenido principal
Esta guía presenta patrones comunes para trabajar con datos JSON replicados de MongoDB a ClickHouse mediante ClickPipes. Supongamos que creamos una colección t1 en MongoDB para hacer seguimiento de los pedidos de los clientes:
db.t1.insertOne({
  "order_id": "ORD-001234",
  "customer_id": 98765,
  "status": "completed",
  "total_amount": 299.97,
  "order_date": new Date(),
  "shipping": {
    "method": "express",
    "city": "Seattle",
    "cost": 19.99
  },
  "items": [
    {
      "category": "electronics",
      "price": 149.99
    },
    {
      "category": "accessories",
      "price": 24.99
    }
  ]
})
El conector CDC de MongoDB replica documentos de MongoDB en ClickHouse mediante el tipo de datos JSON nativo. La tabla replicada t1 de ClickHouse contendrá la siguiente fila:
Fila 1:
──────
_id:                "68a4df4b9fe6c73b541703b0"
doc:                {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at:  2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version:    0

Esquema de tabla

Las tablas replicadas usan este esquema estándar:
┌─name───────────────┬─type──────────┐
 _id String
 doc JSON
 _peerdb_synced_at DateTime64(9) 
 _peerdb_version Int64
 _peerdb_is_deleted Int8
└────────────────────┴───────────────┘
  • _id: Clave primaria de MongoDB
  • doc: Documento de MongoDB replicado como tipo de datos JSON
  • _peerdb_synced_at: Registra cuándo se sincronizó por última vez la fila
  • _peerdb_version: Lleva un seguimiento de la versión de la fila; se incrementa cuando la fila se actualiza o se elimina
  • _peerdb_is_deleted: Indica si la fila está eliminada

Motor de tabla ReplacingMergeTree

ClickPipes mapea las colecciones de MongoDB en ClickHouse mediante la familia de motores de tabla ReplacingMergeTree. Con este motor, las actualizaciones se modelan como inserciones con una versión más reciente (_peerdb_version) del documento para una clave primaria determinada (_id), lo que permite gestionar de forma eficiente actualizaciones, reemplazos y eliminaciones como inserciones versionadas. ReplacingMergeTree elimina los duplicados de forma asíncrona en segundo plano. Para garantizar la ausencia de duplicados en la misma fila, utilice el FINAL modificador. Por ejemplo:
SELECT * FROM t1 FINAL;

Gestión de eliminaciones

Las eliminaciones de MongoDB se propagan como nuevas filas marcadas como eliminadas mediante la columna _peerdb_is_deleted. Normalmente, querrás excluirlas de tus consultas:
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
También puede crear una política de nivel de fila para filtrar automáticamente las filas eliminadas, en lugar de especificar el filtro en cada consulta:
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;

Consultas de datos JSON

Puede consultar directamente los campos JSON mediante la sintaxis de punto:
Query
SELECT
    doc.order_id,
    doc.shipping.method
FROM t1;
Result
┌-─doc.order_id─┬─doc.shipping.method─┐
 ORD-001234 express
└───────────────┴─────────────────────┘
Al consultar campos de objetos anidados con la sintaxis de punto, asegúrese de añadir el operador ^:
Query
SELECT doc.^shipping as shipping_info FROM t1;
Result
┌─shipping_info──────────────────────────────────────┐
 {"city":"Seattle","cost":19.99,"method":"express"}
└────────────────────────────────────────────────────┘

Tipo Dynamic

En ClickHouse, cada campo de JSON tiene el tipo Dynamic. El tipo Dynamic permite a ClickHouse almacenar valores de cualquier tipo sin conocerlo de antemano. Puede comprobarlo con la función toTypeName:
Query
SELECT toTypeName(doc.customer_id) AS type FROM t1;
Result
┌─type────┐
 Dynamic
└─────────┘
Para examinar los tipos de datos subyacentes de un campo, puedes consultarlos con la función dynamicType. Ten en cuenta que puede haber distintos tipos de datos para un mismo nombre de campo en diferentes filas:
Query
SELECT dynamicType(doc.customer_id) AS type FROM t1;
Result
┌─type──┐
 Int64
└───────┘
Funciones regulares funcionan con el tipo Dynamic igual que con las columnas normales: Ejemplo 1: Análisis de fechas
Query
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
Result
┌─order_date──────────┐
 2025-08-19 20:32:11
└─────────────────────┘
Ejemplo 2: Lógica condicional
Query
SELECT multiIf(
    doc.total_amount < 100, 'less_than_100',
    doc.total_amount < 1000, 'less_than_1000',
    '1000+') AS spendings
FROM t1;
Result
┌─spendings──────┐
 less_than_1000
└────────────────┘
Ejemplo 3: operaciones con Array
Query
SELECT length(doc.items) AS item_count FROM t1;
Result
┌─item_count─┐
          2
└────────────┘

Conversión de tipos en campos

Las funciones de agregación de ClickHouse no funcionan directamente con el tipo Dynamic. Por ejemplo, si intentas aplicar directamente la función sum a un tipo Dynamic, obtendrás el siguiente error:
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
Para usar las funciones de agregación, convierta el campo al tipo adecuado mediante la función CAST o la sintaxis :::
Query
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
Result
┌─shipping_cost─┐
         19.99
└───────────────┘
La conversión del tipo Dynamic al tipo de dato subyacente (determinado por dynamicType) ofrece un rendimiento muy alto, ya que ClickHouse ya almacena internamente el valor en ese tipo subyacente.

Aplanado de JSON

Vista normal

Puede crear vistas normales sobre la tabla JSON para encapsular la lógica de aplanado, conversión de tipos y transformación, y así consultar los datos como si se tratara de una tabla relacional. Las vistas normales son ligeras, ya que solo almacenan la consulta en sí, no los datos subyacentes. Por ejemplo:
CREATE VIEW v1 AS
SELECT
    CAST(doc._id, 'String') AS object_id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Esta vista tendrá el siguiente esquema:
┌─name────────────┬─type───────────┐
 object_id String
 order_id String
 customer_id Int64
 status String
 total_amount Decimal(18, 2) 
 order_date DateTime64(3)  
 shipping_info JSON
 items Dynamic
└─────────────────┴────────────────┘
Ahora puedes consultar la vista de forma similar a una tabla aplanada:
SELECT
    customer_id,
    sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

Vista materializada actualizable

Puede crear vistas materializadas actualizables, que permiten programar la ejecución de consultas para deduplicar filas y almacenar los resultados en una tabla de destino aplanada. Con cada actualización programada, la tabla de destino se sustituye por los resultados más recientes de la consulta. La principal ventaja de este método es que la consulta que usa la palabra clave FINAL se ejecuta solo una vez durante la actualización, lo que elimina la necesidad de usar FINAL en las consultas posteriores sobre la tabla de destino. Una desventaja es que los datos de la tabla de destino solo están actualizados hasta la última actualización. Para muchos casos de uso, intervalos de actualización de varios minutos a unas pocas horas ofrecen un buen equilibrio entre la frescura de los datos y el rendimiento de las consultas.
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Ahora puedes consultar la tabla flattened_t1 directamente sin el modificador FINAL:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;

Vista materializada incremental

Si desea acceder a columnas aplanadas en tiempo real, puede crear vistas materializadas incrementales. Si su tabla tiene actualizaciones frecuentes, no se recomienda usar el modificador FINAL en su vista materializada, ya que cada actualización desencadenará una fusión. En su lugar, puede deduplicar los datos en tiempo de consulta creando una vista normal sobre la vista materializada.
CREATE TABLE flattened_t1 (
    `_id` String,
    `order_id` String,
    `customer_id` Int64,
    `status` String,
    `total_amount` Decimal(18, 2),
    `order_date` DateTime64(3),
    `shipping_info` JSON,
    `items` Dynamic,
    `_peerdb_version` Int64,
    `_peerdb_synced_at` DateTime64(9),
    `_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT 
    CAST(doc._id, 'String') AS _id,
    CAST(doc.order_id, 'String') AS order_id,
    CAST(doc.customer_id, 'Int64') AS customer_id,
    CAST(doc.status, 'String') AS status,
    CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
    CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
    doc.^shipping AS shipping_info,
    doc.items,
    _peerdb_version,
    _peerdb_synced_at,   
    _peerdb_is_deleted
FROM t1;

CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
Ahora puedes consultar la vista flattened_t1_final de la siguiente manera:
SELECT
    customer_id,
    sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
Última modificación el 10 de junio de 2026