Saltar al contenido principal
Si usas ClickHouse Cloud, te recomendamos usar ClickPipes en su lugar. ClickPipes ofrece compatibilidad nativa con conexiones de red privadas, el escalado independiente de la ingestión y de los recursos del clúster, y una monitorización integral para la ingestión en streaming de datos de Kafka en ClickHouse.
  • Publicar datos o suscribirse a flujos de datos.
  • Organizar un almacenamiento tolerante a fallos.
  • Procesar flujos a medida que estén disponibles.

Crear una tabla

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_security_protocol = '',]
    [kafka_sasl_mechanism = '',]
    [kafka_sasl_username = '',]
    [kafka_sasl_password = '',]
    [kafka_autodetect_client_rack = '',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_consumer_reschedule_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_consumer_acquire_timeout_ms = 30000,]
    [kafka_max_rows_per_message = 1,]
    [kafka_compression_codec = '',]
    [kafka_compression_level = -1];
Parámetros obligatorios:
  • kafka_broker_list — Una lista de brókeres separada por comas (por ejemplo, localhost:9092).
  • kafka_topic_list — Una lista de topics de Kafka.
  • kafka_group_name — Un grupo de consumidores de Kafka. Los desplazamientos de lectura se registran por separado para cada grupo. Si no desea que los mensajes se dupliquen en el clúster, use el mismo nombre de grupo en todas partes.
  • kafka_format — Formato del mensaje. Usa la misma notación que la función SQL FORMAT, como JSONEachRow. Para obtener más información, consulte la sección Formatos.
