메인 콘텐츠로 건너뛰기
이 엔진을 사용하면 ClickHouse를 RabbitMQ와 통합할 수 있습니다. RabbitMQ를 사용하면 다음 작업을 수행할 수 있습니다:
  • 데이터 흐름을 게시하거나 구독할 수 있습니다.
  • 스트림을 사용할 수 있게 되는 즉시 처리할 수 있습니다.

테이블 생성

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1],
    name2 [type2],
    ...
) ENGINE = RabbitMQ SETTINGS
    rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
    rabbitmq_exchange_name = 'exchange_name',
    rabbitmq_format = 'data_format'[,]
    [rabbitmq_exchange_type = 'exchange_type',]
    [rabbitmq_routing_key_list = 'key1,key2,...',]
    [rabbitmq_secure = 0,]
    [rabbitmq_schema = '',]
    [rabbitmq_num_consumers = N,]
    [rabbitmq_num_queues = N,]
    [rabbitmq_queue_base = 'queue',]
    [rabbitmq_deadletter_exchange = 'dl-exchange',]
    [rabbitmq_persistent = 0,]
    [rabbitmq_skip_broken_messages = N,]
    [rabbitmq_max_block_size = N,]
    [rabbitmq_flush_interval_ms = N,]
    [rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish',]
    [rabbitmq_queue_consume = false,]
    [rabbitmq_address = '',]
    [rabbitmq_vhost = '/',]
    [rabbitmq_username = '',]
    [rabbitmq_password = '',]
    [rabbitmq_commit_on_select = false,]
    [rabbitmq_max_rows_per_message = 1,]
    [rabbitmq_handle_error_mode = 'default']
필수 매개변수:
  • rabbitmq_host_port – host:port(예: localhost:5672).
  • rabbitmq_exchange_name – RabbitMQ exchange 이름.
  • rabbitmq_format – 메시지 포맷. JSONEachRow와 같이 SQL FORMAT 함수와 같은 표기법을 사용합니다. 자세한 내용은 포맷 섹션을 참조하십시오.
선택적 매개변수:
  • rabbitmq_exchange_type – RabbitMQ exchange 유형입니다: direct, fanout, topic, headers, consistent_hash. 기본값: fanout.
  • rabbitmq_routing_key_list – 쉼표로 구분된 라우팅 키 목록입니다.
  • rabbitmq_schema – 포맷에 schema 정의가 필요한 경우 반드시 사용해야 하는 매개변수입니다. 예를 들어 Cap’n Proto는 schema 파일의 경로와 루트 schema.capnp:Message 객체 이름이 필요합니다.
  • rabbitmq_num_consumers – table당 consumer 수입니다. consumer 1개의 처리량이 충분하지 않으면 consumer 수를 늘리십시오. 기본값: 1
  • rabbitmq_num_queues – 전체 큐 수입니다. 이 값을 늘리면 성능이 크게 향상될 수 있습니다. 기본값: 1.
  • rabbitmq_queue_base - 큐 이름에 대한 힌트를 지정합니다. 이 설정의 사용 사례는 아래에 설명되어 있습니다.
  • rabbitmq_persistent - 1(true)로 설정하면 삽입 쿼리의 전달 모드가 2로 설정됩니다(메시지가 ‘persistent’로 표시됨). 기본값: 0.
  • rabbitmq_skip_broken_messages – 블록당 schema와 호환되지 않는 메시지에 대해 RabbitMQ 메시지 parser가 허용하는 한도입니다. rabbitmq_skip_broken_messages = N이면 engine은 parse할 수 없는 RabbitMQ 메시지 N개를 건너뜁니다(메시지 하나는 데이터 행 하나에 해당함). 기본값: 0.
  • rabbitmq_max_block_size - RabbitMQ에서 데이터를 플러시하기 전에 수집할 행 수입니다. 기본값: max_insert_block_size.
  • rabbitmq_flush_interval_ms - RabbitMQ에서 데이터를 플러시하기 위한 timeout입니다. 기본값: stream_flush_interval_ms.
  • rabbitmq_queue_settings_list - 큐를 생성할 때 RabbitMQ 설정을 지정할 수 있습니다. 사용 가능한 설정: x-max-length, x-max-length-bytes, x-message-ttl, x-expires, x-priority, x-max-priority, x-overflow, x-dead-letter-exchange, x-queue-type. 큐에서는 durable 설정이 자동으로 활성화됩니다.
  • rabbitmq_address - 연결에 사용할 주소입니다. 이 설정 또는 rabbitmq_host_port를 사용하십시오.
  • rabbitmq_vhost - RabbitMQ vhost입니다. 기본값: '\'.
  • rabbitmq_queue_consume - 사용자 정의 큐를 사용하고 exchange, 큐, binding 선언 등 RabbitMQ 설정은 수행하지 않습니다. 기본값: false.
  • rabbitmq_username - RabbitMQ 사용자 이름입니다.
  • rabbitmq_password - RabbitMQ 비밀번호입니다.
  • reject_unhandled_messages - 오류 발생 시 메시지를 거부합니다(RabbitMQ negative acknowledgement 전송). rabbitmq_queue_settings_listx-dead-letter-exchange가 정의되어 있으면 이 설정은 자동으로 활성화됩니다.
  • rabbitmq_commit_on_select - select 쿼리가 실행될 때 메시지를 commit합니다. 기본값: false.
  • rabbitmq_max_rows_per_message — 행 기반 포맷에서 하나의 RabbitMQ 메시지에 기록할 수 있는 최대 행 수입니다. 기본값: 1.
  • rabbitmq_empty_queue_backoff_start_ms — RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 시작 백오프 지점입니다.
  • rabbitmq_empty_queue_backoff_end_ms — RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 종료 백오프 지점입니다.
  • rabbitmq_empty_queue_backoff_step_ms — RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 백오프 간격입니다.
  • rabbitmq_handle_error_mode — RabbitMQ engine에서 오류를 처리하는 방식입니다. 가능한 값: default(메시지를 parse하지 못하면 예외 발생), stream(예외 메시지와 원시 메시지를 가상 컬럼 _error_raw_message에 저장), dead_letter_queue(오류 관련 데이터를 system.dead_letter_queue에 저장).

SSL 연결

연결 주소에 rabbitmq_secure = 1 또는 amqps를 사용하십시오: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'. 사용 중인 라이브러리의 기본 동작은 생성된 TLS 연결이 충분히 안전한지 확인하지 않는 것입니다. 인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 유효하지 않더라도 연결은 그대로 허용됩니다. 더 엄격한 인증서 검사는 향후 구현될 수 있습니다. 또한 rabbitmq 관련 설정과 함께 포맷 설정도 추가할 수 있습니다. 예시:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5,
                            date_time_input_format = 'best_effort';
RabbitMQ 서버 구성은 ClickHouse 구성 파일에 추가해야 합니다. 필수 구성:
 <rabbitmq>
    <username>root</username>
    <password>clickhouse</password>
 </rabbitmq>
추가 구성:
 <rabbitmq>
    <vhost>clickhouse</vhost>
 </rabbitmq>

설명

메시지를 읽기 위해 SELECT를 사용하는 방법은(디버깅 목적을 제외하면) 그다지 유용하지 않습니다. 각 메시지는 한 번만 읽을 수 있기 때문입니다. 더 실용적인 방법은 materialized view를 사용해 실시간 스레드를 만드는 것입니다. 이를 위해 다음과 같이 하십시오:
  1. 엔진을 사용해 RabbitMQ consumer를 만들고 이를 데이터 스트림으로 간주합니다.
  2. 원하는 구조의 테이블을 생성합니다.
  3. 엔진의 데이터를 변환해 앞서 생성한 테이블에 넣는 materialized view를 생성합니다.
MATERIALIZED VIEW를 엔진에 조인하면 백그라운드에서 데이터 수집이 시작됩니다. 이를 통해 RabbitMQ에서 메시지를 지속적으로 수신하고, SELECT를 사용해 필요한 포맷으로 변환할 수 있습니다. 하나의 RabbitMQ 테이블에는 원하는 수만큼 materialized view를 둘 수 있습니다. 데이터는 rabbitmq_exchange_type 및 지정된 rabbitmq_routing_key_list에 따라 전달할 수 있습니다. 테이블당 exchange는 최대 1개만 사용할 수 있습니다. 하나의 exchange를 여러 테이블이 공유할 수 있으며, 이를 통해 여러 테이블로 동시에 라우팅할 수 있습니다. Exchange 유형 옵션:
  • direct - 정확히 일치하는 키를 기준으로 라우팅합니다. 예시 테이블 키 목록: key1,key2,key3,key4,key5, 메시지 키는 이들 중 어느 것과도 일치할 수 있습니다.
  • fanout - 키와 관계없이 모든 테이블로 라우팅합니다(exchange 이름이 같은 경우).
  • topic - 점으로 구분된 키 패턴을 기준으로 라우팅합니다. 예시: *.logs, records.*.*.2020, *.2018,*.2019,*.2020.
  • headers - x-match=all 또는 x-match=any 설정과 함께 key=value 일치를 기준으로 라우팅합니다. 예시 테이블 키 목록: x-match=all,format=logs,type=report,year=2020.
  • consistent_hash - 모든 바인딩된 테이블에 데이터가 고르게 분산됩니다(exchange 이름이 같은 경우). 이 exchange 유형을 사용하려면 RabbitMQ plugin을 활성화해야 합니다: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base 설정은 다음과 같은 경우에 사용할 수 있습니다:
  • 서로 다른 테이블이 큐를 공유하도록 하여, 동일한 큐에 여러 consumer를 등록할 수 있게 함으로써 성능을 높일 수 있습니다. rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정을 사용하는 경우, 이러한 매개변수가 같으면 큐를 정확히 일치시킬 수 있습니다.
  • 모든 메시지가 성공적으로 처리되지 않았을 때 특정 durable 큐에서 읽기를 복원할 수 있습니다. 특정 큐 하나에서 읽기를 재개하려면 rabbitmq_queue_base 설정에 해당 큐 이름을 지정하고 rabbitmq_num_consumersrabbitmq_num_queues는 지정하지 마십시오(기본값은 1). 특정 테이블에 대해 선언된 모든 큐에서 읽기를 재개하려면 rabbitmq_queue_base, rabbitmq_num_consumers, rabbitmq_num_queues에 이전과 동일한 설정을 지정하면 됩니다. 기본적으로 큐 이름은 테이블마다 고유합니다.
  • 큐가 durable로 선언되어 있고 자동 삭제되지 않으므로 재사용할 수 있습니다. (RabbitMQ CLI 도구를 사용해 삭제할 수 있습니다.)
성능을 높이기 위해 수신된 메시지는 max_insert_block_size 크기의 블록으로 그룹화됩니다. stream_flush_interval_ms 밀리초 안에 블록이 형성되지 않으면, 블록이 완전하지 않더라도 데이터가 테이블로 플러시됩니다. rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정이 rabbitmq_exchange_type와 함께 지정된 경우:
  • rabbitmq-consistent-hash-exchange plugin이 활성화되어 있어야 합니다.
  • 게시된 메시지의 message_id property를 지정해야 합니다(각 메시지/Batch마다 고유해야 함).
삽입 쿼리에는 각 게시된 메시지에 추가되는 메시지 메타데이터가 있습니다: messageIDrepublished 플래그(true는 두 번 이상 게시되었음을 의미) - 메시지 headers를 통해 접근할 수 있습니다. 삽입과 materialized view에 동일한 테이블을 사용하지 마십시오. 예시:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
                            rabbitmq_exchange_name = 'exchange1',
                            rabbitmq_exchange_type = 'headers',
                            rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
                            rabbitmq_format = 'JSONEachRow',
                            rabbitmq_num_consumers = 5;

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;

가상 컬럼

  • _exchange_name - RabbitMQ exchange 이름입니다. 데이터 타입: String.
  • _channel_id - 메시지를 수신한 consumer가 선언된 ChannelID입니다. 데이터 타입: String.
  • _delivery_tag - 수신된 메시지의 DeliveryTag입니다. 채널별로 범위가 지정됩니다. 데이터 타입: UInt64.
  • _redelivered - 메시지의 redelivered 플래그입니다. 데이터 타입: UInt8.
  • _message_id - 수신된 메시지의 messageID입니다. 메시지가 게시될 때 설정된 경우 비어 있지 않습니다. 데이터 타입: String.
  • _timestamp - 수신된 메시지의 timestamp입니다. 메시지가 게시될 때 설정된 경우 비어 있지 않습니다. 데이터 타입: UInt64.
rabbitmq_handle_error_mode='stream'일 때 추가되는 가상 컬럼:
  • _raw_message - 성공적으로 파싱되지 않은 원시 메시지입니다. 데이터 타입: Nullable(String).
  • _error - 파싱 실패 시 발생한 예외 메시지입니다. 데이터 타입: Nullable(String).
참고: _raw_message_error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱되면 항상 NULL입니다.

주의사항

테이블 정의에서 기본 컬럼 표현식 (DEFAULT, MATERIALIZED, ALIAS 등)을 지정하더라도 무시됩니다. 대신 각 컬럼에는 해당 타입의 기본값이 채워집니다.

데이터 포맷 지원

RabbitMQ 엔진은 ClickHouse에서 지원하는 모든 포맷을 지원합니다. 하나의 RabbitMQ 메시지에 들어가는 행 수는 포맷이 행 기반인지 블록 기반인지에 따라 달라집니다.
  • 행 기반 포맷에서는 하나의 RabbitMQ 메시지에 들어가는 행 수를 rabbitmq_max_rows_per_message 설정으로 제어할 수 있습니다.
  • 블록 기반 포맷에서는 블록을 더 작은 파트로 나눌 수는 없지만, 하나의 블록에 들어가는 행 수는 공통 설정 max_block_size로 제어할 수 있습니다.
마지막 수정일 2026년 6월 10일