RabbitMQ를 사용하면 다음 작업을 수행할 수 있습니다:
- 데이터 흐름을 게시하거나 구독할 수 있습니다.
- 스트림을 사용할 수 있게 되는 즉시 처리할 수 있습니다.
테이블 생성
rabbitmq_host_port– host:port(예:localhost:5672).rabbitmq_exchange_name– RabbitMQ exchange 이름.rabbitmq_format– 메시지 포맷.JSONEachRow와 같이 SQLFORMAT함수와 같은 표기법을 사용합니다. 자세한 내용은 포맷 섹션을 참조하십시오.
rabbitmq_exchange_type– RabbitMQ exchange 유형입니다:direct,fanout,topic,headers,consistent_hash. 기본값:fanout.rabbitmq_routing_key_list– 쉼표로 구분된 라우팅 키 목록입니다.rabbitmq_schema– 포맷에 schema 정의가 필요한 경우 반드시 사용해야 하는 매개변수입니다. 예를 들어 Cap’n Proto는 schema 파일의 경로와 루트schema.capnp:Message객체 이름이 필요합니다.rabbitmq_num_consumers– table당 consumer 수입니다. consumer 1개의 처리량이 충분하지 않으면 consumer 수를 늘리십시오. 기본값:1rabbitmq_num_queues– 전체 큐 수입니다. 이 값을 늘리면 성능이 크게 향상될 수 있습니다. 기본값:1.rabbitmq_queue_base- 큐 이름에 대한 힌트를 지정합니다. 이 설정의 사용 사례는 아래에 설명되어 있습니다.rabbitmq_persistent- 1(true)로 설정하면 삽입 쿼리의 전달 모드가 2로 설정됩니다(메시지가 ‘persistent’로 표시됨). 기본값:0.rabbitmq_skip_broken_messages– 블록당 schema와 호환되지 않는 메시지에 대해 RabbitMQ 메시지 parser가 허용하는 한도입니다.rabbitmq_skip_broken_messages = N이면 engine은 parse할 수 없는 RabbitMQ 메시지 N개를 건너뜁니다(메시지 하나는 데이터 행 하나에 해당함). 기본값:0.rabbitmq_max_block_size- RabbitMQ에서 데이터를 플러시하기 전에 수집할 행 수입니다. 기본값: max_insert_block_size.rabbitmq_flush_interval_ms- RabbitMQ에서 데이터를 플러시하기 위한 timeout입니다. 기본값: stream_flush_interval_ms.rabbitmq_queue_settings_list- 큐를 생성할 때 RabbitMQ 설정을 지정할 수 있습니다. 사용 가능한 설정:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. 큐에서는durable설정이 자동으로 활성화됩니다.rabbitmq_address- 연결에 사용할 주소입니다. 이 설정 또는rabbitmq_host_port를 사용하십시오.rabbitmq_vhost- RabbitMQ vhost입니다. 기본값:'\'.rabbitmq_queue_consume- 사용자 정의 큐를 사용하고 exchange, 큐, binding 선언 등 RabbitMQ 설정은 수행하지 않습니다. 기본값:false.rabbitmq_username- RabbitMQ 사용자 이름입니다.rabbitmq_password- RabbitMQ 비밀번호입니다.reject_unhandled_messages- 오류 발생 시 메시지를 거부합니다(RabbitMQ negative acknowledgement 전송).rabbitmq_queue_settings_list에x-dead-letter-exchange가 정의되어 있으면 이 설정은 자동으로 활성화됩니다.rabbitmq_commit_on_select- select 쿼리가 실행될 때 메시지를 commit합니다. 기본값:false.rabbitmq_max_rows_per_message— 행 기반 포맷에서 하나의 RabbitMQ 메시지에 기록할 수 있는 최대 행 수입니다. 기본값:1.rabbitmq_empty_queue_backoff_start_ms— RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 시작 백오프 지점입니다.rabbitmq_empty_queue_backoff_end_ms— RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 종료 백오프 지점입니다.rabbitmq_empty_queue_backoff_step_ms— RabbitMQ 큐가 비어 있을 때 읽기를 다시 예약하기 위한 백오프 간격입니다.rabbitmq_handle_error_mode— RabbitMQ engine에서 오류를 처리하는 방식입니다. 가능한 값: default(메시지를 parse하지 못하면 예외 발생), stream(예외 메시지와 원시 메시지를 가상 컬럼_error및_raw_message에 저장), dead_letter_queue(오류 관련 데이터를 system.dead_letter_queue에 저장).
SSL 연결
rabbitmq_secure = 1 또는 amqps를 사용하십시오: rabbitmq_address = 'amqps://guest:guest@localhost/vhost'.
사용 중인 라이브러리의 기본 동작은 생성된 TLS 연결이 충분히 안전한지 확인하지 않는 것입니다. 인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 유효하지 않더라도 연결은 그대로 허용됩니다. 더 엄격한 인증서 검사는 향후 구현될 수 있습니다.
또한 rabbitmq 관련 설정과 함께 포맷 설정도 추가할 수 있습니다.
예시:
설명
SELECT를 사용하는 방법은(디버깅 목적을 제외하면) 그다지 유용하지 않습니다. 각 메시지는 한 번만 읽을 수 있기 때문입니다. 더 실용적인 방법은 materialized view를 사용해 실시간 스레드를 만드는 것입니다. 이를 위해 다음과 같이 하십시오:
- 엔진을 사용해 RabbitMQ consumer를 만들고 이를 데이터 스트림으로 간주합니다.
- 원하는 구조의 테이블을 생성합니다.
- 엔진의 데이터를 변환해 앞서 생성한 테이블에 넣는 materialized view를 생성합니다.
MATERIALIZED VIEW를 엔진에 조인하면 백그라운드에서 데이터 수집이 시작됩니다. 이를 통해 RabbitMQ에서 메시지를 지속적으로 수신하고, SELECT를 사용해 필요한 포맷으로 변환할 수 있습니다.
하나의 RabbitMQ 테이블에는 원하는 수만큼 materialized view를 둘 수 있습니다.
데이터는 rabbitmq_exchange_type 및 지정된 rabbitmq_routing_key_list에 따라 전달할 수 있습니다.
테이블당 exchange는 최대 1개만 사용할 수 있습니다. 하나의 exchange를 여러 테이블이 공유할 수 있으며, 이를 통해 여러 테이블로 동시에 라우팅할 수 있습니다.
Exchange 유형 옵션:
direct- 정확히 일치하는 키를 기준으로 라우팅합니다. 예시 테이블 키 목록:key1,key2,key3,key4,key5, 메시지 키는 이들 중 어느 것과도 일치할 수 있습니다.fanout- 키와 관계없이 모든 테이블로 라우팅합니다(exchange 이름이 같은 경우).topic- 점으로 구분된 키 패턴을 기준으로 라우팅합니다. 예시:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers-x-match=all또는x-match=any설정과 함께key=value일치를 기준으로 라우팅합니다. 예시 테이블 키 목록:x-match=all,format=logs,type=report,year=2020.consistent_hash- 모든 바인딩된 테이블에 데이터가 고르게 분산됩니다(exchange 이름이 같은 경우). 이 exchange 유형을 사용하려면 RabbitMQ plugin을 활성화해야 합니다:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
rabbitmq_queue_base 설정은 다음과 같은 경우에 사용할 수 있습니다:
- 서로 다른 테이블이 큐를 공유하도록 하여, 동일한 큐에 여러 consumer를 등록할 수 있게 함으로써 성능을 높일 수 있습니다.
rabbitmq_num_consumers및/또는rabbitmq_num_queues설정을 사용하는 경우, 이러한 매개변수가 같으면 큐를 정확히 일치시킬 수 있습니다. - 모든 메시지가 성공적으로 처리되지 않았을 때 특정 durable 큐에서 읽기를 복원할 수 있습니다. 특정 큐 하나에서 읽기를 재개하려면
rabbitmq_queue_base설정에 해당 큐 이름을 지정하고rabbitmq_num_consumers및rabbitmq_num_queues는 지정하지 마십시오(기본값은 1). 특정 테이블에 대해 선언된 모든 큐에서 읽기를 재개하려면rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues에 이전과 동일한 설정을 지정하면 됩니다. 기본적으로 큐 이름은 테이블마다 고유합니다. - 큐가 durable로 선언되어 있고 자동 삭제되지 않으므로 재사용할 수 있습니다. (RabbitMQ CLI 도구를 사용해 삭제할 수 있습니다.)
rabbitmq_num_consumers 및/또는 rabbitmq_num_queues 설정이 rabbitmq_exchange_type와 함께 지정된 경우:
rabbitmq-consistent-hash-exchangeplugin이 활성화되어 있어야 합니다.- 게시된 메시지의
message_idproperty를 지정해야 합니다(각 메시지/Batch마다 고유해야 함).
messageID와 republished 플래그(true는 두 번 이상 게시되었음을 의미) - 메시지 headers를 통해 접근할 수 있습니다.
삽입과 materialized view에 동일한 테이블을 사용하지 마십시오.
예시:
가상 컬럼
_exchange_name- RabbitMQ exchange 이름입니다. 데이터 타입:String._channel_id- 메시지를 수신한 consumer가 선언된 ChannelID입니다. 데이터 타입:String._delivery_tag- 수신된 메시지의 DeliveryTag입니다. 채널별로 범위가 지정됩니다. 데이터 타입:UInt64._redelivered- 메시지의redelivered플래그입니다. 데이터 타입:UInt8._message_id- 수신된 메시지의 messageID입니다. 메시지가 게시될 때 설정된 경우 비어 있지 않습니다. 데이터 타입:String._timestamp- 수신된 메시지의 timestamp입니다. 메시지가 게시될 때 설정된 경우 비어 있지 않습니다. 데이터 타입:UInt64.
rabbitmq_handle_error_mode='stream'일 때 추가되는 가상 컬럼:
_raw_message- 성공적으로 파싱되지 않은 원시 메시지입니다. 데이터 타입:Nullable(String)._error- 파싱 실패 시 발생한 예외 메시지입니다. 데이터 타입:Nullable(String).
_raw_message 및 _error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱되면 항상 NULL입니다.
주의사항
DEFAULT, MATERIALIZED, ALIAS 등)을 지정하더라도 무시됩니다. 대신 각 컬럼에는 해당 타입의 기본값이 채워집니다.
데이터 포맷 지원
- 행 기반 포맷에서는 하나의 RabbitMQ 메시지에 들어가는 행 수를
rabbitmq_max_rows_per_message설정으로 제어할 수 있습니다. - 블록 기반 포맷에서는 블록을 더 작은 파트로 나눌 수는 없지만, 하나의 블록에 들어가는 행 수는 공통 설정 max_block_size로 제어할 수 있습니다.