Parámetros opcionales:
  • kafka_security_protocol - Protocolo utilizado para comunicarse con los brókeres. Valores posibles: plaintext, ssl, sasl_plaintext, sasl_ssl.
  • kafka_sasl_mechanism - Mecanismo SASL que se utilizará para la autenticación. Valores posibles: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER.
  • kafka_sasl_username - nombre de usuario de SASL que se usa con los mecanismos PLAIN y SASL-SCRAM-...
  • kafka_sasl_password - contraseña de SASL para usar con los mecanismos PLAIN y SASL-SCRAM-...
  • kafka_schema — Parámetro que debe usarse si el format requiere una definición de esquema. Por ejemplo, Cap’n Proto requiere la ruta del archivo de esquema y el nombre del objeto raíz schema.capnp:Message.
  • kafka_schema_registry_skip_bytes — El número de bytes que se omiten desde el inicio de cada mensaje al usar un registro de esquemas con cabeceras de envoltura (p. ej., AWS Glue Schema Registry, que incluye una envoltura de 19 bytes). Rango: [0, 255]. Valor predeterminado: 0.
  • kafka_num_consumers — El número de consumidores por tabla. Especifique más consumidores si la capacidad de procesamiento de un consumidor no es suficiente. El número total de consumidores no debe superar el número de particiones del topic, ya que solo se puede asignar un consumidor por partición, y no debe ser mayor que el número de núcleos físicos del servidor donde está desplegado ClickHouse. Valor predeterminado: 1.
  • kafka_max_block_size — El tamaño máximo del lote (en mensajes) para la operación de poll. Predeterminado: max_insert_block_size.
  • kafka_skip_broken_messages — tolerancia del parser de mensajes de Kafka frente a mensajes incompatibles con el esquema por bloque. Si kafka_skip_broken_messages = N, el motor omite N mensajes de Kafka que no se pueden parsear (un mensaje equivale a una fila de datos). Valor predeterminado: 0.
  • kafka_commit_every_batch — Confirma cada lote consumido y procesado, en lugar de realizar una única confirmación después de escribir un bloque completo. Valor predeterminado: 0.
  • kafka_client_id — Identificador de cliente. Vacío de forma predeterminada.
  • kafka_poll_timeout_ms — Tiempo de espera para una única operación de sondeo de Kafka. Predeterminado: stream_poll_timeout_ms.
  • kafka_poll_max_batch_size — Cantidad máxima de mensajes que se pueden obtener en una sola operación de sondeo de Kafka. Predeterminado: max_block_size.
  • kafka_flush_interval_ms — Tiempo de espera para el volcado de datos desde Kafka. Predeterminado: stream_flush_interval_ms.
  • kafka_consumer_reschedule_ms — Intervalo de reprogramación cuando el procesamiento de flujo de Kafka se detiene (por ejemplo, cuando no hay mensajes disponibles para consumir). Esta configuración controla el retraso antes de que el consumidor reintente el sondeo. No debe superar kafka_consumers_pool_ttl_ms. Predeterminado: 500 milisegundos.
  • kafka_thread_per_consumer — Proporciona un hilo independiente para cada consumidor. Cuando está habilitado, cada consumidor vacía el búfer de datos de forma independiente, en paralelo (de lo contrario, las filas de varios consumidores se agrupan para formar un bloque). Valor predeterminado: 0.
  • kafka_handle_error_mode — Cómo manejar los errores del motor Kafka. Valores posibles: default (se lanzará una excepción si no se puede analizar un mensaje), stream (el mensaje de excepción y el mensaje sin procesar se guardarán en las columnas virtuales _error y _raw_message), dead_letter_queue (los datos relacionados con el error se guardarán en system.dead_letter_queue).
  • kafka_commit_on_select — Hace commit de los mensajes cuando se realiza una consulta select. Predeterminado: false.
  • kafka_consumer_acquire_timeout_ms — Tiempo de espera en milisegundos para obtener un consumidor de Kafka durante consultas directas SELECT en una tabla Kafka2 (con almacenamiento de offsets basado en Keeper). Cuando se ejecutan varias consultas directas SELECT concurrentes sobre la misma tabla, cada una debe esperar a que haya consumidores disponibles. Este tiempo de espera evita interbloqueos cuando las consultas mantienen asignados distintos subconjuntos de consumidores. Valor predeterminado: 30000.
  • kafka_max_rows_per_message — El número máximo de filas escritas en un mensaje de Kafka para los formatos basados en filas. Valor predeterminado: 1.
  • kafka_autodetect_client_rack — Establece automáticamente el parámetro client.rack de librdkafka para dar preferencia a las réplicas de Kafka más cercanas. Fuentes compatibles: AWS_ZONE_ID para el ID de zona de disponibilidad de AWS IMDSv2, por ejemplo euc1-az1; AWS_ZONE_NAME para el nombre de la zona de disponibilidad de AWS IMDSv2, por ejemplo eu-central-1a; GCP_ZONE para la zona del servicio de metadatos de GCP, por ejemplo europe-central2-a; CLICKHOUSE para usar la detección interna de ClickHouse, que puede basarse en metadatos de la nube o en la configuración; AWS_ZONE_NAME_THEN_GCP_ZONE para probar AWS_ZONE_NAME y luego GCP_ZONE. Valor predeterminado: cadena vacía; deshabilitado. Consejo: los distintos entornos usan formatos diferentes para las zonas de disponibilidad. Amazon MSK suele usar ID de zona, así que prefiera AWS_ZONE_ID. Confluent Cloud suele usar nombres de zona, así que prefiera AWS_ZONE_NAME. Si no está seguro, use AWS_ZONE_NAME_THEN_GCP_ZONE o compruebe el valor de broker.rack en su clúster. Nota: los brókeres de Kafka deben configurarse con broker.rack y replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.
  • kafka_compression_codec — Códec de compresión usado para producir mensajes. Se admiten: cadena vacía, none, gzip, snappy, lz4, zstd. Si se deja una cadena vacía, la tabla no establece el códec de compresión, por lo que se usarán los valores de los archivos de configuración o el valor predeterminado de librdkafka. Predeterminado: cadena vacía.
  • kafka_compression_level — Parámetro del nivel de compresión para el algoritmo seleccionado por kafka_compression_codec. Los valores más altos producen una mejor compresión, a costa de un mayor uso de CPU. El intervalo utilizable depende del algoritmo: [0-9] para gzip; [0-12] para lz4; solo 0 para snappy; [0-12] para zstd; -1 = nivel de compresión predeterminado dependiente del códec. Predeterminado: -1.
  • kafka_map_virtual_columns_on_write — Si está habilitado, las columnas con nombres especiales _key, _timestamp, _headers.name y _headers.value del esquema de la tabla se asocian con los metadatos correspondientes del mensaje de Kafka en INSERT y se excluyen del payload del mensaje. Consulte Correspondencia entre columnas y metadatos de mensajes de Kafka. Valor predeterminado: false.
