- 데이터 흐름을 게시하거나 구독합니다.
- 내결함성 스토리지를 구성합니다.
- 스트림이 준비되는 즉시 처리합니다.
테이블 생성하기
kafka_broker_list— 쉼표로 구분된 broker 목록입니다(예:localhost:9092).kafka_topic_list— Kafka topic 목록입니다.kafka_group_name— Kafka 컨슈머 그룹입니다. 읽기 오프셋은 각 그룹별로 개별 추적됩니다. 클러스터에서 메시지 중복을 원하지 않으면 모든 곳에서 동일한 그룹 이름을 사용하십시오.kafka_format— 메시지 포맷입니다.JSONEachRow와 같이 SQLFORMAT함수와 동일한 표기법을 사용합니다. 자세한 내용은 포맷 섹션을 참조하십시오.
kafka_security_protocol- 브로커와 통신하는 데 사용하는 프로토콜입니다. 가능한 값:plaintext,ssl,sasl_plaintext,sasl_ssl.kafka_sasl_mechanism- 인증에 사용할 SASL 메커니즘입니다. 가능한 값:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER.kafka_sasl_username-PLAIN및SASL-SCRAM-..메커니즘에서 사용할 SASL username입니다.kafka_sasl_password-PLAIN및SASL-SCRAM-..메커니즘에서 사용하는 SASL 비밀번호입니다.kafka_schema— 포맷에 스키마 정의가 필요한 경우 반드시 사용해야 하는 매개변수입니다. 예를 들어, Cap’n Proto에서는 스키마 파일의 경로와 루트schema.capnp:Message객체 이름을 지정해야 합니다.kafka_schema_registry_skip_bytes— 엔벌로프 헤더와 함께 스키마 레지스트리를 사용할 때(예: 19바이트 엔벌로프를 포함하는 AWS Glue Schema Registry) 각 메시지의 시작 부분에서 건너뛸 바이트 수입니다. 범위:[0, 255]. 기본값:0.kafka_num_consumers— 테이블당 consumer 수입니다. 하나의 consumer로 처리량이 충분하지 않으면 더 많은 consumer를 지정하십시오. 전체 consumer 수는 topic의 partition 수를 초과해서는 안 됩니다. 각 partition에는 consumer를 하나만 할당할 수 있기 때문입니다. 또한 ClickHouse가 배포된 server의 물리 코어 수보다 많아서는 안 됩니다. 기본값:1.kafka_max_block_size— 폴링 시 최대 배치 크기(메시지 수)입니다. 기본값: max_insert_block_size.kafka_skip_broken_messages— 블록(block)당 스키마(schema)와 호환되지 않는 메시지에 대해 Kafka 메시지 parser가 허용하는 범위입니다.kafka_skip_broken_messages = N이면 engine은 parse할 수 없는 Kafka 메시지 N개를 건너뜁니다(메시지 1개는 데이터 행 1개에 해당합니다). 기본값:0.kafka_commit_every_batch— 전체 block을 기록한 후 한 번만 commit하는 대신, 소비 및 처리한 각 batch마다 commit합니다. 기본값:0.kafka_client_id— 클라이언트 식별자입니다. 기본적으로 비어 있습니다.kafka_poll_timeout_ms— Kafka에서 단일 폴링에 대한 timeout입니다. 기본값: stream_poll_timeout_ms.kafka_poll_max_batch_size— 한 번의 Kafka 폴링으로 가져올 수 있는 최대 메시지 수입니다. 기본값: max_block_size.kafka_flush_interval_ms— Kafka에서 데이터를 플러시할 때의 시간 제한입니다. 기본값: stream_flush_interval_ms.kafka_consumer_reschedule_ms— Kafka 스트림 처리가 정체되었을 때(예: 소비할 메시지가 없을 때) 다시 스케줄링하는 인터벌입니다. 이 설정은 컨슈머가 폴링을 다시 시도하기 전의 지연 시간을 제어합니다.kafka_consumers_pool_ttl_ms를 초과해서는 안 됩니다. 기본값:500밀리초.kafka_thread_per_consumer— 각 consumer에 대해 독립적인 스레드를 제공합니다. 활성화되면 각 consumer가 데이터를 서로 독립적으로 병렬 플러시합니다(그렇지 않으면 여러 consumer의 행이 합쳐져 하나의 block을 이룹니다). 기본값:0.kafka_handle_error_mode— Kafka engine의 오류를 처리하는 방법입니다. 가능한 값: default(메시지 파싱에 실패하면 예외가 발생합니다), stream(예외 메시지와 원본 메시지가 가상 컬럼(virtual columns)_error및_raw_message에 저장됩니다), dead_letter_queue(오류 관련 데이터가 system.dead_letter_queue에 저장됩니다).kafka_commit_on_select— select 쿼리가 실행될 때 메시지를 커밋합니다. 기본값:false.kafka_consumer_acquire_timeout_ms—Kafka2테이블에서 Keeper 기반 오프셋 저장소를 사용할 때 직접SELECT쿼리 중 Kafka consumer를 확보하기까지 대기하는 제한 시간(밀리초)입니다. 동일한 테이블에서 여러 동시 직접SELECT쿼리가 실행되면 각 쿼리는 consumer를 사용할 수 있을 때까지 기다려야 합니다. 이 제한 시간은 쿼리가 서로 다른 consumer 부분 집합을 점유하고 있을 때 발생할 수 있는 교착 상태를 방지합니다. 기본값:30000.kafka_max_rows_per_message— 행 기반 포맷에서 하나의 Kafka 메시지에 기록할 수 있는 최대 행 수입니다. 기본값:1.kafka_autodetect_client_rack— 가장 가까운 Kafka 레플리카를 우선 사용하도록librdkafka의client.rack매개변수를 자동으로 설정합니다. 지원되는 소스: AWS IMDSv2 가용 영역 ID에는AWS_ZONE_ID를 사용합니다. 예:euc1-az1; AWS IMDSv2 가용 영역 이름에는AWS_ZONE_NAME을 사용합니다. 예:eu-central-1a; GCP 메타데이터 서비스 영역에는GCP_ZONE을 사용합니다. 예:europe-central2-a; Cloud 메타데이터 또는 구성을 기반으로 할 수 있는 ClickHouse 내부 감지를 사용하려면CLICKHOUSE를 사용합니다; 먼저AWS_ZONE_NAME을 시도하고 그다음GCP_ZONE을 시도하려면AWS_ZONE_NAME_THEN_GCP_ZONE을 사용합니다. 기본값: 빈 문자열이며 비활성화된 상태입니다. 팁: 환경에 따라 가용 영역 포맷이 다릅니다. Amazon MSK는 일반적으로 영역 ID를 사용하므로AWS_ZONE_ID를 우선 사용하는 것이 좋습니다. Confluent Cloud는 일반적으로 영역 이름을 사용하므로AWS_ZONE_NAME을 우선 사용하는 것이 좋습니다. 확실하지 않다면AWS_ZONE_NAME_THEN_GCP_ZONE을 사용하거나 클러스터의broker.rack값을 확인하십시오. 참고: Kafka broker는broker.rack및replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector가 구성되어 있어야 합니다.kafka_compression_codec— 메시지 생성 시 사용되는 압축 코덱입니다. 지원되는 값: 빈 문자열,none,gzip,snappy,lz4,zstd. 빈 문자열인 경우 이 압축 코덱은 테이블(table)에서 설정되지 않으므로, 구성 파일의 값 또는librdkafka의 기본값이 사용됩니다. 기본값: 빈 문자열.kafka_compression_level— kafka_compression_codec로 선택한 알고리즘의 압축 수준 매개변수입니다. 값이 클수록 CPU 사용량이 증가하는 대신 압축 효율이 더 좋아집니다. 사용할 수 있는 범위는 알고리즘에 따라 다릅니다:gzip은[0-9],lz4는[0-12],snappy는0만,zstd는[0-12],-1= 코덱별 기본 압축 수준. 기본값:-1.kafka_map_virtual_columns_on_write— 활성화하면 테이블 스키마에서 특수 이름_key,_timestamp,_headers.name,_headers.value를 가진 컬럼이 해당 Kafka 메시지 메타데이터에 매핑되고,INSERT시 메시지 페이로드에서 제외됩니다. 자세한 내용은 컬럼을 Kafka 메시지 메타데이터에 매핑하기를 참조하십시오. 기본값:false.
Kafka 테이블 엔진은 기본값이 있는 컬럼을 지원하지 않습니다. 기본값이 있는 컬럼이 필요하다면 materialized view 수준에서 추가할 수 있습니다(아래 참조).
설명
SELECT가 그다지 유용하지 않습니다(debugging 목적은 예외). 각 메시지는 한 번만 읽을 수 있기 때문입니다. 실시간 스레드는 materialized view를 사용해 만드는 것이 더 실용적입니다. 이를 위해 다음을 수행하십시오:
- engine을 사용해 Kafka 컨슈머를 생성하고 이를 데이터 스트림으로 간주합니다.
- 원하는 구조의 테이블을 만듭니다.
- engine의 데이터를 변환해 앞서 만든 테이블에 넣는 materialized view를 만듭니다.
MATERIALIZED VIEW가 engine에 조인되면 백그라운드에서 데이터 수집을 시작합니다. 이를 통해 Kafka에서 메시지를 지속적으로 수신하고 SELECT를 사용해 필요한 포맷으로 변환할 수 있습니다.
하나의 Kafka 테이블에는 원하는 만큼 많은 materialized view를 둘 수 있습니다. 이들은 Kafka 테이블에서 직접 데이터를 읽지 않고 새 레코드(블록 단위)를 받으므로, 서로 다른 상세 수준(그룹화 및 집계 포함/미포함)으로 여러 테이블에 쓸 수 있습니다.
예시:
ALTER를 사용해 대상 테이블을 변경하려는 경우, 대상 테이블과 뷰에서 들어오는 데이터 사이에 불일치가 생기지 않도록 구체화된 뷰(Materialized View)를 비활성화하는 것이 좋습니다.
구성
<kafka> 아래)과 토픽 수준(topic-level, <kafka><kafka_topic> 아래)입니다. 먼저 전역 구성이 적용되고, 그다음 토픽 수준 구성이 적용됩니다(있는 경우).
.) 대신 밑줄(_)을 사용하십시오. 예를 들어 check.crcs=true는 <check_crcs>true</check_crcs>로 작성됩니다.
Kerberos 지원
security_protocol 하위 요소를 sasl_plaintext 값으로 추가합니다. Kerberos 티켓 부여 티켓(TGT)을 OS 기능을 통해 획득해 캐시해 두면 충분합니다.
ClickHouse는 keytab 파일을 사용해 Kerberos 자격 증명을 유지할 수 있습니다. sasl_kerberos_service_name, sasl_kerberos_keytab, sasl_kerberos_principal 하위 요소를 사용하십시오.
예시:
가상 컬럼
_topic— Kafka 토픽입니다. 데이터 타입:LowCardinality(String)._key— 메시지 키입니다. 데이터 타입:String._offset— 메시지 오프셋입니다. 데이터 타입:UInt64._timestamp— 메시지 타임스탬프입니다. 데이터 타입:Nullable(DateTime)._timestamp_ms— 메시지의 밀리초 단위 타임스탬프입니다. 데이터 타입:Nullable(DateTime64(3))._partition— Kafka 토픽의 파티션입니다. 데이터 타입:UInt64._headers.name— 메시지 헤더 키의 배열입니다. 데이터 타입:Array(String)._headers.value— 메시지 헤더 값의 배열입니다. 데이터 타입:Array(String).
kafka_handle_error_mode='stream'일 때 추가 가상 컬럼:
_raw_message- 파싱에 성공하지 못한 원시 메시지입니다. 데이터 타입:String._error- 파싱 실패 중 발생한 예외 메시지입니다. 데이터 타입:String.
_raw_message 및 _error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱된 경우에는 항상 비어 있습니다.
컬럼을 Kafka 메시지 메타데이터에 매핑하기
INSERT INTO로 메시지를 생성할 때 Kafka 엔진은 테이블에 해당 컬럼이 있으면 항상 _key라는 이름의 컬럼(타입 String)을 Kafka 메시지 키로 사용하고, _timestamp라는 이름의 컬럼(타입 DateTime)을 Kafka 메시지 타임스탬프로 사용합니다. 기본적으로 이 컬럼들은 다른 컬럼과 함께 생성된 메시지 페이로드에도 포함됩니다.
kafka_map_virtual_columns_on_write = 1을 사용하면 동작이 다음과 같이 변경됩니다.
_key(타입String) — Kafka 메시지 키에 매핑됩니다._timestamp(타입DateTime) — Kafka 메시지 타임스탬프에 매핑됩니다._headers.name(타입Array(String)) 및_headers.value(타입Array(String)) — Kafka 메시지 헤더에 매핑됩니다. 각 쌍(_headers.name[i], _headers.value[i])은 Kafka 헤더 1개가 됩니다._headers.name과_headers.value는_headers라는 Nested 접두사를 공유하므로, ClickHouse는 모든 행에서 두 배열의 크기가 같아야 합니다.
{"event_json":"{\"a\":1}"}, 키 session-42, 현재 타임스탬프, 그리고 두 개의 header source=api 및 trace_id=abc-123가 포함됩니다.
데이터 포맷 지원
- 행 기반 포맷에서는 하나의 Kafka 메시지에 포함되는 행 수를
kafka_max_rows_per_message설정으로 제어할 수 있습니다. - 블록 기반 포맷에서는 블록을 더 작은 파트로 나눌 수는 없지만, 하나의 블록에 포함되는 행 수는 일반 설정인 max_block_size로 제어할 수 있습니다.
ClickHouse Keeper에 커밋된 오프셋을 저장하는 엔진
allow_experimental_kafka_offsets_storage_in_keeper가 활성화되면 Kafka 테이블 엔진에 다음 두 가지 설정을 추가로 지정할 수 있습니다.
kafka_keeper_path: ClickHouse Keeper의 테이블 경로를 지정합니다kafka_replica_name: ClickHouse Keeper의 레플리카 이름을 지정합니다
알려진 제한 사항
- 테이블을 빠르게 삭제한 후 다시 생성하거나, 서로 다른 엔진에 동일한 ClickHouse Keeper 경로를 지정하면 문제가 발생할 수 있습니다. 모범 사례로, 경로 충돌을 방지하려면
kafka_keeper_path에{uuid}를 사용할 수 있습니다. - 반복 가능한 읽기를 보장하려면 단일 스레드에서 여러 파티션의 메시지를 소비할 수 없습니다. 반면 Kafka 컨슈머를 계속 활성 상태로 유지하려면 정기적으로 폴링해야 합니다. 이러한 두 가지 요구 사항 때문에
kafka_thread_per_consumer가 enabled된 경우에만 여러 컨슈머를 생성할 수 있도록 했습니다. 그렇지 않으면 컨슈머를 정기적으로 폴링할 때 발생할 수 있는 문제를 피하기가 너무 복잡합니다.