メインコンテンツへスキップ
Cloud の Distributed エンジンClickHouse Cloud で Distributed テーブルエンジンを作成するには、remoteremoteSecure テーブル関数を使用できます。 ClickHouse Cloud では Distributed(...) 構文は使用できません。
Distributed エンジンを使用するテーブルは自身のデータを保存しませんが、複数のサーバーにまたがる分散クエリ処理を可能にします。 読み取りは自動的に並列化されます。読み取り時には、存在する場合、リモートサーバー上のテーブルの索引が使用されます。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

テーブルから

Distributed テーブルが現在のサーバー上のテーブルを参照している場合は、そのテーブルのスキーマをそのまま利用できます。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Distributed パラメーター

パラメーター説明
clusterサーバーの設定ファイル内のクラスター名
databaseリモートデータベースの名前
tableリモートテーブルの名前
sharding_key (Optional)シャーディングキー。
sharding_key の指定は、次の場合に必要です。
  • 分散テーブルへの INSERT の場合 (テーブルエンジンがデータをどのように分割するかを判断するために sharding_key を必要とするため) 。ただし、insert_distributed_one_random_shard 設定が有効な場合、INSERT にシャーディングキーは不要です。
  • optimize_skip_unused_shards を使用する場合。クエリ対象にする分片を判断するために sharding_key が必要です
policy_name (Optional)ポリシー名。バックグラウンド送信用の一時ファイルの保存に使用されます
関連項目

Distributed の設定

設定説明デフォルト値
fsync_after_insertDistributed へのバックグラウンド INSERT 後に、ファイルデータに対して fsync を実行します。OS が イニシエーター ノード のディスク上のファイルに、挿入されたデータ全体を確実にフラッシュしたことを保証します。false
fsync_directoriesディレクトリに対して fsync を実行します。Distributed テーブルへのバックグラウンド INSERT に関連する操作 (INSERT 後、データを分片へ送信した後など) のあとで、OS がディレクトリのメタデータを確実に更新したことを保証します。false
skip_unavailable_shardstrue の場合、ClickHouse は利用できない分片を自動的にスキップします。分片は次の場合に利用不可とマークされます。1) 接続障害により分片に到達できない。2) DNS で分片を名前解決できない。3) その分片上にテーブルが存在しない。false
bytes_to_throw_insertバックグラウンド INSERT 用に保留される圧縮済みバイト数がこの値を超える場合、例外がスローされます。0 の場合はスローしません。0
bytes_to_delay_insertバックグラウンド INSERT 用に保留される圧縮済みバイト数がこの値を超える場合、クエリは遅延されます。0 の場合は遅延しません。0
max_delay_to_insertバックグラウンド送信の保留バイト数が多い場合に、Distributed テーブルへデータを INSERT する際の最大遅延時間 (秒) 。60
background_insert_batchdistributed_background_insert_batch と同じです0
background_insert_split_batch_on_failuredistributed_background_insert_split_batch_on_failure と同じです0
background_insert_sleep_time_msdistributed_background_insert_sleep_time_ms と同じです0
background_insert_max_sleep_time_msdistributed_background_insert_max_sleep_time_ms と同じです0
flush_on_detachDETACH / DROP / サーバーのシャットダウン時に、データをリモートノードへフラッシュします。true
耐久性設定 (fsync_...):
  • 影響するのはバックグラウンド INSERT (つまり distributed_foreground_insert=false) のみです。これは、データが最初にイニシエーター ノードのディスクに保存され、その後バックグラウンドで分片へ送信される場合です。
  • INSERT のパフォーマンスが大幅に低下する可能性があります
  • Distributed テーブルのフォルダ内に保存されたデータを、挿入を受け付けたノード に書き込む処理に影響します。基盤となる MergeTree テーブルへの書き込み保証が必要な場合は、system.merge_tree_settings の耐久性設定 (...fsync...) を参照してください
挿入制限設定 (..._insert) については、以下も参照してください:
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
データは、logs クラスター内のすべてのサーバーにある default.hits テーブルから読み取られます。データは読み取られるだけでなく、可能な範囲でリモートサーバー上で部分的に処理も行われます。たとえば、GROUP BY を含むクエリでは、データはリモートサーバー上で集計され、集約関数の中間状態がリクエスト元のサーバーに送信されます。その後、データはさらに集計されます。 データベース名の代わりに、文字列を返す定数式を使用できます。例: currentDatabase()