Ejemplos:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  SELECT * FROM queue LIMIT 5;

  CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

  CREATE TABLE queue3 (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
              SETTINGS kafka_format = 'JSONEachRow',
                       kafka_num_consumers = 4;
El motor de tabla de Kafka no admite columnas con valor por defecto. Si necesita columnas con valor por defecto, puede agregarlas en la vista materializada (consulte más abajo).

Descripción

Los mensajes entregados se rastrean automáticamente, por lo que cada mensaje de un grupo solo se cuenta una vez. Si desea obtener los datos dos veces, cree una copia de la tabla con otro nombre de grupo. Los grupos son flexibles y se sincronizan en el clúster. Por ejemplo, si tiene 10 topics y 5 copias de una tabla en un clúster, cada copia obtiene 2 topics. Si cambia el número de copias, los topics se redistribuyen automáticamente entre ellas. Lea más sobre esto en http://kafka.apache.org/intro. Se recomienda que cada topic de Kafka tenga su propio grupo de consumidores dedicado, lo que garantiza una relación exclusiva entre el topic y el grupo, especialmente en entornos donde los topics pueden crearse y eliminarse dinámicamente (p. ej., en pruebas o staging). SELECT no es particularmente útil para leer mensajes (excepto para depuración), porque cada mensaje solo puede leerse una vez. Es más práctico crear flujos en tiempo real usando vistas materializadas. Para ello:
  1. Use el motor para crear un consumidor de Kafka y considérelo un flujo de datos.
  2. Cree una tabla con la estructura deseada.
  3. Cree una vista materializada que transforme los datos del motor y los inserte en una tabla creada previamente.
Cuando la MATERIALIZED VIEW se une al motor, comienza a recopilar datos en segundo plano. Esto le permite recibir continuamente mensajes de Kafka y convertirlos al formato requerido mediante SELECT. Una tabla de Kafka puede tener tantas vistas materializadas como desee; no leen datos directamente de la tabla de Kafka, sino que reciben nuevos registros (en bloques). De este modo, puede escribir en varias tablas con distintos niveles de detalle (con agrupación/agregación y sin ella). Ejemplo:
  CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

  CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
  ) ENGINE = SummingMergeTree(day, (day, level), 8192);

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
    FROM queue GROUP BY day, level;

  SELECT level, sum(total) FROM daily GROUP BY level;
Para mejorar el rendimiento, los mensajes recibidos se agrupan en bloques de tamaño max_insert_block_size. Si el bloque no se forma dentro de stream_flush_interval_ms milisegundos, los datos se escribirán en la tabla independientemente de si el bloque está completo o no. Para dejar de recibir datos del topic o cambiar la lógica de conversión, desvincule la vista materializada:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
Si desea cambiar la tabla de destino mediante ALTER, le recomendamos deshabilitar la vista materializada para evitar discrepancias entre la tabla de destino y los datos de la vista.

Configuración

