메인 콘텐츠로 건너뛰기
Cloud에서의 Distributed 엔진ClickHouse Cloud에서 분산 테이블 엔진을 생성하려면 remoteremoteSecure 테이블 함수를 사용할 수 있습니다. ClickHouse Cloud에서는 Distributed(...) 구문을 사용할 수 없습니다.
Distributed 엔진을 사용하는 테이블은 자체적으로 데이터를 저장하지 않지만, 여러 서버에서 분산 쿼리 처리를 수행할 수 있습니다. 읽기 작업은 자동으로 병렬화됩니다. 읽는 동안에는 원격 서버에 테이블 인덱스가 있는 경우 해당 인덱스를 사용합니다.

테이블 만들기

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

테이블에서

Distributed 테이블이 현재 서버의 테이블을 가리키는 경우 해당 테이블의 스키마(schema)를 사용할 수 있습니다:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

분산 매개변수

매개변수설명
cluster서버 구성 파일에 있는 클러스터 이름
database원격 데이터베이스의 이름
table원격 테이블의 이름
sharding_key (Optional)세그먼트 분할 키입니다.
다음 경우에는 sharding_key를 지정해야 합니다.
  • 분산 테이블에 INSERTs를 수행하는 경우(테이블 엔진이 데이터를 어떻게 분할할지 결정하려면 sharding_key가 필요하기 때문입니다). 다만 insert_distributed_one_random_shard 설정이 활성화되어 있으면 INSERTs에는 세그먼트 분할 키가 필요하지 않습니다.
  • optimize_skip_unused_shards와 함께 사용하는 경우 sharding_key는 쿼리할 세그먼트를 결정하는 데 필요합니다
policy_name (Optional)정책 이름입니다. 백그라운드 전송에 사용할 임시 파일을 저장하는 데 사용됩니다
관련 항목

분산 설정

설정설명기본값
fsync_after_insertDistributed에 대한 백그라운드 삽입 후 파일 데이터에 fsync를 수행합니다. OS가 initiator 노드의 디스크에서 삽입된 전체 데이터를 파일로 플러시했음을 보장합니다.false
fsync_directories디렉터리에 대해 fsync를 수행합니다. 분산 테이블에 대한 백그라운드 삽입 관련 작업(삽입 후, 데이터를 세그먼트로 전송한 후 등) 이후에 OS가 디렉터리 메타데이터를 갱신했음을 보장합니다.false
skip_unavailable_shardstrue이면 ClickHouse는 사용할 수 없는 세그먼트를 자동으로 건너뜁니다. 세그먼트는 다음 경우 사용할 수 없는 것으로 표시됩니다. 1) connection 실패로 인해 세그먼트에 연결할 수 없는 경우 2) DNS를 통해 세그먼트를 확인할 수 없는 경우 3) 세그먼트에 table이 없는 경우false
bytes_to_throw_insert백그라운드 INSERT에 대해 대기 중인 compressed 바이트 수가 이 값을 초과하면 예외가 발생합니다. 0이면 예외를 발생시키지 않습니다.0
bytes_to_delay_insert백그라운드 INSERT에 대해 대기 중인 compressed 바이트 수가 이 값을 초과하면 쿼리가 지연됩니다. 0이면 지연하지 않습니다.0
max_delay_to_insert백그라운드 전송을 위해 대기 중인 바이트가 많을 때 분산 테이블에 데이터를 삽입하는 최대 지연 시간(초)입니다.60
background_insert_batchdistributed_background_insert_batch와 동일합니다0
background_insert_split_batch_on_failuredistributed_background_insert_split_batch_on_failure와 동일합니다0
background_insert_sleep_time_msdistributed_background_insert_sleep_time_ms와 동일합니다0
background_insert_max_sleep_time_msdistributed_background_insert_max_sleep_time_ms와 동일합니다0
flush_on_detachDETACH/DROP/server 종료 시 원격 노드로 데이터를 플러시합니다.true
내구성 설정 (fsync_...):
  • 데이터가 먼저 initiator 노드 디스크에 저장되고, 이후 백그라운드에서 세그먼트로 전송되는 백그라운드 INSERT(distributed_foreground_insert=false)에만 영향을 줍니다.
  • INSERT 성능이 크게 저하될 수 있습니다.
  • 분산 테이블 폴더 내부에 저장된 데이터를 삽입을 수락한 노드에 기록하는 데 영향을 줍니다. 기본 MergeTree 테이블에 대한 데이터 기록 보장이 필요하다면 system.merge_tree_settings의 내구성 설정(...fsync...)을 참조하십시오.
