Создание таблицы
Из таблицы
Distributed указывает на таблицу на текущем сервере, вы можете использовать её схему:
Параметры Distributed
| Параметр | Описание |
|---|---|
cluster | Имя кластера в конфигурационном файле сервера |
database | Имя удаленной базы данных |
table | Имя удаленной таблицы |
sharding_key (Необязательно) | Ключ сегментирования. Указание sharding_key необходимо в следующих случаях:
|
policy_name (Необязательно) | Имя политики; оно будет использоваться для хранения временных файлов при фоновой отправке |
- настройка distributed_foreground_insert
- MergeTree с примерами
Настройки Distributed
| Настройка | Описание | Значение по умолчанию |
|---|---|---|
fsync_after_insert | Выполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила на диск все вставленные данные на узле-инициаторе. | false |
fsync_directories | Выполнять fsync для каталогов. Гарантирует, что ОС обновила метаданные каталогов после операций, связанных с фоновыми вставками в таблицу Distributed (после вставки, после отправки данных в сегмент и т. д.). | false |
skip_unavailable_shards | Если true, ClickHouse молча пропускает недоступные сегменты. Сегмент помечается как недоступный, если: 1) к сегменту невозможно подключиться из-за сбоя соединения. 2) сегмент не разрешается через DNS. 3) таблица отсутствует на сегменте. | false |
bytes_to_throw_insert | Если объём сжатых байтов, ожидающих фонового INSERT, превысит это значение, будет сгенерировано исключение. 0 — не генерировать исключение. | 0 |
bytes_to_delay_insert | Если объём сжатых байтов, ожидающих фонового INSERT, превысит это значение, запрос будет задержан. 0 — не задерживать. | 0 |
max_delay_to_insert | Максимальная задержка вставки данных в таблицу Distributed в секундах, если для фоновой отправки накопилось много байтов. | 60 |
background_insert_batch | То же, что и distributed_background_insert_batch | 0 |
background_insert_split_batch_on_failure | То же, что и distributed_background_insert_split_batch_on_failure | 0 |
background_insert_sleep_time_ms | То же, что и distributed_background_insert_sleep_time_ms | 0 |
background_insert_max_sleep_time_ms | То же, что и distributed_background_insert_max_sleep_time_ms | 0 |
flush_on_detach | Сбрасывать данные на удалённые узлы при DETACH/DROP/остановке сервера. | true |
Настройки надёжности хранения (
fsync_...):- Влияют только на фоновые
INSERT(то естьdistributed_foreground_insert=false), когда данные сначала сохраняются на диске узла-инициатора, а затем в фоне отправляются в сегменты. - Могут значительно снизить производительность
INSERT - Влияют на запись данных, хранящихся в каталоге distributed-таблицы, на узле, который принял вашу вставку. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, см. настройки надёжности (
...fsync...) вsystem.merge_tree_settings
..._insert) см. также:- настройку
distributed_foreground_insert - настройку
prefer_localhost_replica bytes_to_throw_insertобрабатывается раньшеbytes_to_delay_insert, поэтому не следует задавать для него значение меньше, чемbytes_to_delay_insert
logs из таблицы default.hits, расположенной на каждом сервере кластера. Данные не только считываются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, в запросе с GROUP BY данные будут агрегироваться на удалённых серверах, а промежуточные состояния агрегатных функций будут отправляться на сервер-инициатор запроса. Затем данные будут агрегироваться дальше.
Вместо имени базы данных можно использовать константное выражение, возвращающее строку. Например: currentDatabase().
Кластеры
logs, состоящий из двух сегментов, каждый из которых содержит две реплики. Сегменты — это серверы, содержащие разные части данных (чтобы прочитать все данные, необходимо обратиться ко всем сегментам). Реплики — это серверы-дубликаты (чтобы прочитать все данные, можно обратиться к данным на любой из реплик).
Имена кластеров не должны содержать точек.
Для каждого сервера указываются параметры host, port, а также при необходимости user, password, secure, compression, bind_host:
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
host | Адрес удалённого сервера. Можно использовать либо доменное имя, либо IPv4-адрес или IPv6-адрес. Если указан домен, сервер при запуске выполняет DNS-запрос, и результат сохраняется, пока сервер работает. Если DNS-запрос завершается ошибкой, сервер не запускается. Если вы изменили DNS-запись, перезапустите сервер. | - |
port | TCP-порт для обмена сообщениями (tcp_port в конфигурации, обычно равен 9000). Не следует путать с http_port. | - |
user | Имя пользователя для подключения к удалённому серверу. У этого пользователя должны быть права доступа для подключения к указанному серверу. Доступ настраивается в файле users.xml. Дополнительные сведения см. в разделе Права доступа. | default |
password | Пароль для подключения к удалённому серверу (не маскируется). | ” |
secure | Следует ли использовать защищённое SSL/TLS‑соединение. Обычно также требуется указать порт (порт по умолчанию для защищённого соединения — 9440). Сервер должен прослушивать <tcp_port_secure>9440</tcp_port_secure> и быть настроен с корректными сертификатами. | false |
compression | Использовать сжатие данных. | true |
bind_host | Исходный адрес, который следует использовать при подключении к удалённому серверу с этого узла. Поддерживается только IPv4-адрес. Предназначено для сложных сценариев развертывания, когда необходимо задать исходный IP-адрес, используемый ClickHouse для распределённых запросов. | - |
system.clusters.
Движок Distributed позволяет работать с кластером как с локальным сервером. Однако конфигурацию кластера нельзя задавать динамически, её нужно настраивать в конфигурационном файле сервера. Обычно все серверы в кластере имеют одинаковую конфигурацию кластера (хотя это и не обязательно). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.
Если вам нужно каждый раз отправлять запрос неизвестному набору сегментов и реплик, создавать таблицу Distributed не нужно — вместо этого используйте табличную функцию remote. См. раздел Табличные функции.
Запись данных
INSERT в удалённые таблицы кластера, на которые указывает distributed таблица Distributed. Это наиболее гибкое решение, поскольку позволяет использовать любую схему шардирования, даже нетривиальную, если этого требует предметная область. Кроме того, это и наиболее оптимальное решение, так как данные можно записывать в разные сегменты полностью независимо друг от друга.
Во-вторых, можно выполнять операторы INSERT в distributed таблицу Distributed. В этом случае таблица сама распределяет вставленные данные по серверам. Чтобы записывать данные в distributed таблицу Distributed, у неё должен быть настроен параметр sharding_key (кроме случая, когда сегмент только один).
Для каждого сегмента в конфигурационном файле можно определить <weight>. По умолчанию вес равен 1. Данные распределяются по сегментам в объёме, пропорциональном весу сегмента. Все веса сегментов суммируются, затем вес каждого сегмента делится на общую сумму, чтобы определить долю каждого сегмента. Например, если есть два сегмента, и первый имеет вес 1, а второй — вес 2, то в первый будет отправлена одна треть (1 / 3) вставленных строк, а во второй — две трети (2 / 3).
Для каждого сегмента в конфигурационном файле можно определить параметр internal_replication. Если этот параметр установлен в true, операция записи выбирает первую работоспособную реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе distributed таблицы Distributed, являются реплицируемыми таблицами (например, используют любой из движков таблиц Replicated*MergeTree). Данные будут записаны в одну из реплик таблицы, а затем автоматически реплицированы на остальные реплики.
Если internal_replication установлен в false (значение по умолчанию), данные записываются во все реплики. В этом случае distributed таблица Distributed сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, поскольку согласованность реплик не проверяется, и со временем они будут содержать немного различающиеся данные.
Чтобы выбрать сегмент, в который будет отправлена строка данных, анализируется выражение шардирования, и берётся остаток от деления на общий вес сегментов. Строка отправляется в тот сегмент, которому соответствует полуинтервал остатков от prev_weights до prev_weights + weight, где prev_weights — это общий вес сегментов с меньшими номерами, а weight — вес данного сегмента. Например, если есть два сегмента, и первый имеет вес 9, а второй — вес 10, то строка будет отправлена в первый сегмент для остатков из диапазона [0, 9), а во второй — для остатков из диапазона [9, 19).
Выражением шардирования может быть любое выражение из констант и столбцов таблицы, возвращающее целое число. Например, можно использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления идентификатора пользователя (тогда данные одного пользователя будут находиться на одном сегменте, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов распределён недостаточно равномерно, его можно обернуть в хеш-функцию, например intHash64(UserID).
Простой остаток от деления — ограниченное решение для шардирования, и оно подходит не всегда. Оно работает для средних и больших объёмов данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и более). В последнем случае используйте схему шардирования, подходящую для предметной области, а не таблицы Distributed.
О схеме шардирования следует задуматься в следующих случаях:
- Используются запросы, требующие объединения данных (
INилиJOIN) по определённому ключу. Если данные шардированы по этому ключу, можно использовать локальныеINилиJOINвместоGLOBAL INилиGLOBAL JOIN, что значительно эффективнее. - Используется большое количество серверов (сотни и более) с большим числом небольших запросов, например запросов к данным отдельных клиентов (например, веб-сайтов, рекламодателей или партнёров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл размещать данные одного клиента на одном сегменте. Либо можно настроить двухуровневое шардирование: разделить весь кластер на «слои», где слой может состоять из нескольких сегментов. Данные одного клиента располагаются на одном слое, но при необходимости в слой можно добавлять сегменты, а данные внутри них распределяются случайным образом. Таблицы
Distributedсоздаются для каждого слоя, а для глобальных запросов создаётся одна общая distributed таблица.
Distributed отправляет каждый файл со вставленными данными отдельно, но вы можете включить батч-отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка повышает производительность кластера за счёт более эффективного использования ресурсов локального сервера и сети. Следует проверять, что данные отправляются успешно, просматривая список файлов (данных, ожидающих отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/. Количество потоков, выполняющих фоновые задачи, можно задать с помощью настройки background_distributed_schedule_pool_size.
Если после INSERT в distributed таблицу Distributed сервер вышел из строя или был аварийно перезапущен (например, из-за аппаратного сбоя), вставленные данные могут быть потеряны. Если в каталоге таблицы обнаружена повреждённая часть данных, она переносится в подкаталог broken и больше не используется.
Чтение данных
Distributed запросы SELECT отправляются во все сегменты и работают независимо от того, как данные распределены между сегментами (они могут быть распределены совершенно случайным образом). При добавлении нового сегмента переносить в него старые данные не требуется. Вместо этого можно записывать в него новые данные, задав больший вес: данные будут распределены немного неравномерно, но запросы продолжат работать корректно и эффективно.
Когда включена опция max_parallel_replicas, обработка запроса распараллеливается между всеми репликами в пределах одного сегмента. Дополнительные сведения см. в разделе max_parallel_replicas.
Чтобы узнать больше о том, как обрабатываются распределенные запросы in и global in, см. эту документацию.
Виртуальные столбцы
_Shard_num
_shard_num — содержит значение shard_num из таблицы system.clusters. Тип: UInt32.
Поскольку табличные функции
remote и [cluster](../../../sql-reference/table-functions/cluster.md) внутренне создают временную distributed таблицу, _shard_num` также доступен и в них.- Описание виртуальных столбцов
- настройка
background_distributed_schedule_pool_size - функции
shardNum()иshardCount()