Al igual que GraphiteMergeTree, el motor Kafka admite una configuración ampliada mediante el archivo de configuración de ClickHouse. Hay dos claves de configuración que puede usar: global (debajo de <kafka>) y a nivel de topic (debajo de <kafka><kafka_topic>). La configuración global se aplica primero y, a continuación, se aplica la configuración a nivel de topic (si existe).
  <kafka>
    <!-- Opciones de configuración globales para todas las tablas de tipo motor Kafka -->
    <debug>cgrp</debug>
    <statistics_interval_ms>3000</statistics_interval_ms>

    <kafka_topic>
        <name>logs</name>
        <statistics_interval_ms>4000</statistics_interval_ms>
    </kafka_topic>

    <!-- Configuración para el consumidor -->
    <consumer>
        <auto_offset_reset>smallest</auto_offset_reset>
        <kafka_topic>
            <name>logs</name>
            <fetch_min_bytes>100000</fetch_min_bytes>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <fetch_min_bytes>50000</fetch_min_bytes>
        </kafka_topic>
    </consumer>

    <!-- Configuración para el productor -->
    <producer>
        <kafka_topic>
            <name>logs</name>
            <retry_backoff_ms>250</retry_backoff_ms>
        </kafka_topic>

        <kafka_topic>
            <name>stats</name>
            <retry_backoff_ms>400</retry_backoff_ms>
        </kafka_topic>
    </producer>
  </kafka>
Para obtener una lista de las posibles opciones de configuración, consulte la referencia de configuración de librdkafka. Use el guion bajo (_) en lugar del punto en la configuración de ClickHouse. Por ejemplo, check.crcs=true se convierte en <check_crcs>true</check_crcs>.

Soporte para Kerberos

Para trabajar con Kafka con compatibilidad con Kerberos, agregue el elemento secundario security_protocol con el valor sasl_plaintext. Basta con que el ticket de concesión de tickets de Kerberos se obtenga y el sistema operativo lo almacene en caché. ClickHouse puede mantener las credenciales de Kerberos mediante un archivo keytab. Considere los elementos secundarios sasl_kerberos_service_name, sasl_kerberos_keytab y sasl_kerberos_principal. Ejemplo:
<!-- Kafka con soporte para Kerberos -->
<kafka>
  <security_protocol>SASL_PLAINTEXT</security_protocol>
  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>

Columnas virtuales

  • _topic — Topic de Kafka. Tipo de dato: LowCardinality(String).
  • _key — Clave del mensaje. Tipo de dato: String.
  • _offset — Offset del mensaje. Tipo de dato: UInt64.
  • _timestamp — Marca temporal del mensaje. Tipo de dato: Nullable(DateTime).
  • _timestamp_ms — Marca temporal del mensaje en milisegundos. Tipo de dato: Nullable(DateTime64(3)).
  • _partition — Partición del topic de Kafka. Tipo de dato: UInt64.
  • _headers.name — Array de claves de los encabezados del mensaje. Tipo de dato: Array(String).
  • _headers.value — Array de valores de los encabezados del mensaje. Tipo de dato: Array(String).
Columnas virtuales adicionales cuando kafka_handle_error_mode='stream':
  • _raw_message - Mensaje sin procesar que no pudo analizarse correctamente. Tipo de dato: String.
  • _error - Mensaje de excepción generado durante un error de análisis. Tipo de dato: String.
Nota: las columnas virtuales _raw_message y _error se rellenan solo si se produce una excepción durante el análisis; siempre están vacías cuando el mensaje se analiza correctamente.

Correspondencia de columnas con los metadatos de mensajes de Kafka

Al producir mensajes con INSERT INTO, el motor Kafka siempre usa una columna llamada _key (de tipo String) como clave del mensaje de Kafka y una columna llamada _timestamp (de tipo DateTime) como marca temporal del mensaje de Kafka, si esas columnas existen en la tabla. De forma predeterminada, estas columnas también aparecen en el payload del mensaje junto con las demás columnas. Con kafka_map_virtual_columns_on_write = 1, el comportamiento cambia:
  • _key (tipo String) — se asigna a la clave del mensaje de Kafka.
  • _timestamp (tipo DateTime) — se asigna a la marca temporal del mensaje de Kafka.
  • _headers.name (tipo Array(String)) y _headers.value (tipo Array(String)) — se asignan a los encabezados del mensaje de Kafka. Cada par (_headers.name[i], _headers.value[i]) se convierte en un encabezado de Kafka. Como _headers.name y _headers.value comparten el prefijo Nested _headers, ClickHouse requiere que ambos arrays tengan el mismo tamaño en cada fila.