삽입 제한 설정 (..._insert)에 대해서는 다음도 참조하십시오.
예시
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
데이터는 logs 클러스터의 모든 서버에 있는 default.hits 테이블에서 읽어옵니다. 데이터는 단순히 읽기만 하는 것이 아니라, 가능한 범위 내에서 원격 서버에서 부분적으로 처리되기도 합니다. 예를 들어 GROUP BY가 포함된 쿼리의 경우, 데이터는 원격 서버에서 집계되고 집계 함수의 중간 상태가 요청 서버로 전송됩니다. 그런 다음 데이터가 추가로 집계됩니다. 데이터베이스 이름 대신 문자열을 반환하는 상수 표현식을 사용할 수 있습니다. 예시: currentDatabase().

클러스터

클러스터는 서버 구성 파일에서 설정합니다:
<remote_servers>
    <logs>
        <!-- 분산 쿼리를 위한 서버 간 클러스터별 시크릿
             기본값: 시크릿 없음(인증이 수행되지 않음)

             설정하면 분산 쿼리가 샤드에서 검증되므로 최소한 다음을 충족해야 합니다:
             - 해당 클러스터가 샤드에 존재해야 합니다.
             - 해당 클러스터가 동일한 시크릿을 가져야 합니다.

             또한 (더 중요하게) initial_user가
             쿼리의 현재 사용자로 사용됩니다.
        -->
        <!-- <secret></secret> -->
        
        <!-- 선택 사항. 이 클러스터에서 분산 DDL 쿼리(ON CLUSTER 절)를 허용할지 여부. 기본값: true(허용). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- 선택 사항. 데이터를 쓸 때의 샤드 가중치. 기본값: 1. -->
            <weight>1</weight>
            <!-- 선택 사항. 샤드 이름. 비어 있지 않아야 하며 클러스터 내 다른 샤드와 고유해야 합니다. 지정하지 않으면 비어 있습니다. -->
            <name>shard_01</name>
            <!-- 선택 사항. 레플리카 중 하나에만 데이터를 쓸지 여부. 기본값: false(모든 레플리카에 데이터를 씀). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- 선택 사항. 로드 밸런싱을 위한 레플리카 우선순위(load_balancing 설정 참조). 기본값: 1(값이 작을수록 우선순위가 높음). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
