Pular para o conteúdo principal
Motor Distributed no CloudPara criar um motor de tabela distribuída no ClickHouse Cloud, você pode usar as funções de tabela remote e remoteSecure. A sintaxe Distributed(...) não pode ser usada no ClickHouse Cloud.
As tabelas com o motor Distributed não armazenam dados próprios, mas permitem o processamento distribuído de consultas em vários servidores. A leitura é paralelizada automaticamente. Durante a leitura, os índices da tabela nos servidores remotos são usados, se existirem.

Criando uma tabela

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

Com base em uma tabela

Quando a tabela Distributed aponta para uma tabela no servidor atual, você pode adotar o schema dessa tabela:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Parâmetros da Distributed

ParâmetroDescrição
clusterO nome do cluster no arquivo de configuração do servidor
databaseO nome de um banco de dados remoto
tableO nome de uma tabela remota
sharding_key (Opcional)A chave de sharding.
Especificar a sharding_key é necessário para o seguinte:
  • Para INSERTs em uma tabela distribuída (pois o motor da tabela precisa da sharding_key para determinar como dividir os dados). No entanto, se a configuração insert_distributed_one_random_shard estiver habilitada, os INSERTs não precisarão da chave de sharding.
  • Para uso com optimize_skip_unused_shards, pois a sharding_key é necessária para determinar quais shards devem ser consultados
policy_name (Opcional)O nome da política; ele será usado para armazenar arquivos temporários para envio em segundo plano
Veja também

Configurações do Distributed

ConfiguraçãoDescriçãoValor padrão
fsync_after_insertExecuta fsync nos dados do arquivo após a inserção em segundo plano no Distributed. Garante que o SO gravou todos os dados inseridos em um arquivo no disco do nó iniciador.false
fsync_directoriesExecuta fsync nos diretórios. Garante que o SO atualizou os metadados do diretório após operações relacionadas a INSERTs em segundo plano na tabela Distributed (após a inserção, após enviar os dados para o shard etc.).false
skip_unavailable_shardsSe true, o ClickHouse ignora silenciosamente shards indisponíveis. Um shard é marcado como indisponível quando: 1) Não é possível alcançá-lo devido a uma falha de conexão. 2) O shard não pode ser resolvido via DNS. 3) A tabela não existe no shard.false
bytes_to_throw_insertSe mais do que esse número de bytes comprimidos ficar pendente para INSERT em segundo plano, uma exceção será lançada. 0 - não lançar.0
bytes_to_delay_insertSe mais do que esse número de bytes comprimidos ficar pendente para INSERT em segundo plano, a consulta será atrasada. 0 - não atrasar.0
max_delay_to_insertAtraso máximo para inserir dados na tabela Distributed, em segundos, se houver muitos bytes pendentes para envio em segundo plano.60
background_insert_batchO mesmo que distributed_background_insert_batch0
background_insert_split_batch_on_failureO mesmo que distributed_background_insert_split_batch_on_failure0
background_insert_sleep_time_msO mesmo que distributed_background_insert_sleep_time_ms0
background_insert_max_sleep_time_msO mesmo que distributed_background_insert_max_sleep_time_ms0
flush_on_detachGrava os dados em nós remotos em DETACH/DROP/desligamento do servidor.true
Configurações de durabilidade (fsync_...):
  • Afetam apenas INSERTs em segundo plano (ou seja, distributed_foreground_insert=false) quando os dados são armazenados pela primeira vez no disco do nó iniciador e, depois, em segundo plano, quando são enviados aos shards.
  • Podem reduzir significativamente o desempenho de INSERT
  • Afetam a gravação dos dados armazenados na pasta da tabela distribuída no nó que aceitou seu insert. Se você precisa de garantias de gravação dos dados nas tabelas MergeTree subjacentes, consulte as configurações de durabilidade (...fsync...) em system.merge_tree_settings
