テーブルの作成
テーブルから
Distributed テーブルが現在のサーバー上のテーブルを参照している場合は、そのテーブルのスキーマをそのまま利用できます。
Distributed パラメーター
| パラメーター | 説明 |
|---|---|
cluster | サーバーの設定ファイル内のクラスター名 |
database | リモートデータベースの名前 |
table | リモートテーブルの名前 |
sharding_key (Optional) | シャーディングキー。sharding_key の指定は、次の場合に必要です。
|
policy_name (Optional) | ポリシー名。バックグラウンド送信用の一時ファイルの保存に使用されます |
- distributed_foreground_insert 設定
- 例については MergeTree を参照してください
Distributed の設定
| 設定 | 説明 | デフォルト値 |
|---|---|---|
fsync_after_insert | Distributed へのバックグラウンド INSERT 後に、ファイルデータに対して fsync を実行します。OS が イニシエーター ノード のディスク上のファイルに、挿入されたデータ全体を確実にフラッシュしたことを保証します。 | false |
fsync_directories | ディレクトリに対して fsync を実行します。Distributed テーブルへのバックグラウンド INSERT に関連する操作 (INSERT 後、データを分片へ送信した後など) のあとで、OS がディレクトリのメタデータを確実に更新したことを保証します。 | false |
skip_unavailable_shards | true の場合、ClickHouse は利用できない分片を自動的にスキップします。分片は次の場合に利用不可とマークされます。1) 接続障害により分片に到達できない。2) DNS で分片を名前解決できない。3) その分片上にテーブルが存在しない。 | false |
bytes_to_throw_insert | バックグラウンド INSERT 用に保留される圧縮済みバイト数がこの値を超える場合、例外がスローされます。0 の場合はスローしません。 | 0 |
bytes_to_delay_insert | バックグラウンド INSERT 用に保留される圧縮済みバイト数がこの値を超える場合、クエリは遅延されます。0 の場合は遅延しません。 | 0 |
max_delay_to_insert | バックグラウンド送信の保留バイト数が多い場合に、Distributed テーブルへデータを INSERT する際の最大遅延時間 (秒) 。 | 60 |
background_insert_batch | distributed_background_insert_batch と同じです | 0 |
background_insert_split_batch_on_failure | distributed_background_insert_split_batch_on_failure と同じです | 0 |
background_insert_sleep_time_ms | distributed_background_insert_sleep_time_ms と同じです | 0 |
background_insert_max_sleep_time_ms | distributed_background_insert_max_sleep_time_ms と同じです | 0 |
flush_on_detach | DETACH / DROP / サーバーのシャットダウン時に、データをリモートノードへフラッシュします。 | true |
耐久性設定 (
fsync_...):- 影響するのはバックグラウンド
INSERT(つまりdistributed_foreground_insert=false) のみです。これは、データが最初にイニシエーター ノードのディスクに保存され、その後バックグラウンドで分片へ送信される場合です。 INSERTのパフォーマンスが大幅に低下する可能性があります- Distributed テーブルのフォルダ内に保存されたデータを、挿入を受け付けたノード に書き込む処理に影響します。基盤となる MergeTree テーブルへの書き込み保証が必要な場合は、
system.merge_tree_settingsの耐久性設定 (...fsync...) を参照してください
..._insert) については、以下も参照してください:distributed_foreground_insert設定prefer_localhost_replica設定bytes_to_throw_insertはbytes_to_delay_insertより先に処理されるため、bytes_to_delay_insertより小さい値に設定しないでください
logs クラスター内のすべてのサーバーにある default.hits テーブルから読み取られます。データは読み取られるだけでなく、可能な範囲でリモートサーバー上で部分的に処理も行われます。たとえば、GROUP BY を含むクエリでは、データはリモートサーバー上で集計され、集約関数の中間状態がリクエスト元のサーバーに送信されます。その後、データはさらに集計されます。
データベース名の代わりに、文字列を返す定数式を使用できます。例: currentDatabase()。
クラスター
logs という名前のクラスターを定義しています。このクラスターは 2 つの分片で構成され、各分片には 2 つのレプリカがあります。分片とは、データの異なる部分を保持するサーバーのことです (すべてのデータを読み取るには、すべての分片にアクセスする必要があります) 。レプリカは同じデータを持つ複製サーバーです (すべてのデータを読み取る場合、各分片についていずれか 1 つのレプリカにアクセスすれば十分です) 。
クラスター名にドットを含めることはできません。
各サーバーには、host、port、および必要に応じて user、password、secure、compression、bind_host を指定します。
| パラメータ | 説明 | デフォルト値 |
|---|---|---|
host | リモートサーバーのアドレスです。ドメイン、IPv4 アドレス、IPv6 アドレスのいずれかを使用できます。ドメインを指定すると、サーバーは起動時に DNS リクエストを実行し、その結果はサーバーの稼働中保持されます。DNS リクエストに失敗すると、サーバーは起動しません。DNS レコードを変更した場合は、サーバーを再起動してください。 | - |
port | メッセージ交換用の TCP ポートです (config 内の tcp_port。通常は 9000 に設定されます) 。http_port と混同しないでください。 | - |
user | リモートサーバーへの接続に使用するユーザー名です。このユーザーには、指定したサーバーに接続するためのアクセス権が必要です。アクセスは users.xml ファイルで設定します。詳しくは、アクセス権 のセクションを参照してください。 | default |
password | リモートサーバーへの接続に使用するパスワードです (マスクされません) 。 | ” |
secure | セキュアな SSL/TLS 接続を使用するかどうかを指定します。通常はポートの指定も必要です (デフォルトのセキュアポートは 9440 です) 。サーバーは <tcp_port_secure>9440</tcp_port_secure> で待ち受け、正しい証明書が設定されている必要があります。 | false |
compression | データ圧縮を使用します。 | true |
bind_host | このノードからリモートサーバーへ接続する際に使用する送信元アドレスです。サポートされるのは IPv4 アドレスのみです。ClickHouse の分散クエリで使用される送信元 IP アドレスの設定が必要な、高度なデプロイメントのユースケースを想定しています。 | - |
system.clusters テーブルを使用します。
Distributed エンジンを使うと、クラスターをローカルサーバーのように扱えます。ただし、クラスターの設定を動的に指定することはできず、サーバー設定ファイルで設定する必要があります。通常、クラスター内のすべてのサーバーは同じクラスター設定を持ちます (必須ではありません) 。設定ファイル内のクラスターは、サーバーを再起動せずに動的に更新されます。
毎回、未知の分片とレプリカの集合にクエリを送る必要がある場合は、Distributed テーブルを作成する必要はありません。代わりに remote テーブル関数を使用してください。Table functions セクションを参照してください。
データの書き込み
Distributed テーブルが参照しているクラスター内のリモートテーブルに対して、直接 INSERT ステートメントを実行します。これは最も柔軟な方法であり、対象領域の要件によって複雑になるような場合でも、任意のシャーディング方式を使用できます。また、データを異なる分片に完全に独立して書き込めるため、最も効率的な方法でもあります。
2 つ目は、Distributed テーブルに対して INSERT ステートメントを実行する方法です。この場合、テーブル自体が挿入されたデータを各サーバーに分散します。Distributed テーブルに書き込むには、sharding_key パラメータが設定されている必要があります (分片が 1 つしかない場合を除く) 。
各分片には、設定ファイルで <weight> を定義できます。デフォルトでは、重みは 1 です。データは、分片の重みに比例した量で各分片に分散されます。まずすべての分片の重みを合計し、次に各分片の重みを合計値で割って、それぞれの分片の比率を決定します。たとえば、2 つの分片があり、1 つ目の重みが 1、2 つ目の重みが 2 の場合、1 つ目には挿入された行の 3 分の 1 (1 / 3) 、2 つ目には 3 分の 2 (2 / 3) が送られます。
各分片には、設定ファイルで internal_replication パラメータを定義できます。このパラメータが true に設定されている場合、書き込み操作では最初に正常なレプリカが選択され、そこにデータが書き込まれます。Distributed テーブルの基になるテーブルがレプリケートテーブル (たとえば Replicated*MergeTree テーブルエンジンのいずれか) である場合は、これを使用してください。テーブルレプリカの 1 つが書き込みを受け取り、その後ほかのレプリカへ自動的にレプリケーションされます。
internal_replication が false (デフォルト) に設定されている場合、データはすべてのレプリカに書き込まれます。この場合、Distributed テーブル自体がデータをレプリケートします。これはレプリケートテーブルを使用するよりも劣ります。レプリカ間の整合性が確認されないため、時間の経過とともに、それぞれに含まれるデータがわずかに異なってくるからです。
データの 1 行が送られる分片を選択するには、シャーディング式を評価し、それを分片の総重みで割った余りを取ります。その行は、prev_weights から prev_weights + weight までの余りに対応する半区間に該当する分片へ送られます。ここで、prev_weights は番号の小さい分片の総重み、weight はこの分片の重みです。たとえば、2 つの分片があり、1 つ目の重みが 9、2 つ目の重みが 10 の場合、行は範囲 [0, 9) の余りに対しては 1 つ目の分片に、範囲 [9, 19) の余りに対しては 2 つ目の分片に送られます。
シャーディング式には、整数を返す、定数およびテーブルのカラムから成る任意の式を使用できます。たとえば、データをランダムに分散するために rand() を、ユーザー ID を割った余りで分散するために UserID を使用できます (この場合、1 人のユーザーのデータは 1 つの分片に配置されるため、ユーザー単位で IN や JOIN を実行しやすくなります) 。いずれかのカラムの分布が十分に均一でない場合は、intHash64(UserID) のようにハッシュ関数で包むことができます。
単純な除算の余りによる方法は、シャーディングの手法としては限定的であり、常に適切とは限りません。これは中規模から大規模のデータ量 (数十台のサーバー) では機能しますが、非常に大規模なデータ量 (数百台以上のサーバー) には適しません。後者の場合は、Distributed テーブルのエントリを使うのではなく、対象領域に必要なシャーディング方式を使用してください。
次のような場合には、シャーディング方式を慎重に検討する必要があります。
- 特定のキーによるデータの結合 (
INまたはJOIN) が必要なクエリを使用する場合。データがこのキーでシャーディングされていれば、GLOBAL INやGLOBAL JOINの代わりにローカルのINやJOINを使用でき、はるかに効率的です。 - 多数のサーバー (数百台以上) を使用し、かつ多数の小さなクエリを処理する場合。たとえば、個々のクライアント (Web サイト、広告主、またはパートナーなど) のデータに対するクエリです。小さなクエリがクラスター全体に影響しないようにするには、1 つのクライアントのデータを 1 つの分片に配置するのが合理的です。あるいは、二段階シャーディングを設定することもできます。つまり、クラスター全体を「レイヤー」に分割し、各レイヤーは複数の分片で構成される場合があります。1 つのクライアントのデータは 1 つのレイヤー上に配置されますが、必要に応じてそのレイヤーに分片を追加でき、データはその中でランダムに分散されます。各レイヤーごとに
Distributedテーブルを作成し、グローバルクエリ用に 1 つの共有分散テーブルを作成します。
Distributed エンジンは、挿入されたデータを含む各ファイルを個別に送信しますが、distributed_background_insert_batch 設定を使うと、ファイルのバッチ送信を有効にできます。この設定により、ローカルサーバーとネットワークのリソースをより効率的に活用できるため、クラスターのパフォーマンスが向上します。データが正常に送信されているかどうかは、テーブルディレクトリ /var/lib/clickhouse/data/database/table/ 内のファイル一覧 (送信待ちのデータ) を確認してください。バックグラウンドタスクを実行するスレッド数は、background_distributed_schedule_pool_size 設定で指定できます。
Distributed テーブルへの INSERT 後にサーバーが停止した、または異常終了した場合 (たとえばハードウェア障害による場合) 、挿入されたデータが失われる可能性があります。テーブルディレクトリで破損したデータパートが検出されると、それは broken サブディレクトリに移され、以後は使用されません。
データの読み取り
Distributed テーブルに対してクエリを実行すると、SELECT クエリはすべての分片に送信され、データが分片間でどのように分散されていても機能します (完全にランダムに分散されていても問題ありません) 。新しい分片を追加しても、古いデータをその分片に移行する必要はありません。代わりに、より大きな重みを設定して新しいデータを書き込めます。データの分布はやや不均一になりますが、クエリは正しく効率的に動作します。
max_parallel_replicas オプションを有効にすると、1 つの分片内のすべてのレプリカにまたがってクエリ処理が並列化されます。詳しくは、max_parallel_replicas のセクションを参照してください。
分散 in および global in クエリがどのように処理されるかについて詳しくは、こちら のドキュメントを参照してください。
仮想カラム
_Shard_num
_shard_num — テーブル system.clusters の shard_num の値が格納されています。型: UInt32。
remote および [cluster](../../../sql-reference/table-functions/cluster.md) テーブル関数は内部で一時的な Distributed テーブルを作成するため、_shard_num` はそれらでも利用できます。- 仮想カラム の説明
background_distributed_schedule_pool_size設定shardNum()およびshardCount()関数