여기서는 logs라는 이름의 클러스터를 정의하며, 2개의 세그먼트로 구성되고 각 세그먼트에는 2개의 레플리카가 포함됩니다. 세그먼트는 서로 다른 데이터 파트를 포함하는 서버를 의미합니다(모든 데이터를 읽으려면 모든 세그먼트에 접근해야 합니다). 레플리카는 데이터를 복제하는 서버입니다(모든 데이터를 읽으려면 레플리카 중 어느 하나에 접근하면 됩니다). 클러스터 이름에는 점을 포함할 수 없습니다. 각 서버에 대해 host, port와 선택적으로 user, password, secure, compression, bind_host 매개변수를 지정합니다:
매개변수설명기본값
host원격 서버의 주소입니다. 도메인, IPv4 주소 또는 IPv6 주소를 사용할 수 있습니다. 도메인을 지정하면 서버가 시작될 때 DNS 요청을 수행하며, 그 결과는 서버가 실행되는 동안 저장됩니다. DNS 요청이 실패하면 서버는 시작되지 않습니다. DNS 레코드를 변경한 경우 서버를 다시 시작하십시오.-
port메시지 교환에 사용하는 TCP 포트입니다(configtcp_port, 일반적으로 9000으로 설정됨). http_port와 혼동하지 마십시오.-
user원격 서버에 연결할 때 사용할 사용자 이름입니다. 이 사용자는 지정된 서버에 연결할 수 있는 접근 권한이 있어야 합니다. 접근은 users.xml 파일에서 구성됩니다. 자세한 내용은 접근 권한 섹션을 참조하십시오.default
password원격 서버에 연결할 때 사용할 비밀번호입니다(마스킹되지 않음).
secure보안 SSL/TLS 연결 사용 여부입니다. 일반적으로 포트도 함께 지정해야 합니다(기본 보안 포트는 9440입니다). 서버는 <tcp_port_secure>9440</tcp_port_secure>에서 수신 대기해야 하며 올바른 인증서로 구성되어 있어야 합니다.false
compression데이터 압축을 사용합니다.true
bind_host이 노드에서 원격 서버에 연결할 때 사용할 출발지 주소입니다. IPv4 주소만 지원됩니다. ClickHouse 분산 쿼리에서 사용할 출발지 IP 주소를 설정해야 하는 고급 배포 사용 사례를 위한 옵션입니다.-
레플리카를 지정하면 읽을 때 각 세그먼트마다 사용 가능한 레플리카 중 하나가 선택됩니다. 로드 밸런싱 알고리즘(어느 레플리카에 접근할지에 대한 우선순위)을 구성할 수 있습니다. 자세한 내용은 load_balancing 설정을 참조하십시오. 서버와의 연결이 설정되지 않으면 짧은 timeout으로 연결을 시도합니다. 연결에 실패하면 다음 레플리카가 선택되며, 이 과정이 모든 레플리카에 대해 반복됩니다. 모든 레플리카에 대한 연결 시도가 실패하면 같은 방식으로 여러 차례 다시 시도합니다. 이는 복원력 향상에 도움이 되지만 완전한 장애 허용을 제공하지는 않습니다. 원격 서버가 연결은 수락하더라도 실제로는 동작하지 않거나 성능이 저하된 상태일 수 있기 때문입니다. 세그먼트는 하나만 지정할 수도 있고(이 경우 쿼리 처리는 distributed가 아니라 remote라고 해야 합니다), 원하는 수만큼 지정할 수도 있습니다. 각 세그먼트에서는 레플리카를 하나 이상 원하는 수만큼 지정할 수 있습니다. 세그먼트마다 레플리카 수를 다르게 지정할 수 있습니다. 구성에는 원하는 만큼 많은 클러스터를 지정할 수 있습니다. 클러스터를 확인하려면 system.clusters 테이블을 사용하십시오. Distributed 엔진을 사용하면 클러스터를 로컬 서버처럼 사용할 수 있습니다. 하지만 클러스터 구성은 동적으로 지정할 수 없으며, 서버 구성 파일에서 설정해야 합니다. 일반적으로 클러스터의 모든 서버는 동일한 클러스터 구성을 사용합니다(필수는 아닙니다). 구성 파일의 클러스터는 서버를 다시 시작하지 않고도 즉시 업데이트됩니다. 매번 알 수 없는 세트의 세그먼트와 레플리카로 쿼리를 보내야 한다면 Distributed 테이블을 만들 필요가 없습니다. 대신 remote 테이블 함수를 사용하십시오. 자세한 내용은 테이블 함수 섹션을 참조하십시오.

데이터 쓰기