クラスター

クラスターは、サーバー設定ファイルで構成します。
<remote_servers>
    <logs>
        <!-- 分散クエリ用のサーバー間クラスター単位のシークレット
             デフォルト: シークレットなし(認証は実行されません)

             設定した場合、分散クエリは分片上で検証されます。少なくとも以下の条件を満たす必要があります:
             - 該当クラスターが分片上に存在すること、
             - 該当クラスターが同じシークレットを持つこと。

             また(より重要な点として)、initial_user が
             クエリの実行ユーザーとして使用されます。
        -->
        <!-- <secret></secret> -->
        
        <!-- 省略可能。このクラスターで 分散 DDL クエリ(ON CLUSTER 句)を許可するかどうか。デフォルト: true(許可)。-->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- 省略可能。データ書き込み時の分片の重み。デフォルト: 1。-->
            <weight>1</weight>
            <!-- 省略可能。分片の名前。空でなく、クラスター内の分片間で一意である必要があります。指定しない場合は空になります。-->
            <name>shard_01</name>
            <!-- 省略可能。データをいずれか1つのレプリカにのみ書き込むかどうか。デフォルト: false(すべてのレプリカにデータを書き込む)。-->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- 省略可能。負荷分散におけるレプリカの優先度(load_balancing 設定も参照)。デフォルト: 1(値が小さいほど優先度が高い)。-->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
ここでは、logs という名前のクラスターを定義しています。このクラスターは 2 つの分片で構成され、各分片には 2 つのレプリカがあります。分片とは、データの異なる部分を保持するサーバーのことです (すべてのデータを読み取るには、すべての分片にアクセスする必要があります) 。レプリカは同じデータを持つ複製サーバーです (すべてのデータを読み取る場合、各分片についていずれか 1 つのレプリカにアクセスすれば十分です) 。 クラスター名にドットを含めることはできません。 各サーバーには、hostport、および必要に応じて userpasswordsecurecompressionbind_host を指定します。
パラメータ説明デフォルト値
hostリモートサーバーのアドレスです。ドメイン、IPv4 アドレス、IPv6 アドレスのいずれかを使用できます。ドメインを指定すると、サーバーは起動時に DNS リクエストを実行し、その結果はサーバーの稼働中保持されます。DNS リクエストに失敗すると、サーバーは起動しません。DNS レコードを変更した場合は、サーバーを再起動してください。-
portメッセージ交換用の TCP ポートです (config 内の tcp_port。通常は 9000 に設定されます) 。http_port と混同しないでください。-
userリモートサーバーへの接続に使用するユーザー名です。このユーザーには、指定したサーバーに接続するためのアクセス権が必要です。アクセスは users.xml ファイルで設定します。詳しくは、アクセス権 のセクションを参照してください。default
passwordリモートサーバーへの接続に使用するパスワードです (マスクされません) 。
secureセキュアな SSL/TLS 接続を使用するかどうかを指定します。通常はポートの指定も必要です (デフォルトのセキュアポートは 9440 です) 。サーバーは <tcp_port_secure>9440</tcp_port_secure> で待ち受け、正しい証明書が設定されている必要があります。false
compressionデータ圧縮を使用します。true
bind_hostこのノードからリモートサーバーへ接続する際に使用する送信元アドレスです。サポートされるのは IPv4 アドレスのみです。ClickHouse の分散クエリで使用される送信元 IP アドレスの設定が必要な、高度なデプロイメントのユースケースを想定しています。-
レプリカを指定すると、読み取り時には各分片ごとに利用可能なレプリカのうち 1 つが選択されます。負荷分散アルゴリズム (どのレプリカにアクセスするかの優先度) は設定できます。詳しくは load_balancing 設定を参照してください。サーバーとの接続が確立できない場合は、短いタイムアウトで接続が試行されます。接続に失敗すると次のレプリカが選択され、すべてのレプリカに対して同様に処理されます。すべてのレプリカへの接続試行が失敗した場合は、同じ方法で数回再試行されます。これはレジリエンスの向上には有効ですが、完全な耐障害性を保証するものではありません。リモートサーバーが接続を受け付けても、動作しなかったり、正常に動作しなかったりする可能性があるためです。 指定する分片は 1 つだけでもかまいません (この場合、クエリ処理は 分散 ではなく remote と呼ぶのが適切です) 。また、分片はいくつでも指定できます。各分片には、1 つ以上の任意の数のレプリカを指定できます。分片ごとに異なる数のレプリカを指定することもできます。 設定には、必要な数だけクラスターを指定できます。 クラスターを確認するには、system.clusters テーブルを使用します。 Distributed エンジンを使うと、クラスターをローカルサーバーのように扱えます。ただし、クラスターの設定を動的に指定することはできず、サーバー設定ファイルで設定する必要があります。通常、クラスター内のすべてのサーバーは同じクラスター設定を持ちます (必須ではありません) 。設定ファイル内のクラスターは、サーバーを再起動せずに動的に更新されます。 毎回、未知の分片とレプリカの集合にクエリを送る必要がある場合は、Distributed テーブルを作成する必要はありません。代わりに remote テーブル関数を使用してください。Table functions セクションを参照してください。

