메인 콘텐츠로 건너뛰기
이 엔진을 사용하면 ClickHouse를 NATS와 통합할 수 있습니다. NATS로 다음 작업을 수행할 수 있습니다:
  • 메시지 subject를 게시하거나 구독합니다.
  • 새 메시지가 도착하는 대로 처리합니다.

테이블 생성하기

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = NATS SETTINGS
    nats_url = 'host:port',
    nats_subjects = 'subject1,subject2,...',
    nats_format = 'data_format'[,]
    [nats_schema = '',]
    [nats_num_consumers = N,]
    [nats_queue_group = 'group_name',]
    [nats_secure = false,]
    [nats_max_reconnect = N,]
    [nats_reconnect_wait = N,]
    [nats_server_list = 'host1:port1,host2:port2,...',]
    [nats_skip_broken_messages = N,]
    [nats_max_block_size = N,]
    [nats_flush_interval_ms = N,]
    [nats_username = 'user',]
    [nats_password = 'password',]
    [nats_token = 'clickhouse',]
    [nats_credential_file = '/var/nats_credentials',]
    [nats_startup_connect_tries = '5']
    [nats_max_rows_per_message = 1,]
    [nats_handle_error_mode = 'default']
필수 매개변수:
  • nats_url – host:port(예: localhost:5672)..
  • nats_subjects – NATS 테이블이 구독하거나 게시할 subject 목록입니다. foo.*.bar 또는 baz.> 같은 와일드카드 subject를 지원합니다.
  • nats_format – 메시지 포맷입니다. JSONEachRow와 같이 SQL FORMAT 함수와 동일한 표기법을 사용합니다. 자세한 내용은 포맷 섹션을 참조하십시오.
선택적 매개변수:
  • nats_schema – 포맷에 스키마 정의가 필요한 경우 반드시 사용해야 하는 매개변수입니다. 예를 들어 Cap’n Proto는 스키마 파일의 경로와 루트 schema.capnp:Message 객체 이름이 필요합니다.
  • nats_stream – NATS JetStream에 있는 기존 스트림의 이름입니다.
  • nats_consumer – NATS JetStream에 있는 기존 durable pull consumer의 이름입니다.
  • nats_num_consumers – 테이블당 consumer 수입니다. 기본값: 1. NATS core에서만, 단일 consumer의 처리량이 부족한 경우 consumer를 더 지정하십시오.
  • nats_queue_group – NATS subscriber의 큐 그룹 이름입니다. 기본값은 테이블 이름입니다.
  • nats_max_reconnect – 더 이상 권장되지 않으며 아무런 효과가 없습니다. 재연결은 nats_reconnect_wait 타임아웃에 따라 계속 수행됩니다.
  • nats_reconnect_wait – 각 재연결 시도 사이에 대기할 시간(밀리초)입니다. 기본값: 5000.
  • nats_server_list - 연결을 위한 서버 목록입니다. NATS 클러스터에 연결할 때 지정할 수 있습니다.
  • nats_skip_broken_messages - 블록당 스키마와 호환되지 않는 메시지에 대한 NATS 메시지 파서 허용치입니다. 기본값: 0. nats_skip_broken_messages = N이면 엔진은 파싱할 수 없는 NATS 메시지 N개를 건너뜁니다(메시지 하나는 데이터 한 행과 같습니다).
  • nats_max_block_size - NATS에서 데이터를 플러시하기 위해 폴링으로 수집하는 행 수입니다. 기본값: max_insert_block_size.
  • nats_flush_interval_ms - NATS에서 읽은 데이터를 플러시하는 타임아웃입니다. 기본값: stream_flush_interval_ms.
  • nats_username - NATS 사용자 이름입니다.
  • nats_password - NATS 비밀번호입니다.
  • nats_token - NATS 인증 토큰입니다.
  • nats_credential_file - NATS 자격 증명 파일의 경로입니다.
  • nats_startup_connect_tries - 시작 시 연결 시도 횟수입니다. 기본값: 5.
  • nats_max_rows_per_message — 행 기반 포맷에서 하나의 NATS 메시지에 기록할 수 있는 최대 행 수입니다. (기본값: 1)
  • nats_handle_error_mode — NATS 엔진의 오류 처리 방식입니다. 가능한 값: default(메시지 파싱에 실패하면 예외가 발생함), stream(예외 메시지와 원시 메시지가 가상 컬럼 _error_raw_message에 저장됨).
SSL 연결: 보안 connection을 사용하려면 nats_secure = 1로 설정하십시오. 인증서 검증은 CLICKHOUSE_NATS_TLS_SECURE 환경 변수로 제어됩니다; 인증서가 만료되었거나, 자체 서명되었거나, 누락되었거나, 그 밖의 이유로 유효하지 않은 경우 CLICKHOUSE_NATS_TLS_SECURE=0으로 설정해 검증을 비활성화하십시오. NATS 테이블에 쓰기: 테이블이 하나의 subject에서만 읽는 경우, 모든 삽입은 동일한 subject로 게시됩니다. 하지만 테이블이 여러 subject에서 읽는 경우에는 데이터를 게시할 subject를 지정해야 합니다. 따라서 여러 subject를 읽는 테이블에 삽입할 때는 항상 stream_like_engine_insert_queue 설정이 필요합니다. 테이블이 읽는 subject 중 하나를 선택해 해당 subject로 데이터를 게시할 수 있습니다. 예시:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1,subject2',
             nats_format = 'JSONEachRow';

  INSERT INTO queue
  SETTINGS stream_like_engine_insert_queue = 'subject2'
  VALUES (1, 1);