클러스터에 데이터를 쓰는 방법은 두 가지입니다. 첫째, 어떤 서버에 어떤 데이터를 쓸지 지정한 다음 각 세그먼트에 직접 쓸 수 있습니다. 즉, Distributed 테이블이 가리키는 클러스터의 원격 테이블에 직접 INSERT SQL 문을 실행합니다. 이 방법은 가장 유연한 방식으로, 도메인 요구 사항 때문에 단순하지 않은 경우를 포함해 어떤 세그먼트 분할 방식이든 사용할 수 있습니다. 또한 데이터가 서로 다른 세그먼트에 완전히 독립적으로 기록될 수 있으므로 가장 최적의 방법이기도 합니다. 둘째, Distributed 테이블에 INSERT SQL 문을 실행할 수 있습니다. 이 경우 테이블이 삽입된 데이터를 서버 전체에 걸쳐 직접 분산합니다. Distributed 테이블에 쓰려면 sharding_key 매개변수가 구성되어 있어야 합니다(세그먼트가 하나뿐인 경우는 예외). 각 세그먼트에는 구성 파일에서 <weight>를 정의할 수 있습니다. 기본값은 1입니다. 데이터는 세그먼트 가중치에 비례하여 각 세그먼트에 분산됩니다. 모든 세그먼트의 가중치를 합산한 다음, 각 세그먼트의 가중치를 전체 합으로 나누어 각 세그먼트의 비율을 결정합니다. 예를 들어 세그먼트가 두 개이고 첫 번째 가중치가 1, 두 번째 가중치가 2이면 첫 번째에는 삽입된 행의 3분의 1(1 / 3)이, 두 번째에는 3분의 2(2 / 3)가 전송됩니다. 각 세그먼트에는 구성 파일에서 internal_replication 매개변수를 정의할 수 있습니다. 이 매개변수가 true로 설정되면 쓰기 작업은 정상 상태인 첫 번째 레플리카를 선택해 여기에 데이터를 씁니다. Distributed 테이블의 기반 테이블이 복제된 테이블인 경우(예: Replicated*MergeTree 테이블 엔진) 이 방식을 사용하십시오. 테이블 레플리카 중 하나가 쓰기를 수신하고, 다른 레플리카에는 자동으로 복제됩니다. internal_replicationfalse(기본값)로 설정되면 데이터는 모든 레플리카에 기록됩니다. 이 경우 Distributed 테이블이 자체적으로 데이터를 복제합니다. 이 방식은 복제된 테이블을 사용하는 것보다 바람직하지 않습니다. 레플리카의 일관성을 확인하지 않기 때문에 시간이 지나면 서로 약간 다른 데이터를 포함하게 됩니다. 데이터의 한 행이 전송될 세그먼트를 선택하기 위해 세그먼트 분할 표현식을 분석하고, 이를 세그먼트의 총 가중치로 나눈 나머지를 구합니다. 그러면 해당 행은 prev_weights부터 prev_weights + weight까지의 나머지 반개구간에 대응하는 세그먼트로 전송됩니다. 여기서 prev_weights는 더 작은 번호를 가진 세그먼트들의 총 가중치이고, weight는 현재 세그먼트의 가중치입니다. 예를 들어 세그먼트가 두 개이고 첫 번째 가중치가 9, 두 번째 가중치가 10이면 범위 [0, 9)의 나머지는 첫 번째 세그먼트로, 범위 [9, 19)의 나머지는 두 번째 세그먼트로 전송됩니다. 세그먼트 분할 표현식은 상수와 테이블 컬럼으로 구성된, 정수를 반환하는 어떤 표현식이든 될 수 있습니다. 예를 들어 데이터를 무작위로 분산하려면 rand() 표현식을 사용할 수 있고, 사용자 ID를 나눈 나머지에 따라 분산하려면 UserID를 사용할 수 있습니다(이 경우 한 사용자의 데이터는 하나의 세그먼트에만 저장되므로 사용자 기준 INJOIN 실행이 단순해집니다). 특정 컬럼의 분포가 충분히 고르지 않다면 intHash64(UserID)와 같은 해시 함수로 감쌀 수 있습니다. 나눗셈의 단순한 나머지를 사용하는 방식은 제한적인 세그먼트 분할 방법이며, 항상 적절한 것은 아닙니다. 이 방식은 중간 규모 및 대규모 데이터 볼륨(수십 대의 서버)에는 잘 작동하지만, 매우 큰 데이터 볼륨(수백 대 이상의 서버)에는 적합하지 않습니다. 후자의 경우 Distributed 테이블의 항목을 사용하기보다 도메인 요구 사항에 맞는 세그먼트 분할 방식을 사용하십시오. 다음과 같은 경우에는 세그먼트 분할 방식에 특히 주의를 기울여야 합니다.
  • 특정 키로 데이터를 결합해야 하는(IN 또는 JOIN) 쿼리를 사용하는 경우입니다. 데이터가 이 키를 기준으로 세그먼트 분할되어 있다면 GLOBAL IN 또는 GLOBAL JOIN 대신 로컬 IN 또는 JOIN을 사용할 수 있으며, 훨씬 더 효율적입니다.
  • 많은 수의 서버(수백 대 이상)에서 많은 수의 작은 쿼리를 사용하는 경우입니다. 예를 들어 개별 클라이언트(예: 웹사이트, 광고주 또는 파트너)의 데이터에 대한 쿼리입니다. 작은 쿼리가 전체 클러스터에 영향을 주지 않도록 하려면 하나의 클라이언트 데이터를 하나의 세그먼트에 두는 것이 합리적입니다. 또는 2단계 세그먼트 분할을 구성할 수도 있습니다. 즉, 전체 클러스터를 “레이어”로 나누고, 각 레이어는 여러 세그먼트로 구성될 수 있습니다. 하나의 클라이언트 데이터는 하나의 레이어에 위치하지만, 필요에 따라 해당 레이어에 세그먼트를 추가할 수 있고, 그 내부에서는 데이터가 무작위로 분산됩니다. 각 레이어마다 Distributed 테이블을 만들고, 전역 쿼리를 위한 단일 공유 분산 테이블을 추가로 만듭니다.