データの書き込み

クラスターにデータを書き込む方法は 2 つあります。 1 つ目は、どのデータをどのサーバーに書き込むかを定義し、各分片に対して直接書き込みを行う方法です。つまり、Distributed テーブルが参照しているクラスター内のリモートテーブルに対して、直接 INSERT ステートメントを実行します。これは最も柔軟な方法であり、対象領域の要件によって複雑になるような場合でも、任意のシャーディング方式を使用できます。また、データを異なる分片に完全に独立して書き込めるため、最も効率的な方法でもあります。 2 つ目は、Distributed テーブルに対して INSERT ステートメントを実行する方法です。この場合、テーブル自体が挿入されたデータを各サーバーに分散します。Distributed テーブルに書き込むには、sharding_key パラメータが設定されている必要があります (分片が 1 つしかない場合を除く) 。 各分片には、設定ファイルで <weight> を定義できます。デフォルトでは、重みは 1 です。データは、分片の重みに比例した量で各分片に分散されます。まずすべての分片の重みを合計し、次に各分片の重みを合計値で割って、それぞれの分片の比率を決定します。たとえば、2 つの分片があり、1 つ目の重みが 1、2 つ目の重みが 2 の場合、1 つ目には挿入された行の 3 分の 1 (1 / 3) 、2 つ目には 3 分の 2 (2 / 3) が送られます。 各分片には、設定ファイルで internal_replication パラメータを定義できます。このパラメータが true に設定されている場合、書き込み操作では最初に正常なレプリカが選択され、そこにデータが書き込まれます。Distributed テーブルの基になるテーブルがレプリケートテーブル (たとえば Replicated*MergeTree テーブルエンジンのいずれか) である場合は、これを使用してください。テーブルレプリカの 1 つが書き込みを受け取り、その後ほかのレプリカへ自動的にレプリケーションされます。 internal_replicationfalse (デフォルト) に設定されている場合、データはすべてのレプリカに書き込まれます。この場合、Distributed テーブル自体がデータをレプリケートします。これはレプリケートテーブルを使用するよりも劣ります。レプリカ間の整合性が確認されないため、時間の経過とともに、それぞれに含まれるデータがわずかに異なってくるからです。 データの 1 行が送られる分片を選択するには、シャーディング式を評価し、それを分片の総重みで割った余りを取ります。その行は、prev_weights から prev_weights + weight までの余りに対応する半区間に該当する分片へ送られます。ここで、prev_weights は番号の小さい分片の総重み、weight はこの分片の重みです。たとえば、2 つの分片があり、1 つ目の重みが 9、2 つ目の重みが 10 の場合、行は範囲 [0, 9) の余りに対しては 1 つ目の分片に、範囲 [9, 19) の余りに対しては 2 つ目の分片に送られます。 シャーディング式には、整数を返す、定数およびテーブルのカラムから成る任意の式を使用できます。たとえば、データをランダムに分散するために rand() を、ユーザー ID を割った余りで分散するために UserID を使用できます (この場合、1 人のユーザーのデータは 1 つの分片に配置されるため、ユーザー単位で INJOIN を実行しやすくなります) 。いずれかのカラムの分布が十分に均一でない場合は、intHash64(UserID) のようにハッシュ関数で包むことができます。 単純な除算の余りによる方法は、シャーディングの手法としては限定的であり、常に適切とは限りません。これは中規模から大規模のデータ量 (数十台のサーバー) では機能しますが、非常に大規模なデータ量 (数百台以上のサーバー) には適しません。後者の場合は、Distributed テーブルのエントリを使うのではなく、対象領域に必要なシャーディング方式を使用してください。 次のような場合には、シャーディング方式を慎重に検討する必要があります。
  • 特定のキーによるデータの結合 (IN または JOIN) が必要なクエリを使用する場合。データがこのキーでシャーディングされていれば、GLOBAL INGLOBAL JOIN の代わりにローカルの INJOIN を使用でき、はるかに効率的です。
  • 多数のサーバー (数百台以上) を使用し、かつ多数の小さなクエリを処理する場合。たとえば、個々のクライアント (Web サイト、広告主、またはパートナーなど) のデータに対するクエリです。小さなクエリがクラスター全体に影響しないようにするには、1 つのクライアントのデータを 1 つの分片に配置するのが合理的です。あるいは、二段階シャーディングを設定することもできます。つまり、クラスター全体を「レイヤー」に分割し、各レイヤーは複数の分片で構成される場合があります。1 つのクライアントのデータは 1 つのレイヤー上に配置されますが、必要に応じてそのレイヤーに分片を追加でき、データはその中でランダムに分散されます。各レイヤーごとに Distributed テーブルを作成し、グローバルクエリ用に 1 つの共有分散テーブルを作成します。