Las columnas con estos nombres se excluyen del payload del mensaje solo si sus tipos coinciden con los indicados anteriormente; de lo contrario, permanecen en el payload, por lo que los esquemas que reutilizan esos nombres para datos no relacionados siguen funcionando. Ejemplo:
CREATE TABLE kafka_out
(
    event_json String,
    `_key` String,
    `_timestamp` DateTime,
    `_headers.name` Array(String),
    `_headers.value` Array(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'events-producer',
    kafka_format = 'JSONEachRow',
    kafka_map_virtual_columns_on_write = 1;

INSERT INTO kafka_out VALUES
    ('{"a":1}', 'session-42', now(), ['source', 'trace_id'], ['api', 'abc-123']);
El mensaje de Kafka generado contiene el payload {"event_json":"{\"a\":1}"}, la clave session-42, la marca temporal actual y dos encabezados source=api y trace_id=abc-123.

Compatibilidad con formatos de datos

El motor Kafka admite todos los formatos compatibles con ClickHouse. El número de filas en un mensaje de Kafka depende de si el formato está basado en filas o en bloques:
  • En los formatos basados en filas, el número de filas en un mensaje de Kafka puede controlarse mediante la opción kafka_max_rows_per_message.
  • En los formatos basados en bloques, no se puede dividir un bloque en partes más pequeñas, pero el número de filas de un bloque puede controlarse mediante la opción general max_block_size.

Motor para almacenar offsets confirmados en ClickHouse Keeper

Si allow_experimental_kafka_offsets_storage_in_keeper está habilitado, se pueden especificar dos ajustes adicionales para el motor de tabla de Kafka:
  • kafka_keeper_path especifica la ruta de la tabla en ClickHouse Keeper
  • kafka_replica_name especifica el nombre de la réplica en ClickHouse Keeper
Deben especificarse ambos ajustes o ninguno de los dos. Cuando se especifican ambos, se utilizará un nuevo motor Kafka experimental. El nuevo motor no depende de almacenar los offsets confirmados en Kafka, sino que los almacena en ClickHouse Keeper. Sigue intentando confirmar los offsets en Kafka, pero solo depende de ellos cuando se crea la tabla. En cualquier otro caso (si la tabla se reinicia o se recupera después de algún error), se usarán los offsets almacenados en ClickHouse Keeper para seguir consumiendo mensajes. Además del offset confirmado, también almacena cuántos mensajes se consumieron en el último lote, de modo que, si la inserción falla, se volverá a consumir la misma cantidad de mensajes, lo que permite la deduplicación si es necesario. Ejemplo:
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
  kafka_keeper_path = '/clickhouse/{database}/{uuid}',
  kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

Limitaciones conocidas

Como el nuevo motor es experimental, todavía no está listo para producción. La implementación presenta algunas limitaciones conocidas:
  • Eliminar y volver a crear rápidamente la tabla, o especificar la misma ruta de ClickHouse Keeper para distintos motores, puede causar problemas. Como práctica recomendada, puede usar {uuid} en kafka_keeper_path para evitar conflictos entre rutas.
  • Para garantizar lecturas repetibles, los mensajes no pueden consumirse desde varias particiones en un solo hilo. Por otro lado, es necesario hacer poll a los consumidores de Kafka con regularidad para mantenerlos activos. Como resultado de estos dos requisitos, decidimos permitir la creación de varios consumidores solo si kafka_thread_per_consumer está habilitado; de lo contrario, resulta demasiado complicado evitar problemas relacionados con hacer poll a los consumidores regularmente.
Véase también
Última modificación el 10 de junio de 2026