- Publicar datos o suscribirse a flujos de datos.
- Organizar un almacenamiento tolerante a fallos.
- Procesar flujos a medida que estén disponibles.
Crear una tabla
kafka_broker_list— Una lista de brókeres separada por comas (por ejemplo,localhost:9092).kafka_topic_list— Una lista de topics de Kafka.kafka_group_name— Un grupo de consumidores de Kafka. Los desplazamientos de lectura se registran por separado para cada grupo. Si no desea que los mensajes se dupliquen en el clúster, use el mismo nombre de grupo en todas partes.kafka_format— Formato del mensaje. Usa la misma notación que la función SQLFORMAT, comoJSONEachRow. Para obtener más información, consulte la sección Formatos.
kafka_security_protocol- Protocolo utilizado para comunicarse con los brókeres. Valores posibles:plaintext,ssl,sasl_plaintext,sasl_ssl.kafka_sasl_mechanism- Mecanismo SASL que se utilizará para la autenticación. Valores posibles:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username- nombre de usuario de SASL que se usa con los mecanismosPLAINySASL-SCRAM-...kafka_sasl_password- contraseña de SASL para usar con los mecanismosPLAINySASL-SCRAM-...kafka_schema— Parámetro que debe usarse si el format requiere una definición de esquema. Por ejemplo, Cap’n Proto requiere la ruta del archivo de esquema y el nombre del objeto raízschema.capnp:Message.kafka_schema_registry_skip_bytes— El número de bytes que se omiten desde el inicio de cada mensaje al usar un registro de esquemas con cabeceras de envoltura (p. ej., AWS Glue Schema Registry, que incluye una envoltura de 19 bytes). Rango:[0, 255]. Valor predeterminado:0.kafka_num_consumers— El número de consumidores por tabla. Especifique más consumidores si la capacidad de procesamiento de un consumidor no es suficiente. El número total de consumidores no debe superar el número de particiones del topic, ya que solo se puede asignar un consumidor por partición, y no debe ser mayor que el número de núcleos físicos del servidor donde está desplegado ClickHouse. Valor predeterminado:1.kafka_max_block_size— El tamaño máximo del lote (en mensajes) para la operación de poll. Predeterminado: max_insert_block_size.kafka_skip_broken_messages— tolerancia del parser de mensajes de Kafka frente a mensajes incompatibles con el esquema por bloque. Sikafka_skip_broken_messages = N, el motor omite N mensajes de Kafka que no se pueden parsear (un mensaje equivale a una fila de datos). Valor predeterminado:0.kafka_commit_every_batch— Confirma cada lote consumido y procesado, en lugar de realizar una única confirmación después de escribir un bloque completo. Valor predeterminado:0.kafka_client_id— Identificador de cliente. Vacío de forma predeterminada.kafka_poll_timeout_ms— Tiempo de espera para una única operación de sondeo de Kafka. Predeterminado: stream_poll_timeout_ms.kafka_poll_max_batch_size— Cantidad máxima de mensajes que se pueden obtener en una sola operación de sondeo de Kafka. Predeterminado: max_block_size.kafka_flush_interval_ms— Tiempo de espera para el volcado de datos desde Kafka. Predeterminado: stream_flush_interval_ms.kafka_consumer_reschedule_ms— Intervalo de reprogramación cuando el procesamiento de flujo de Kafka se detiene (por ejemplo, cuando no hay mensajes disponibles para consumir). Esta configuración controla el retraso antes de que el consumidor reintente el sondeo. No debe superarkafka_consumers_pool_ttl_ms. Predeterminado:500milisegundos.kafka_thread_per_consumer— Proporciona un hilo independiente para cada consumidor. Cuando está habilitado, cada consumidor vacía el búfer de datos de forma independiente, en paralelo (de lo contrario, las filas de varios consumidores se agrupan para formar un bloque). Valor predeterminado:0.kafka_handle_error_mode— Cómo manejar los errores del motor Kafka. Valores posibles: default (se lanzará una excepción si no se puede analizar un mensaje), stream (el mensaje de excepción y el mensaje sin procesar se guardarán en las columnas virtuales_errory_raw_message), dead_letter_queue (los datos relacionados con el error se guardarán en system.dead_letter_queue).kafka_commit_on_select— Hace commit de los mensajes cuando se realiza una consultaselect. Predeterminado:false.kafka_consumer_acquire_timeout_ms— Tiempo de espera en milisegundos para obtener un consumidor de Kafka durante consultas directasSELECTen una tablaKafka2(con almacenamiento de offsets basado en Keeper). Cuando se ejecutan varias consultas directasSELECTconcurrentes sobre la misma tabla, cada una debe esperar a que haya consumidores disponibles. Este tiempo de espera evita interbloqueos cuando las consultas mantienen asignados distintos subconjuntos de consumidores. Valor predeterminado:30000.kafka_max_rows_per_message— El número máximo de filas escritas en un mensaje de Kafka para los formatos basados en filas. Valor predeterminado:1.kafka_autodetect_client_rack— Establece automáticamente el parámetroclient.rackdelibrdkafkapara dar preferencia a las réplicas de Kafka más cercanas. Fuentes compatibles:AWS_ZONE_IDpara el ID de zona de disponibilidad de AWS IMDSv2, por ejemploeuc1-az1;AWS_ZONE_NAMEpara el nombre de la zona de disponibilidad de AWS IMDSv2, por ejemploeu-central-1a;GCP_ZONEpara la zona del servicio de metadatos de GCP, por ejemploeurope-central2-a;CLICKHOUSEpara usar la detección interna de ClickHouse, que puede basarse en metadatos de la nube o en la configuración;AWS_ZONE_NAME_THEN_GCP_ZONEpara probarAWS_ZONE_NAMEy luegoGCP_ZONE. Valor predeterminado: cadena vacía; deshabilitado. Consejo: los distintos entornos usan formatos diferentes para las zonas de disponibilidad. Amazon MSK suele usar ID de zona, así que prefieraAWS_ZONE_ID. Confluent Cloud suele usar nombres de zona, así que prefieraAWS_ZONE_NAME. Si no está seguro, useAWS_ZONE_NAME_THEN_GCP_ZONEo compruebe el valor debroker.racken su clúster. Nota: los brókeres de Kafka deben configurarse conbroker.rackyreplica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.kafka_compression_codec— Códec de compresión usado para producir mensajes. Se admiten: cadena vacía,none,gzip,snappy,lz4,zstd. Si se deja una cadena vacía, la tabla no establece el códec de compresión, por lo que se usarán los valores de los archivos de configuración o el valor predeterminado delibrdkafka. Predeterminado: cadena vacía.kafka_compression_level— Parámetro del nivel de compresión para el algoritmo seleccionado por kafka_compression_codec. Los valores más altos producen una mejor compresión, a costa de un mayor uso de CPU. El intervalo utilizable depende del algoritmo:[0-9]paragzip;[0-12]paralz4; solo0parasnappy;[0-12]parazstd;-1= nivel de compresión predeterminado dependiente del códec. Predeterminado:-1.kafka_map_virtual_columns_on_write— Si está habilitado, las columnas con nombres especiales_key,_timestamp,_headers.namey_headers.valuedel esquema de la tabla se asocian con los metadatos correspondientes del mensaje de Kafka enINSERTy se excluyen del payload del mensaje. Consulte Correspondencia entre columnas y metadatos de mensajes de Kafka. Valor predeterminado:false.
El motor de tabla de Kafka no admite columnas con valor por defecto. Si necesita columnas con valor por defecto, puede agregarlas en la vista materializada (consulte más abajo).
Descripción
SELECT no es particularmente útil para leer mensajes (excepto para depuración), porque cada mensaje solo puede leerse una vez. Es más práctico crear flujos en tiempo real usando vistas materializadas. Para ello:
- Use el motor para crear un consumidor de Kafka y considérelo un flujo de datos.
- Cree una tabla con la estructura deseada.
- Cree una vista materializada que transforme los datos del motor y los inserte en una tabla creada previamente.
MATERIALIZED VIEW se une al motor, comienza a recopilar datos en segundo plano. Esto le permite recibir continuamente mensajes de Kafka y convertirlos al formato requerido mediante SELECT.
Una tabla de Kafka puede tener tantas vistas materializadas como desee; no leen datos directamente de la tabla de Kafka, sino que reciben nuevos registros (en bloques). De este modo, puede escribir en varias tablas con distintos niveles de detalle (con agrupación/agregación y sin ella).
Ejemplo:
ALTER, le recomendamos deshabilitar la vista materializada para evitar discrepancias entre la tabla de destino y los datos de la vista.
Configuración
<kafka>) y a nivel de topic (debajo de <kafka><kafka_topic>). La configuración global se aplica primero y, a continuación, se aplica la configuración a nivel de topic (si existe).
_) en lugar del punto en la configuración de ClickHouse. Por ejemplo, check.crcs=true se convierte en <check_crcs>true</check_crcs>.
Soporte para Kerberos
security_protocol con el valor sasl_plaintext. Basta con que el ticket de concesión de tickets de Kerberos se obtenga y el sistema operativo lo almacene en caché.
ClickHouse puede mantener las credenciales de Kerberos mediante un archivo keytab. Considere los elementos secundarios sasl_kerberos_service_name, sasl_kerberos_keytab y sasl_kerberos_principal.
Ejemplo:
Columnas virtuales
_topic— Topic de Kafka. Tipo de dato:LowCardinality(String)._key— Clave del mensaje. Tipo de dato:String._offset— Offset del mensaje. Tipo de dato:UInt64._timestamp— Marca temporal del mensaje. Tipo de dato:Nullable(DateTime)._timestamp_ms— Marca temporal del mensaje en milisegundos. Tipo de dato:Nullable(DateTime64(3))._partition— Partición del topic de Kafka. Tipo de dato:UInt64._headers.name— Array de claves de los encabezados del mensaje. Tipo de dato:Array(String)._headers.value— Array de valores de los encabezados del mensaje. Tipo de dato:Array(String).
kafka_handle_error_mode='stream':
_raw_message- Mensaje sin procesar que no pudo analizarse correctamente. Tipo de dato:String._error- Mensaje de excepción generado durante un error de análisis. Tipo de dato:String.
_raw_message y _error se rellenan solo si se produce una excepción durante el análisis; siempre están vacías cuando el mensaje se analiza correctamente.
Correspondencia de columnas con los metadatos de mensajes de Kafka
INSERT INTO, el motor Kafka siempre usa una columna llamada _key (de tipo String) como clave del mensaje de Kafka y una columna llamada _timestamp (de tipo DateTime) como marca temporal del mensaje de Kafka, si esas columnas existen en la tabla. De forma predeterminada, estas columnas también aparecen en el payload del mensaje junto con las demás columnas.
Con kafka_map_virtual_columns_on_write = 1, el comportamiento cambia:
_key(tipoString) — se asigna a la clave del mensaje de Kafka._timestamp(tipoDateTime) — se asigna a la marca temporal del mensaje de Kafka._headers.name(tipoArray(String)) y_headers.value(tipoArray(String)) — se asignan a los encabezados del mensaje de Kafka. Cada par(_headers.name[i], _headers.value[i])se convierte en un encabezado de Kafka. Como_headers.namey_headers.valuecomparten el prefijo Nested_headers, ClickHouse requiere que ambos arrays tengan el mismo tamaño en cada fila.
{"event_json":"{\"a\":1}"}, la clave session-42, la marca temporal actual y dos encabezados source=api y trace_id=abc-123.
Compatibilidad con formatos de datos
- En los formatos basados en filas, el número de filas en un mensaje de Kafka puede controlarse mediante la opción
kafka_max_rows_per_message. - En los formatos basados en bloques, no se puede dividir un bloque en partes más pequeñas, pero el número de filas de un bloque puede controlarse mediante la opción general max_block_size.
Motor para almacenar offsets confirmados en ClickHouse Keeper
allow_experimental_kafka_offsets_storage_in_keeper está habilitado, se pueden especificar dos ajustes adicionales para el motor de tabla de Kafka:
kafka_keeper_pathespecifica la ruta de la tabla en ClickHouse Keeperkafka_replica_nameespecifica el nombre de la réplica en ClickHouse Keeper
Limitaciones conocidas
- Eliminar y volver a crear rápidamente la tabla, o especificar la misma ruta de ClickHouse Keeper para distintos motores, puede causar problemas. Como práctica recomendada, puede usar
{uuid}enkafka_keeper_pathpara evitar conflictos entre rutas. - Para garantizar lecturas repetibles, los mensajes no pueden consumirse desde varias particiones en un solo hilo. Por otro lado, es necesario hacer poll a los consumidores de Kafka con regularidad para mantenerlos activos. Como resultado de estos dos requisitos, decidimos permitir la creación de varios consumidores solo si
kafka_thread_per_consumerestá habilitado; de lo contrario, resulta demasiado complicado evitar problemas relacionados con hacer poll a los consumidores regularmente.