データはバックグラウンドで書き込まれます。テーブルに挿入されると、データブロックはローカルファイルシステムに書き込まれるだけです。データはその後、できるだけ早くバックグラウンドでリモートサーバーに送信されます。データ送信の間隔は、distributed_background_insert_sleep_time_ms および distributed_background_insert_max_sleep_time_ms の設定で管理されます。Distributed エンジンは、挿入されたデータを含む各ファイルを個別に送信しますが、distributed_background_insert_batch 設定を使うと、ファイルのバッチ送信を有効にできます。この設定により、ローカルサーバーとネットワークのリソースをより効率的に活用できるため、クラスターのパフォーマンスが向上します。データが正常に送信されているかどうかは、テーブルディレクトリ /var/lib/clickhouse/data/database/table/ 内のファイル一覧 (送信待ちのデータ) を確認してください。バックグラウンドタスクを実行するスレッド数は、background_distributed_schedule_pool_size 設定で指定できます。 Distributed テーブルへの INSERT 後にサーバーが停止した、または異常終了した場合 (たとえばハードウェア障害による場合) 、挿入されたデータが失われる可能性があります。テーブルディレクトリで破損したデータパートが検出されると、それは broken サブディレクトリに移され、以後は使用されません。

データの読み取り

Distributed テーブルに対してクエリを実行すると、SELECT クエリはすべての分片に送信され、データが分片間でどのように分散されていても機能します (完全にランダムに分散されていても問題ありません) 。新しい分片を追加しても、古いデータをその分片に移行する必要はありません。代わりに、より大きな重みを設定して新しいデータを書き込めます。データの分布はやや不均一になりますが、クエリは正しく効率的に動作します。 max_parallel_replicas オプションを有効にすると、1 つの分片内のすべてのレプリカにまたがってクエリ処理が並列化されます。詳しくは、max_parallel_replicas のセクションを参照してください。 分散 in および global in クエリがどのように処理されるかについて詳しくは、こちら のドキュメントを参照してください。

仮想カラム

_Shard_num

_shard_num — テーブル system.clustersshard_num の値が格納されています。型: UInt32
remote および [cluster](../../../sql-reference/table-functions/cluster.md) テーブル関数は内部で一時的な Distributed テーブルを作成するため、_shard_num` はそれらでも利用できます。
関連項目
最終更新日 2026年6月10日