NATS로 다음 작업을 수행할 수 있습니다:
- 메시지 subject를 게시하거나 구독합니다.
- 새 메시지가 도착하는 대로 처리합니다.
테이블 생성하기
nats_url– host:port(예:localhost:5672)..nats_subjects– NATS 테이블이 구독하거나 게시할 subject 목록입니다.foo.*.bar또는baz.>같은 와일드카드 subject를 지원합니다.nats_format– 메시지 포맷입니다.JSONEachRow와 같이 SQLFORMAT함수와 동일한 표기법을 사용합니다. 자세한 내용은 포맷 섹션을 참조하십시오.
nats_schema– 포맷에 스키마 정의가 필요한 경우 반드시 사용해야 하는 매개변수입니다. 예를 들어 Cap’n Proto는 스키마 파일의 경로와 루트schema.capnp:Message객체 이름이 필요합니다.nats_stream– NATS JetStream에 있는 기존 스트림의 이름입니다.nats_consumer– NATS JetStream에 있는 기존 durable pull consumer의 이름입니다.nats_num_consumers– 테이블당 consumer 수입니다. 기본값:1. NATS core에서만, 단일 consumer의 처리량이 부족한 경우 consumer를 더 지정하십시오.nats_queue_group– NATS subscriber의 큐 그룹 이름입니다. 기본값은 테이블 이름입니다.nats_max_reconnect– 더 이상 권장되지 않으며 아무런 효과가 없습니다. 재연결은nats_reconnect_wait타임아웃에 따라 계속 수행됩니다.nats_reconnect_wait– 각 재연결 시도 사이에 대기할 시간(밀리초)입니다. 기본값:5000.nats_server_list- 연결을 위한 서버 목록입니다. NATS 클러스터에 연결할 때 지정할 수 있습니다.nats_skip_broken_messages- 블록당 스키마와 호환되지 않는 메시지에 대한 NATS 메시지 파서 허용치입니다. 기본값:0.nats_skip_broken_messages = N이면 엔진은 파싱할 수 없는 NATS 메시지 N개를 건너뜁니다(메시지 하나는 데이터 한 행과 같습니다).nats_max_block_size- NATS에서 데이터를 플러시하기 위해 폴링으로 수집하는 행 수입니다. 기본값: max_insert_block_size.nats_flush_interval_ms- NATS에서 읽은 데이터를 플러시하는 타임아웃입니다. 기본값: stream_flush_interval_ms.nats_username- NATS 사용자 이름입니다.nats_password- NATS 비밀번호입니다.nats_token- NATS 인증 토큰입니다.nats_credential_file- NATS 자격 증명 파일의 경로입니다.nats_startup_connect_tries- 시작 시 연결 시도 횟수입니다. 기본값:5.nats_max_rows_per_message— 행 기반 포맷에서 하나의 NATS 메시지에 기록할 수 있는 최대 행 수입니다. (기본값:1)nats_handle_error_mode— NATS 엔진의 오류 처리 방식입니다. 가능한 값: default(메시지 파싱에 실패하면 예외가 발생함), stream(예외 메시지와 원시 메시지가 가상 컬럼_error및_raw_message에 저장됨).
nats_secure = 1로 설정하십시오.
인증서 검증은 CLICKHOUSE_NATS_TLS_SECURE 환경 변수로 제어됩니다;
인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 그 밖의 이유로 유효하지 않은 경우 CLICKHOUSE_NATS_TLS_SECURE=0으로 설정해 검증을 비활성화하십시오.
NATS 테이블에 쓰기:
테이블이 하나의 subject에서만 읽는 경우, 모든 삽입은 동일한 subject로 게시됩니다.
하지만 테이블이 여러 subject에서 읽는 경우에는 데이터를 게시할 subject를 지정해야 합니다.
따라서 여러 subject를 읽는 테이블에 삽입할 때는 항상 stream_like_engine_insert_queue 설정이 필요합니다.
테이블이 읽는 subject 중 하나를 선택해 해당 subject로 데이터를 게시할 수 있습니다. 예시:
설명
SELECT가 그다지 유용하지 않습니다. 더 실용적인 방법은 materialized view를 사용해 실시간 스레드를 만드는 것입니다. 이를 위해 다음을 수행합니다.
- 엔진을 사용해 NATS consumer를 생성하고 이를 데이터 스트림으로 간주합니다.
- 원하는 구조로 테이블을 생성합니다.
- 엔진의 데이터를 변환해 앞서 생성한 테이블에 저장하는 materialized view를 생성합니다.
MATERIALIZED VIEW가 엔진에 연결되면 백그라운드에서 데이터 수집을 시작합니다. 이렇게 하면 NATS에서 메시지를 지속적으로 받아 SELECT를 사용해 필요한 포맷으로 변환할 수 있습니다.
하나의 NATS 테이블에는 원하는 수만큼 materialized view를 만들 수 있습니다. 이들은 테이블에서 직접 데이터를 읽는 대신 새 레코드(블록 단위)를 받으므로, 여러 테이블에 서로 다른 수준의 상세도(그룹화 및 집계 적용 여부에 따라)로 쓸 수 있습니다.
예시:
ALTER를 사용해 대상 테이블을 변경하려면, 대상 테이블과 뷰의 데이터 사이에 불일치가 생기지 않도록 materialized view를 비활성화하는 것이 좋습니다.
가상 컬럼
_subject- NATS 메시지 subject입니다. 데이터 타입:String.
nats_handle_error_mode='stream'일 때 추가되는 가상 컬럼:
_raw_message- 성공적으로 파싱되지 않은 원시 메시지입니다. 데이터 타입:Nullable(String)._error- 파싱 실패 중 발생한 예외 메시지입니다. 데이터 타입:Nullable(String).
_raw_message 및 _error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱된 경우에는 항상 NULL입니다.
데이터 포맷 지원
- 행 기반 포맷에서는 하나의 NATS 메시지에 포함되는 행 수를
nats_max_rows_per_message설정으로 제어할 수 있습니다. - 블록 기반 포맷에서는 블록을 더 작은 부분으로 나눌 수는 없지만, 하나의 블록에 포함되는 행 수는 일반 설정인 max_block_size로 제어할 수 있습니다.
JetStream 사용
nats 유틸리티를 사용할 수 있습니다. 예시는 다음과 같습니다.
stream 생성
stream 생성
durable pull consumer 생성
durable pull consumer 생성
nats_stream, nats_consumer_name, nats_subjects를 초기화해야 합니다: