Esta guía presenta patrones comunes para trabajar con datos JSON replicados de MongoDB a ClickHouse mediante ClickPipes.
Supongamos que creamos una colección t1 en MongoDB para hacer seguimiento de los pedidos de los clientes:
db.t1.insertOne({
"order_id": "ORD-001234",
"customer_id": 98765,
"status": "completed",
"total_amount": 299.97,
"order_date": new Date(),
"shipping": {
"method": "express",
"city": "Seattle",
"cost": 19.99
},
"items": [
{
"category": "electronics",
"price": 149.99
},
{
"category": "accessories",
"price": 24.99
}
]
})
El conector CDC de MongoDB replica documentos de MongoDB en ClickHouse mediante el tipo de datos JSON nativo. La tabla replicada t1 de ClickHouse contendrá la siguiente fila:
Fila 1:
──────
_id: "68a4df4b9fe6c73b541703b0"
doc: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at: 2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version: 0
Las tablas replicadas usan este esquema estándar:
┌─name───────────────┬─type──────────┐
│ _id │ String │
│ doc │ JSON │
│ _peerdb_synced_at │ DateTime64(9) │
│ _peerdb_version │ Int64 │
│ _peerdb_is_deleted │ Int8 │
└────────────────────┴───────────────┘
_id: Clave primaria de MongoDB
doc: Documento de MongoDB replicado como tipo de datos JSON
_peerdb_synced_at: Registra cuándo se sincronizó por última vez la fila
_peerdb_version: Lleva un seguimiento de la versión de la fila; se incrementa cuando la fila se actualiza o se elimina
_peerdb_is_deleted: Indica si la fila está eliminada
Motor de tabla ReplacingMergeTree
ClickPipes mapea las colecciones de MongoDB en ClickHouse mediante la familia de motores de tabla ReplacingMergeTree. Con este motor, las actualizaciones se modelan como inserciones con una versión más reciente (_peerdb_version) del documento para una clave primaria determinada (_id), lo que permite gestionar de forma eficiente actualizaciones, reemplazos y eliminaciones como inserciones versionadas.
ReplacingMergeTree elimina los duplicados de forma asíncrona en segundo plano. Para garantizar la ausencia de duplicados en la misma fila, utilice el FINAL modificador. Por ejemplo:
Las eliminaciones de MongoDB se propagan como nuevas filas marcadas como eliminadas mediante la columna _peerdb_is_deleted. Normalmente, querrás excluirlas de tus consultas:
SELECT * FROM t1 FINAL WHERE _peerdb_is_deleted = 0;
También puede crear una política de nivel de fila para filtrar automáticamente las filas eliminadas, en lugar de especificar el filtro en cada consulta:
CREATE ROW POLICY policy_name ON t1
FOR SELECT USING _peerdb_is_deleted = 0;
Puede consultar directamente los campos JSON mediante la sintaxis de punto:
SELECT
doc.order_id,
doc.shipping.method
FROM t1;
┌-─doc.order_id─┬─doc.shipping.method─┐
│ ORD-001234 │ express │
└───────────────┴─────────────────────┘
Al consultar campos de objetos anidados con la sintaxis de punto, asegúrese de añadir el operador ^:
SELECT doc.^shipping as shipping_info FROM t1;
┌─shipping_info──────────────────────────────────────┐
│ {"city":"Seattle","cost":19.99,"method":"express"} │
└────────────────────────────────────────────────────┘
En ClickHouse, cada campo de JSON tiene el tipo Dynamic. El tipo Dynamic permite a ClickHouse almacenar valores de cualquier tipo sin conocerlo de antemano. Puede comprobarlo con la función toTypeName:
SELECT toTypeName(doc.customer_id) AS type FROM t1;
┌─type────┐
│ Dynamic │
└─────────┘
Para examinar los tipos de datos subyacentes de un campo, puedes consultarlos con la función dynamicType. Ten en cuenta que puede haber distintos tipos de datos para un mismo nombre de campo en diferentes filas:
SELECT dynamicType(doc.customer_id) AS type FROM t1;
┌─type──┐
│ Int64 │
└───────┘
Funciones regulares funcionan con el tipo Dynamic igual que con las columnas normales:
Ejemplo 1: Análisis de fechas
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
┌─order_date──────────┐
│ 2025-08-19 20:32:11 │
└─────────────────────┘
Ejemplo 2: Lógica condicional
SELECT multiIf(
doc.total_amount < 100, 'less_than_100',
doc.total_amount < 1000, 'less_than_1000',
'1000+') AS spendings
FROM t1;
┌─spendings──────┐
│ less_than_1000 │
└────────────────┘
Ejemplo 3: operaciones con Array
SELECT length(doc.items) AS item_count FROM t1;
┌─item_count─┐
│ 2 │
└────────────┘
Conversión de tipos en campos
Las funciones de agregación de ClickHouse no funcionan directamente con el tipo Dynamic. Por ejemplo, si intentas aplicar directamente la función sum a un tipo Dynamic, obtendrás el siguiente error:
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
Para usar las funciones de agregación, convierta el campo al tipo adecuado mediante la función CAST o la sintaxis :::
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
┌─shipping_cost─┐
│ 19.99 │
└───────────────┘
La conversión del tipo Dynamic al tipo de dato subyacente (determinado por dynamicType) ofrece un rendimiento muy alto, ya que ClickHouse ya almacena internamente el valor en ese tipo subyacente.
Puede crear vistas normales sobre la tabla JSON para encapsular la lógica de aplanado, conversión de tipos y transformación, y así consultar los datos como si se tratara de una tabla relacional. Las vistas normales son ligeras, ya que solo almacenan la consulta en sí, no los datos subyacentes. Por ejemplo:
CREATE VIEW v1 AS
SELECT
CAST(doc._id, 'String') AS object_id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Esta vista tendrá el siguiente esquema:
┌─name────────────┬─type───────────┐
│ object_id │ String │
│ order_id │ String │
│ customer_id │ Int64 │
│ status │ String │
│ total_amount │ Decimal(18, 2) │
│ order_date │ DateTime64(3) │
│ shipping_info │ JSON │
│ items │ Dynamic │
└─────────────────┴────────────────┘
Ahora puedes consultar la vista de forma similar a una tabla aplanada:
SELECT
customer_id,
sum(total_amount)
FROM v1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
Vista materializada actualizable
Puede crear vistas materializadas actualizables, que permiten programar la ejecución de consultas para deduplicar filas y almacenar los resultados en una tabla de destino aplanada. Con cada actualización programada, la tabla de destino se sustituye por los resultados más recientes de la consulta.
La principal ventaja de este método es que la consulta que usa la palabra clave FINAL se ejecuta solo una vez durante la actualización, lo que elimina la necesidad de usar FINAL en las consultas posteriores sobre la tabla de destino.
Una desventaja es que los datos de la tabla de destino solo están actualizados hasta la última actualización. Para muchos casos de uso, intervalos de actualización de varios minutos a unas pocas horas ofrecen un buen equilibrio entre la frescura de los datos y el rendimiento de las consultas.
CREATE TABLE flattened_t1 (
`_id` String,
`order_id` String,
`customer_id` Int64,
`status` String,
`total_amount` Decimal(18, 2),
`order_date` DateTime64(3),
`shipping_info` JSON,
`items` Dynamic
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;
CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
Ahora puedes consultar la tabla flattened_t1 directamente sin el modificador FINAL:
SELECT
customer_id,
sum(total_amount)
FROM flattened_t1
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
Vista materializada incremental
Si desea acceder a columnas aplanadas en tiempo real, puede crear vistas materializadas incrementales. Si su tabla tiene actualizaciones frecuentes, no se recomienda usar el modificador FINAL en su vista materializada, ya que cada actualización desencadenará una fusión. En su lugar, puede deduplicar los datos en tiempo de consulta creando una vista normal sobre la vista materializada.
CREATE TABLE flattened_t1 (
`_id` String,
`order_id` String,
`customer_id` Int64,
`status` String,
`total_amount` Decimal(18, 2),
`order_date` DateTime64(3),
`shipping_info` JSON,
`items` Dynamic,
`_peerdb_version` Int64,
`_peerdb_synced_at` DateTime64(9),
`_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;
CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items,
_peerdb_version,
_peerdb_synced_at,
_peerdb_is_deleted
FROM t1;
CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
Ahora puedes consultar la vista flattened_t1_final de la siguiente manera:
SELECT
customer_id,
sum(total_amount)
FROM flattened_t1_final
AND shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;