포맷 설정은 NATS 관련 설정과 함께 추가할 수도 있습니다. 예시:
  CREATE TABLE queue (
    key UInt64,
    value UInt64,
    date DateTime
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';
ClickHouse 구성 파일을 사용해 NATS 서버 구성을 추가할 수 있습니다. 구체적으로는 NATS 엔진의 비밀번호를 추가할 수 있습니다:
<nats>
    <user>click</user>
    <password>house</password>
    <token>clickhouse</token>
</nats>

설명

각 메시지는 한 번만 읽을 수 있으므로(디버깅은 예외) 메시지를 읽는 용도로는 SELECT가 그다지 유용하지 않습니다. 더 실용적인 방법은 materialized view를 사용해 실시간 스레드를 만드는 것입니다. 이를 위해 다음을 수행합니다.
  1. 엔진을 사용해 NATS consumer를 생성하고 이를 데이터 스트림으로 간주합니다.
  2. 원하는 구조로 테이블을 생성합니다.
  3. 엔진의 데이터를 변환해 앞서 생성한 테이블에 저장하는 materialized view를 생성합니다.
MATERIALIZED VIEW가 엔진에 연결되면 백그라운드에서 데이터 수집을 시작합니다. 이렇게 하면 NATS에서 메시지를 지속적으로 받아 SELECT를 사용해 필요한 포맷으로 변환할 수 있습니다. 하나의 NATS 테이블에는 원하는 수만큼 materialized view를 만들 수 있습니다. 이들은 테이블에서 직접 데이터를 읽는 대신 새 레코드(블록 단위)를 받으므로, 여러 테이블에 서로 다른 수준의 상세도(그룹화 및 집계 적용 여부에 따라)로 쓸 수 있습니다. 예시:
  CREATE TABLE queue (
    key UInt64,
    value UInt64
  ) ENGINE = NATS
    SETTINGS nats_url = 'localhost:4444',
             nats_subjects = 'subject1',
             nats_format = 'JSONEachRow',
             date_time_input_format = 'best_effort';

  CREATE TABLE daily (key UInt64, value UInt64)
    ENGINE = MergeTree() ORDER BY key;

  CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT key, value FROM queue;

  SELECT key, value FROM daily ORDER BY key;
스트림 데이터 수신을 중지하거나 변환 로직을 변경하려면 materialized view를 detach하십시오:
  DETACH TABLE consumer;
  ATTACH TABLE consumer;
ALTER를 사용해 대상 테이블을 변경하려면, 대상 테이블과 뷰의 데이터 사이에 불일치가 생기지 않도록 materialized view를 비활성화하는 것이 좋습니다.

가상 컬럼

  • _subject - NATS 메시지 subject입니다. 데이터 타입: String.
nats_handle_error_mode='stream'일 때 추가되는 가상 컬럼:
  • _raw_message - 성공적으로 파싱되지 않은 원시 메시지입니다. 데이터 타입: Nullable(String).
  • _error - 파싱 실패 중 발생한 예외 메시지입니다. 데이터 타입: Nullable(String).
참고: _raw_message_error 가상 컬럼은 파싱 중 예외가 발생한 경우에만 채워지며, 메시지가 성공적으로 파싱된 경우에는 항상 NULL입니다.

데이터 포맷 지원

NATS 엔진은 ClickHouse에서 지원하는 모든 포맷을 지원합니다. 하나의 NATS 메시지에 포함되는 행 수는 해당 포맷이 행 기반인지 블록 기반인지에 따라 달라집니다.
  • 행 기반 포맷에서는 하나의 NATS 메시지에 포함되는 행 수를 nats_max_rows_per_message 설정으로 제어할 수 있습니다.
  • 블록 기반 포맷에서는 블록을 더 작은 부분으로 나눌 수는 없지만, 하나의 블록에 포함되는 행 수는 일반 설정인 max_block_size로 제어할 수 있습니다.

JetStream 사용

NATS JetStream과 함께 NATS 엔진을 사용하려면 먼저 NATS 스트림과 durable pull consumer를 생성해야 합니다. 이를 위해 NATS CLI 패키지의 nats 유틸리티를 사용할 수 있습니다. 예시는 다음과 같습니다.
$ nats stream add
? Stream Name stream_name
? Subjects stream_subject
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream stream_name was created

Information for Stream stream_name created 2025-10-03 14:12:51

                Subjects: stream_subject
                Replicas: 1
                 Storage: File

Options:

               Retention: Limits
         Acknowledgments: true
          Discard Policy: Old
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: unlimited
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

State:

                Messages: 0
                   Bytes: 0 B
          First Sequence: 0
           Last Sequence: 0
        Active Consumers: 0
$ nats consumer add
? Select a Stream stream_name
? Consumer name consumer_name
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgment policy explicit
? Replay policy instant
? Filter Stream by subjects (blank for all) 
? Maximum Allowed Deliveries -1
? Maximum Acknowledgments Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
Information for Consumer stream_name > consumer_name created 2025-10-03T14:13:51+03:00

Configuration:

                    Name: consumer_name
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
스트림과 durable pull consumer를 생성한 후에는 NATS 엔진 테이블을 만들 수 있습니다. 이를 위해 nats&#95;stream, nats&#95;consumer&#95;name, nats&#95;subjects를 초기화해야 합니다:
CREATE TABLE nats_jet_stream (
    key UInt64,
    value UInt64
  ) ENGINE NATS 
    SETTINGS  nats_url = 'localhost:4222',
              nats_stream = 'stream_name',
              nats_consumer_name = 'consumer_name',
              nats_subjects = 'stream_subject',
              nats_format = 'JSONEachRow';
마지막 수정일 2026년 6월 10일