Criando uma tabela
Com base em uma tabela
Distributed aponta para uma tabela no servidor atual, você pode adotar o schema dessa tabela:
Parâmetros da Distributed
| Parâmetro | Descrição |
|---|---|
cluster | O nome do cluster no arquivo de configuração do servidor |
database | O nome de um banco de dados remoto |
table | O nome de uma tabela remota |
sharding_key (Opcional) | A chave de sharding. Especificar a sharding_key é necessário para o seguinte:
|
policy_name (Opcional) | O nome da política; ele será usado para armazenar arquivos temporários para envio em segundo plano |
- configuração distributed_foreground_insert
- MergeTree para ver exemplos
Configurações do Distributed
| Configuração | Descrição | Valor padrão |
|---|---|---|
fsync_after_insert | Executa fsync nos dados do arquivo após a inserção em segundo plano no Distributed. Garante que o SO gravou todos os dados inseridos em um arquivo no disco do nó iniciador. | false |
fsync_directories | Executa fsync nos diretórios. Garante que o SO atualizou os metadados do diretório após operações relacionadas a INSERTs em segundo plano na tabela Distributed (após a inserção, após enviar os dados para o shard etc.). | false |
skip_unavailable_shards | Se true, o ClickHouse ignora silenciosamente shards indisponíveis. Um shard é marcado como indisponível quando: 1) Não é possível alcançá-lo devido a uma falha de conexão. 2) O shard não pode ser resolvido via DNS. 3) A tabela não existe no shard. | false |
bytes_to_throw_insert | Se mais do que esse número de bytes comprimidos ficar pendente para INSERT em segundo plano, uma exceção será lançada. 0 - não lançar. | 0 |
bytes_to_delay_insert | Se mais do que esse número de bytes comprimidos ficar pendente para INSERT em segundo plano, a consulta será atrasada. 0 - não atrasar. | 0 |
max_delay_to_insert | Atraso máximo para inserir dados na tabela Distributed, em segundos, se houver muitos bytes pendentes para envio em segundo plano. | 60 |
background_insert_batch | O mesmo que distributed_background_insert_batch | 0 |
background_insert_split_batch_on_failure | O mesmo que distributed_background_insert_split_batch_on_failure | 0 |
background_insert_sleep_time_ms | O mesmo que distributed_background_insert_sleep_time_ms | 0 |
background_insert_max_sleep_time_ms | O mesmo que distributed_background_insert_max_sleep_time_ms | 0 |
flush_on_detach | Grava os dados em nós remotos em DETACH/DROP/desligamento do servidor. | true |
Configurações de durabilidade (
fsync_...):- Afetam apenas
INSERTs em segundo plano (ou seja,distributed_foreground_insert=false) quando os dados são armazenados pela primeira vez no disco do nó iniciador e, depois, em segundo plano, quando são enviados aos shards. - Podem reduzir significativamente o desempenho de
INSERT - Afetam a gravação dos dados armazenados na pasta da tabela distribuída no nó que aceitou seu insert. Se você precisa de garantias de gravação dos dados nas tabelas MergeTree subjacentes, consulte as configurações de durabilidade (
...fsync...) emsystem.merge_tree_settings
..._insert), veja também:- configuração
distributed_foreground_insert - configuração
prefer_localhost_replica bytes_to_throw_inserté tratado antes debytes_to_delay_insert, portanto você não deve defini-lo com um valor menor quebytes_to_delay_insert
logs, a partir da tabela default.hits localizada em cada servidor do cluster. Os dados não são apenas lidos, mas também parcialmente processados nos servidores remotos (na medida do possível). Por exemplo, em uma consulta com GROUP BY, os dados serão agregados nos servidores remotos, e os estados intermediários das funções de agregação serão enviados ao servidor solicitante. Em seguida, os dados serão agregados novamente.
Em vez do nome do banco de dados, você pode usar uma expressão constante que retorne uma string. Por exemplo: currentDatabase().
Clusters
logs, consistindo em dois shards, cada um com duas réplicas. Os shards se referem aos servidores que contêm diferentes partes dos dados (para ler todos os dados, você precisa acessar todos os shards). As réplicas são servidores duplicados (para ler todos os dados, você pode acessar os dados em qualquer uma das réplicas).
Os nomes dos clusters não devem conter pontos.
Os parâmetros host, port e, opcionalmente, user, password, secure, compression, bind_host são especificados para cada servidor:
| Parâmetro | Descrição | Valor padrão |
|---|---|---|
host | O endereço do servidor remoto. Você pode usar o domínio ou um endereço IPv4 ou IPv6. Se especificar o domínio, o servidor fará uma solicitação DNS ao iniciar, e o resultado será armazenado enquanto o servidor estiver em execução. Se a solicitação DNS falhar, o servidor não iniciará. Se você alterar o registro DNS, reinicie o servidor. | - |
port | A porta TCP para atividade de mensageria (tcp_port na config, geralmente definida como 9000). Não confundir com http_port. | - |
user | Nome do usuário para se conectar a um servidor remoto. Esse usuário deve ter acesso para se conectar ao servidor especificado. O acesso é configurado no arquivo users.xml. Para mais informações, consulte a seção Direitos de acesso. | default |
password | A senha para se conectar a um servidor remoto (não mascarada). | ” |
secure | Define se deve ser usada uma conexão SSL/TLS segura. Normalmente, isso também exige especificar a porta (a porta segura padrão é 9440). O servidor deve escutar em <tcp_port_secure>9440</tcp_port_secure> e estar configurado com os certificados corretos. | false |
compression | Usa compressão de dados. | true |
bind_host | O endereço de origem a ser usado ao se conectar ao servidor remoto a partir deste nó. Compatível apenas com endereço IPv4. Destina-se a casos de uso avançados de implantação em que é necessário definir o endereço IP de origem usado pelas consultas distribuídas do ClickHouse. | - |
system.clusters.
O motor Distributed permite trabalhar com um cluster como se fosse um servidor local. No entanto, a configuração do cluster não pode ser especificada dinamicamente; ela precisa ser definida no arquivo de configuração do servidor. Normalmente, todos os servidores em um cluster terão a mesma configuração de cluster (embora isso não seja obrigatório). Os clusters do arquivo de configuração são atualizados em tempo real, sem reiniciar o servidor.
Se você precisar enviar uma consulta para um conjunto desconhecido de shards e réplicas a cada vez, não é necessário criar uma tabela Distributed — use a table function remote em vez disso. Veja a seção Funções de tabela.
Gravação de dados
INSERT diretamente nas tabelas remotas do cluster para as quais a tabela Distributed aponta. Essa é a solução mais flexível, pois você pode usar qualquer esquema de sharding, inclusive um esquema não trivial, conforme os requisitos do domínio em questão. Essa também é a solução mais eficiente, já que os dados podem ser gravados em diferentes shards de forma totalmente independente.
Segundo, você pode executar instruções INSERT em uma tabela Distributed. Nesse caso, a própria tabela distribuirá os dados inseridos entre os servidores. Para gravar em uma tabela Distributed, ela precisa ter o parâmetro sharding_key configurado (exceto se houver apenas um shard).
Cada shard pode ter um <weight> definido no arquivo de configuração. Por padrão, o peso é 1. Os dados são distribuídos entre os shards em quantidade proporcional ao peso do shard. Todos os pesos dos shards são somados; em seguida, o peso de cada shard é dividido pelo total para determinar a proporção correspondente. Por exemplo, se houver dois shards e o primeiro tiver peso 1, enquanto o segundo tiver peso 2, um terço (1 / 3) das linhas inseridas será enviado ao primeiro, e dois terços (2 / 3) ao segundo.
Cada shard pode ter o parâmetro internal_replication definido no arquivo de configuração. Se esse parâmetro for definido como true, a operação de gravação seleciona a primeira réplica saudável e grava os dados nela. Use isso se as tabelas subjacentes à tabela Distributed forem tabelas replicadas (por exemplo, qualquer um dos motores de tabela Replicated*MergeTree). Uma das réplicas da tabela receberá a gravação, e ela será replicada automaticamente para as outras réplicas.
Se internal_replication for definido como false (o padrão), os dados serão gravados em todas as réplicas. Nesse caso, a tabela Distributed replica os dados por conta própria. Isso é pior do que usar tabelas replicadas, porque a consistência das réplicas não é verificada e, com o tempo, elas conterão dados ligeiramente diferentes.
Para selecionar o shard para o qual uma linha de dados será enviada, a expressão de sharding é analisada, e calcula-se o resto da divisão dela pelo peso total dos shards. A linha é enviada ao shard que corresponde ao intervalo semiaberto dos restos de prev_weights até prev_weights + weight, em que prev_weights é o peso total dos shards com número menor, e weight é o peso deste shard. Por exemplo, se houver dois shards, e o primeiro tiver peso 9, enquanto o segundo tiver peso 10, a linha será enviada ao primeiro shard para os restos no intervalo [0, 9) e ao segundo para os restos no intervalo [9, 19).
A expressão de sharding pode ser qualquer expressão composta por constantes e colunas da tabela que retorne um inteiro. Por exemplo, você pode usar a expressão rand() para distribuir os dados aleatoriamente, ou UserID para distribuição pelo resto da divisão do ID do usuário (nesse caso, os dados de um único usuário ficarão em um único shard, o que simplifica a execução de IN e JOIN por usuários). Se uma das colunas não tiver distribuição suficientemente uniforme, você pode envolvê-la em uma função hash, por exemplo intHash64(UserID).
Usar apenas o resto da divisão é uma solução limitada para sharding e nem sempre é adequado. Ela funciona para volumes médios e grandes de dados (dezenas de servidores), mas não para volumes muito grandes de dados (centenas de servidores ou mais). Neste último caso, use o esquema de sharding exigido pelo domínio em questão, em vez de usar entradas em tabelas Distributed.
Você deve se preocupar com o esquema de sharding nos seguintes casos:
- São usadas consultas que exigem combinar dados (
INouJOIN) por uma chave específica. Se os dados estiverem distribuídos entre shards por essa chave, você poderá usarINouJOINlocal em vez deGLOBAL INouGLOBAL JOIN, o que é muito mais eficiente. - Um grande número de servidores é usado (centenas ou mais) com um grande número de consultas pequenas, por exemplo, consultas sobre dados de clientes individuais (como sites, anunciantes ou parceiros). Para que as consultas pequenas não afetem o cluster inteiro, faz sentido manter os dados de um único cliente em um único shard. Como alternativa, você pode configurar um sharding em dois níveis: dividir o cluster inteiro em “camadas”, em que uma camada pode consistir em vários shards. Os dados de um único cliente ficam em uma única camada, mas shards podem ser adicionados a uma camada conforme necessário, e os dados são distribuídos aleatoriamente dentro dela. Tabelas
Distributedsão criadas para cada camada, e uma única tabela distribuída compartilhada é criada para consultas globais.
Distributed envia separadamente cada arquivo com dados inseridos, mas você pode habilitar o envio em lote de arquivos com a configuração distributed_background_insert_batch. Essa configuração melhora o desempenho do cluster ao aproveitar melhor os recursos do servidor local e da rede. Você deve verificar se os dados foram enviados com sucesso conferindo a lista de arquivos (dados aguardando envio) no diretório da tabela: /var/lib/clickhouse/data/database/table/. O número de threads que executam tarefas em segundo plano pode ser definido pela configuração background_distributed_schedule_pool_size.
Se o servidor não estiver mais disponível ou tiver sofrido uma reinicialização abrupta (por exemplo, devido a uma falha de hardware) após um INSERT em uma tabela Distributed, os dados inseridos poderão ser perdidos. Se uma parte de dados danificada for detectada no diretório da tabela, ela será transferida para o subdiretório broken e não será mais usada.
Leitura de dados
Distributed, as consultas SELECT são enviadas para todos os shards e funcionam independentemente de como os dados estão distribuídos entre eles (podem até estar distribuídos de forma totalmente aleatória). Quando você adiciona um novo shard, não precisa transferir os dados antigos para ele. Em vez disso, pode escrever novos dados nele atribuindo um peso maior — os dados ficarão distribuídos de forma ligeiramente desigual, mas as consultas continuarão funcionando de forma correta e eficiente.
Quando a opção max_parallel_replicas está ativada, o processamento de consultas é paralelizado em todas as réplicas dentro de um único shard. Para mais informações, consulte a seção max_parallel_replicas.
Para saber mais sobre como as consultas distribuídas in e global in são processadas, consulte esta documentação.
Colunas virtuais
_Shard_num
_shard_num — Contém o valor de shard_num da tabela system.clusters. Tipo: UInt32.
Como as funções de tabela
remote e [cluster](../../../sql-reference/table-functions/cluster.md) criam internamente uma tabela Distributed temporária, _shard_num` também fica disponível nelas.- Descrição das colunas virtuais
- Configuração
background_distributed_schedule_pool_size - Funções
shardNum()eshardCount()