Para as configurações de limite de insert (..._insert), veja também:
Exemplo
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
Os dados serão lidos de todos os servidores do cluster logs, a partir da tabela default.hits localizada em cada servidor do cluster. Os dados não são apenas lidos, mas também parcialmente processados nos servidores remotos (na medida do possível). Por exemplo, em uma consulta com GROUP BY, os dados serão agregados nos servidores remotos, e os estados intermediários das funções de agregação serão enviados ao servidor solicitante. Em seguida, os dados serão agregados novamente. Em vez do nome do banco de dados, você pode usar uma expressão constante que retorne uma string. Por exemplo: currentDatabase().

Clusters

Os clusters são configurados no arquivo de configuração do servidor:
<remote_servers>
    <logs>
        <!-- Secret por cluster entre servidores para consultas distribuídas
             padrão: sem secret (nenhuma autenticação será realizada)

             Se definido, as consultas distribuídas serão validadas nos shards, portanto, ao menos:
             - tal cluster deve existir no shard,
             - tal cluster deve ter o mesmo secret.

             E também (e o que é mais importante), o initial_user será
             usado como usuário atual para a consulta.
        -->
        <!-- <secret></secret> -->
        
        <!-- Opcional. Se as consultas DDL distribuídas (cláusula ON CLUSTER) são permitidas para este cluster. Padrão: true (permitido). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- Opcional. Peso do shard ao gravar dados. Padrão: 1. -->
            <weight>1</weight>
            <!-- Opcional. O nome do shard. Deve ser não vazio e único entre os shards do cluster. Se não especificado, ficará vazio. -->
            <name>shard_01</name>
            <!-- Opcional. Se os dados devem ser gravados em apenas uma das réplicas. Padrão: false (gravar dados em todas as réplicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Opcional. Prioridade da réplica para balanceamento de carga (veja também a configuração load_balancing). Padrão: 1 (valor menor tem maior prioridade). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
Aqui, um cluster é definido com o nome logs, consistindo em dois shards, cada um com duas réplicas. Os shards se referem aos servidores que contêm diferentes partes dos dados (para ler todos os dados, você precisa acessar todos os shards). As réplicas são servidores duplicados (para ler todos os dados, você pode acessar os dados em qualquer uma das réplicas). Os nomes dos clusters não devem conter pontos. Os parâmetros host, port e, opcionalmente, user, password, secure, compression, bind_host são especificados para cada servidor:
ParâmetroDescriçãoValor padrão
hostO endereço do servidor remoto. Você pode usar o domínio ou um endereço IPv4 ou IPv6. Se especificar o domínio, o servidor fará uma solicitação DNS ao iniciar, e o resultado será armazenado enquanto o servidor estiver em execução. Se a solicitação DNS falhar, o servidor não iniciará. Se você alterar o registro DNS, reinicie o servidor.-
portA porta TCP para atividade de mensageria (tcp_port na config, geralmente definida como 9000). Não confundir com http_port.-
userNome do usuário para se conectar a um servidor remoto. Esse usuário deve ter acesso para se conectar ao servidor especificado. O acesso é configurado no arquivo users.xml. Para mais informações, consulte a seção Direitos de acesso.default
passwordA senha para se conectar a um servidor remoto (não mascarada).
secureDefine se deve ser usada uma conexão SSL/TLS segura. Normalmente, isso também exige especificar a porta (a porta segura padrão é 9440). O servidor deve escutar em <tcp_port_secure>9440</tcp_port_secure> e estar configurado com os certificados corretos.false
compressionUsa compressão de dados.true
bind_hostO endereço de origem a ser usado ao se conectar ao servidor remoto a partir deste nó. Compatível apenas com endereço IPv4. Destina-se a casos de uso avançados de implantação em que é necessário definir o endereço IP de origem usado pelas consultas distribuídas do ClickHouse.-
Ao especificar réplicas, uma das réplicas disponíveis será selecionada para cada shard durante a leitura. Você pode configurar o algoritmo de balanceamento de carga (a preferência de qual réplica acessar) — veja a configuração load_balancing. Se a conexão com o servidor não for estabelecida, será feita uma tentativa de conexão com um timeout curto. Se a conexão falhar, a próxima réplica será selecionada, e assim por diante para todas as réplicas. Se a tentativa de conexão falhar para todas as réplicas, ela será repetida da mesma forma várias vezes. Isso favorece a resiliência, mas não fornece tolerância a falhas completa: um servidor remoto pode aceitar a conexão, mas pode não funcionar ou funcionar mal. Você pode especificar apenas um shard (nesse caso, o processamento da consulta deve ser chamado de remote, e não de distribuído) ou quantos shards quiser. Em cada shard, você pode especificar de uma a quantas réplicas quiser. Você pode especificar um número diferente de réplicas para cada shard. Você pode especificar quantos clusters quiser na configuração. Para ver seus clusters, use a tabela system.clusters. O motor Distributed permite trabalhar com um cluster como se fosse um servidor local. No entanto, a configuração do cluster não pode ser especificada dinamicamente; ela precisa ser definida no arquivo de configuração do servidor. Normalmente, todos os servidores em um cluster terão a mesma configuração de cluster (embora isso não seja obrigatório). Os clusters do arquivo de configuração são atualizados em tempo real, sem reiniciar o servidor. Se você precisar enviar uma consulta para um conjunto desconhecido de shards e réplicas a cada vez, não é necessário criar uma tabela Distributed — use a table function remote em vez disso. Veja a seção Funções de tabela.

Gravação de dados

Há dois métodos para gravar dados em um cluster: Primeiro, você pode definir em quais servidores gravar quais dados e fazer a gravação diretamente em cada shard. Em outras palavras, executar instruções INSERT diretamente nas tabelas remotas do cluster para as quais a tabela Distributed aponta. Essa é a solução mais flexível, pois você pode usar qualquer esquema de sharding, inclusive um esquema não trivial, conforme os requisitos do domínio em questão. Essa também é a solução mais eficiente, já que os dados podem ser gravados em diferentes shards de forma totalmente independente. Segundo, você pode executar instruções INSERT em uma tabela Distributed. Nesse caso, a própria tabela distribuirá os dados inseridos entre os servidores. Para gravar em uma tabela Distributed, ela precisa ter o parâmetro sharding_key configurado (exceto se houver apenas um shard). Cada shard pode ter um <weight> definido no arquivo de configuração. Por padrão, o peso é 1. Os dados são distribuídos entre os shards em quantidade proporcional ao peso do shard. Todos os pesos dos shards são somados; em seguida, o peso de cada shard é dividido pelo total para determinar a proporção correspondente. Por exemplo, se houver dois shards e o primeiro tiver peso 1, enquanto o segundo tiver peso 2, um terço (1 / 3) das linhas inseridas será enviado ao primeiro, e dois terços (2 / 3) ao segundo. Cada shard pode ter o parâmetro internal_replication definido no arquivo de configuração. Se esse parâmetro for definido como true, a operação de gravação seleciona a primeira réplica saudável e grava os dados nela. Use isso se as tabelas subjacentes à tabela Distributed forem tabelas replicadas (por exemplo, qualquer um dos motores de tabela Replicated*MergeTree). Uma das réplicas da tabela receberá a gravação, e ela será replicada automaticamente para as outras réplicas. Se internal_replication for definido como false (o padrão), os dados serão gravados em todas as réplicas. Nesse caso, a tabela Distributed replica os dados por conta própria. Isso é pior do que usar tabelas replicadas, porque a consistência das réplicas não é verificada e, com o tempo, elas conterão dados ligeiramente diferentes. Para selecionar o shard para o qual uma linha de dados será enviada, a expressão de sharding é analisada, e calcula-se o resto da divisão dela pelo peso total dos shards. A linha é enviada ao shard que corresponde ao intervalo semiaberto dos restos de prev_weights até prev_weights + weight, em que prev_weights é o peso total dos shards com número menor, e weight é o peso deste shard. Por exemplo, se houver dois shards, e o primeiro tiver peso 9, enquanto o segundo tiver peso 10, a linha será enviada ao primeiro shard para os restos no intervalo [0, 9) e ao segundo para os restos no intervalo [9, 19). A expressão de sharding pode ser qualquer expressão composta por constantes e colunas da tabela que retorne um inteiro. Por exemplo, você pode usar a expressão rand() para distribuir os dados aleatoriamente, ou UserID para distribuição pelo resto da divisão do ID do usuário (nesse caso, os dados de um único usuário ficarão em um único shard, o que simplifica a execução de IN e JOIN por usuários). Se uma das colunas não tiver distribuição suficientemente uniforme, você pode envolvê-la em uma função hash, por exemplo intHash64(UserID). Usar apenas o resto da divisão é uma solução limitada para sharding e nem sempre é adequado. Ela funciona para volumes médios e grandes de dados (dezenas de servidores), mas não para volumes muito grandes de dados (centenas de servidores ou mais). Neste último caso, use o esquema de sharding exigido pelo domínio em questão, em vez de usar entradas em tabelas Distributed. Você deve se preocupar com o esquema de sharding nos seguintes casos:
  • São usadas consultas que exigem combinar dados (IN ou JOIN) por uma chave específica. Se os dados estiverem distribuídos entre shards por essa chave, você poderá usar IN ou JOIN local em vez de GLOBAL IN ou GLOBAL JOIN, o que é muito mais eficiente.
  • Um grande número de servidores é usado (centenas ou mais) com um grande número de consultas pequenas, por exemplo, consultas sobre dados de clientes individuais (como sites, anunciantes ou parceiros). Para que as consultas pequenas não afetem o cluster inteiro, faz sentido manter os dados de um único cliente em um único shard. Como alternativa, você pode configurar um sharding em dois níveis: dividir o cluster inteiro em “camadas”, em que uma camada pode consistir em vários shards. Os dados de um único cliente ficam em uma única camada, mas shards podem ser adicionados a uma camada conforme necessário, e os dados são distribuídos aleatoriamente dentro dela. Tabelas Distributed são criadas para cada camada, e uma única tabela distribuída compartilhada é criada para consultas globais.
Os dados são gravados em segundo plano. Quando inserido na tabela, o bloco de dados é gravado apenas no sistema de arquivos local. Os dados são enviados aos servidores remotos em segundo plano o mais rápido possível. A periodicidade do envio dos dados é gerenciada pelas configurações distributed_background_insert_sleep_time_ms e distributed_background_insert_max_sleep_time_ms. O motor Distributed envia separadamente cada arquivo com dados inseridos, mas você pode habilitar o envio em lote de arquivos com a configuração distributed_background_insert_batch. Essa configuração melhora o desempenho do cluster ao aproveitar melhor os recursos do servidor local e da rede. Você deve verificar se os dados foram enviados com sucesso conferindo a lista de arquivos (dados aguardando envio) no diretório da tabela: /var/lib/clickhouse/data/database/table/. O número de threads que executam tarefas em segundo plano pode ser definido pela configuração background_distributed_schedule_pool_size. Se o servidor não estiver mais disponível ou tiver sofrido uma reinicialização abrupta (por exemplo, devido a uma falha de hardware) após um INSERT em uma tabela Distributed, os dados inseridos poderão ser perdidos. Se uma parte de dados danificada for detectada no diretório da tabela, ela será transferida para o subdiretório broken e não será mais usada.

Leitura de dados

Ao consultar uma tabela Distributed, as consultas SELECT são enviadas para todos os shards e funcionam independentemente de como os dados estão distribuídos entre eles (podem até estar distribuídos de forma totalmente aleatória). Quando você adiciona um novo shard, não precisa transferir os dados antigos para ele. Em vez disso, pode escrever novos dados nele atribuindo um peso maior — os dados ficarão distribuídos de forma ligeiramente desigual, mas as consultas continuarão funcionando de forma correta e eficiente. Quando a opção max_parallel_replicas está ativada, o processamento de consultas é paralelizado em todas as réplicas dentro de um único shard. Para mais informações, consulte a seção max_parallel_replicas. Para saber mais sobre como as consultas distribuídas in e global in são processadas, consulte esta documentação.

Colunas virtuais

_Shard_num

_shard_num — Contém o valor de shard_num da tabela system.clusters. Tipo: UInt32.
Como as funções de tabela remote e [cluster](../../../sql-reference/table-functions/cluster.md) criam internamente uma tabela Distributed temporária, _shard_num` também fica disponível nelas.
Veja também
Última modificação em 10 de junho de 2026