데이터는 백그라운드에서 기록됩니다. 테이블에 삽입되면 데이터 블록은 우선 로컬 파일 시스템에만 기록됩니다. 데이터는 가능한 한 빨리 백그라운드에서 원격 서버로 전송됩니다. 데이터 전송 주기는 distributed_background_insert_sleep_time_msdistributed_background_insert_max_sleep_time_ms 설정으로 관리됩니다. Distributed 엔진은 삽입된 데이터가 들어 있는 각 파일을 개별적으로 전송하지만, distributed_background_insert_batch 설정을 사용하면 파일을 배치로 전송할 수 있습니다. 이 설정은 로컬 서버와 네트워크 리소스를 더 효율적으로 활용해 클러스터 성능을 향상합니다. 테이블 디렉터리의 파일 목록(전송을 기다리는 데이터)을 확인하여 데이터가 성공적으로 전송되었는지 점검해야 합니다: /var/lib/clickhouse/data/database/table/. 백그라운드 작업을 수행하는 스레드 수는 background_distributed_schedule_pool_size 설정으로 지정할 수 있습니다. Distributed 테이블에 INSERT한 후 서버가 더 이상 존재하지 않게 되었거나 비정상적으로 재시작된 경우(예: 하드웨어 장애)에는 삽입된 데이터가 손실될 수 있습니다. 테이블 디렉터리에서 손상된 데이터 파트가 감지되면 broken 하위 디렉터리로 이동되며 더 이상 사용되지 않습니다.

데이터 읽기

Distributed 테이블을 쿼리할 때 SELECT 쿼리는 모든 세그먼트로 전송되며, 데이터가 세그먼트 간에 어떻게 분산되어 있든 관계없이 동작합니다(완전히 무작위로 분산되어 있어도 됩니다). 새 세그먼트를 추가할 때 기존 데이터를 그 세그먼트로 옮길 필요는 없습니다. 대신 더 높은 가중치를 사용해 새 데이터를 해당 세그먼트에 기록할 수 있습니다. 그러면 데이터가 다소 불균등하게 분산될 수 있지만, 쿼리는 여전히 정확하고 효율적으로 동작합니다. max_parallel_replicas 옵션이 활성화되면 단일 세그먼트 내의 모든 레플리카에서 쿼리 처리가 병렬로 수행됩니다. 자세한 내용은 max_parallel_replicas 섹션을 참조하십시오. 분산 inglobal in 쿼리가 처리되는 방식에 관해 자세히 알아보려면 이 문서를 참조하십시오.

가상 컬럼

_Shard_num

_shard_num — 테이블 system.clustersshard_num 값을 포함합니다. 유형: UInt32.
remote 및 [cluster](../../../sql-reference/table-functions/cluster.md) 테이블 함수는 내부적으로 임시 분산 테이블을 생성하므로 _shard_num`도 여기에서 사용할 수 있습니다.
관련 항목
마지막 수정일 2026년 6월 10일