Crear una tabla
A partir de una tabla
Parámetros de Distributed
| Parámetro | Descripción |
|---|---|
cluster | El nombre del clúster en el archivo de configuración del servidor |
database | El nombre de una base de datos remota |
table | El nombre de una tabla remota |
sharding_key (Opcional) | La clave de fragmentación. Es necesario especificar sharding_key para lo siguiente:
|
policy_name (Opcional) | El nombre de la política; se usará para almacenar archivos temporales para el envío en segundo plano |
- ajuste distributed_foreground_insert
- MergeTree para ver ejemplos
Configuración de Distributed
| Setting | Description | Default value |
|---|---|---|
fsync_after_insert | Realiza fsync de los datos del archivo después de una inserción en segundo plano en Distributed. Garantiza que el SO haya volcado todos los datos insertados a un archivo en el disco del nodo iniciador. | false |
fsync_directories | Realiza fsync en los directorios. Garantiza que el SO haya actualizado los metadatos del directorio después de las operaciones relacionadas con las inserciones en segundo plano en una tabla distribuida (después de insertar, después de enviar los datos al segmento, etc.). | false |
skip_unavailable_shards | Si es true, ClickHouse omite silenciosamente los segmentos no disponibles. Un segmento se marca como no disponible cuando: 1) No se puede acceder al segmento debido a un fallo de conexión. 2) El segmento no se puede resolver mediante DNS. 3) La tabla no existe en el segmento. | false |
bytes_to_throw_insert | Si los bytes comprimidos pendientes de un INSERT en segundo plano superan este valor, se lanzará una excepción. 0: no lanzar ninguna excepción. | 0 |
bytes_to_delay_insert | Si los bytes comprimidos pendientes de un INSERT en segundo plano superan este valor, la consulta se retrasará. 0: no retrasar. | 0 |
max_delay_to_insert | Retraso máximo, en segundos, para insertar datos en una tabla distribuida si hay muchos bytes pendientes de envío en segundo plano. | 60 |
background_insert_batch | Lo mismo que distributed_background_insert_batch | 0 |
background_insert_split_batch_on_failure | Lo mismo que distributed_background_insert_split_batch_on_failure | 0 |
background_insert_sleep_time_ms | Lo mismo que distributed_background_insert_sleep_time_ms | 0 |
background_insert_max_sleep_time_ms | Lo mismo que distributed_background_insert_max_sleep_time_ms | 0 |
flush_on_detach | Vuelca los datos a los nodos remotos en DETACH/DROP/apagado del servidor. | true |
Configuración de durabilidad (
fsync_...):- Afecta solo a los
INSERTen segundo plano (es decir,distributed_foreground_insert=false) cuando los datos se almacenan primero en el disco del nodo iniciador y, más tarde, se envían en segundo plano a los segmentos. - Puede reducir significativamente el rendimiento de
INSERT - Afecta a la escritura de los datos almacenados dentro de la carpeta de la tabla distribuida en el nodo que aceptó la inserción. Si necesitas garantías de escritura de datos en las tablas MergeTree subyacentes, consulta la configuración de durabilidad (
...fsync...) ensystem.merge_tree_settings
..._insert), consulta también:- la configuración
distributed_foreground_insert - la configuración
prefer_localhost_replica bytes_to_throw_insertse procesa antes quebytes_to_delay_insert, por lo que no debes establecerlo en un valor inferior abytes_to_delay_insert
logs, desde la tabla default.hits ubicada en cada servidor del clúster. Los datos no solo se leen, sino que también se procesan parcialmente en los servidores remotos (en la medida de lo posible). Por ejemplo, en una consulta con GROUP BY, los datos se agregarán en los servidores remotos y los estados intermedios de las funciones de agregación se enviarán al servidor solicitante. Después, los datos se agregarán de nuevo.
En lugar del nombre de la base de datos, puede usar una expresión constante que devuelva una cadena. Por ejemplo: currentDatabase().
Clústeres
logs que consta de dos segmentos, cada uno de los cuales contiene dos réplicas. Los segmentos son los servidores que contienen distintas partes de los datos (para leer todos los datos, debe acceder a todos los segmentos). Las réplicas son servidores duplicados (para leer todos los datos, puede acceder a los datos desde cualquiera de las réplicas).
Los nombres de los clústeres no deben contener puntos.
Los parámetros host, port y, opcionalmente, user, password, secure, compression, bind_host se especifican para cada servidor:
| Parámetro | Descripción | Valor predeterminado |
|---|---|---|
host | La dirección del servidor remoto. Puede usar el dominio o una dirección IPv4 o IPv6. Si especifica el dominio, el servidor realiza una solicitud DNS al iniciarse y el resultado se conserva mientras el servidor esté en ejecución. Si la solicitud DNS falla, el servidor no se inicia. Si cambia el registro DNS, reinicie el servidor. | - |
port | El puerto TCP para la actividad de mensajería (tcp_port en la configuración, normalmente establecido en 9000). No debe confundirse con http_port. | - |
user | Nombre del usuario para conectarse a un servidor remoto. Este usuario debe tener permisos de acceso para conectarse al servidor especificado. El acceso se configura en el archivo users.xml. Para obtener más información, consulte la sección Permisos de acceso. | default |
password | La contraseña para conectarse a un servidor remoto (no enmascarada). | ” |
secure | Indica si se debe usar una conexión SSL/TLS segura. Normalmente también requiere especificar el puerto (el puerto seguro predeterminado es 9440). El servidor debe escuchar en <tcp_port_secure>9440</tcp_port_secure> y estar configurado con los certificados correctos. | false |
compression | Utiliza compresión de datos. | true |
bind_host | La dirección de origen que se usará al conectarse al servidor remoto desde este nodo. Solo se admiten direcciones IPv4. Está pensado para casos de uso avanzados de despliegue en los que es necesario establecer la dirección IP de origen que usan las consultas distribuidas de ClickHouse. | - |
remote, en lugar de distributed) o cualquier número de segmentos. En cada segmento, puede especificar desde una hasta cualquier número de réplicas. Puede especificar un número diferente de réplicas para cada segmento.
Puede especificar tantos clústeres como desee en la configuración.
Para ver sus clústeres, use la tabla system.clusters.
El motor Distributed permite trabajar con un clúster como si fuera un servidor local. Sin embargo, la configuración del clúster no puede especificarse dinámicamente; debe configurarse en el archivo de configuración del servidor. Normalmente, todos los servidores de un clúster tendrán la misma configuración de clúster (aunque no es obligatorio). Los clústeres del archivo de configuración se actualizan sobre la marcha, sin reiniciar el servidor.
Si necesita enviar una consulta a un conjunto desconocido de segmentos y réplicas cada vez, no necesita crear una tabla Distributed; use en su lugar la función de tabla remote. Consulte la sección Funciones de tabla.
Escritura de datos
INSERT directamente en las tablas remotas del clúster a las que apunta la tabla distribuida. Esta es la solución más flexible, ya que puede usar cualquier esquema de sharding, incluso uno no trivial debido a los requisitos del dominio. También es la solución más óptima, ya que los datos pueden escribirse en distintos segmentos de forma completamente independiente.
Segundo, puede ejecutar sentencias INSERT en una tabla distribuida. En este caso, la tabla distribuirá por sí misma los datos insertados entre los servidores. Para poder escribir en una tabla distribuida, debe tener configurado el parámetro sharding_key (excepto si solo hay un segmento).
Cada segmento puede tener un <weight> definido en el archivo de configuración. De forma predeterminada, el peso es 1. Los datos se distribuyen entre los segmentos en una cantidad proporcional al peso del segmento. Se suman todos los pesos de los segmentos y, después, el peso de cada segmento se divide por el total para determinar la proporción correspondiente. Por ejemplo, si hay dos segmentos y el primero tiene un peso de 1 mientras que el segundo tiene un peso de 2, al primero se le enviará un tercio (1 / 3) de las filas insertadas y al segundo se le enviarán dos tercios (2 / 3).
Cada segmento puede tener definido el parámetro internal_replication en el archivo de configuración. Si este parámetro se establece en true, la operación de escritura selecciona la primera réplica en buen estado y escribe los datos en ella. Úselo si las tablas subyacentes a la tabla distribuida son tablas replicadas (por ejemplo, cualquiera de los motores de tabla Replicated*MergeTree). Una de las réplicas de la tabla recibirá la escritura, y esta se replicará automáticamente en las demás réplicas.
Si internal_replication se establece en false (el valor predeterminado), los datos se escriben en todas las réplicas. En este caso, la tabla distribuida replica por sí misma los datos. Esto es peor que usar tablas replicadas porque no se comprueba la consistencia de las réplicas y, con el tiempo, contendrán datos ligeramente distintos.
Para seleccionar el segmento al que se envía una fila de datos, se analiza la expresión de sharding y se toma el resto de dividirla por el peso total de los segmentos. La fila se envía al segmento que corresponde al semiintervalo de restos desde prev_weights hasta prev_weights + weight, donde prev_weights es el peso total de los segmentos con el número más pequeño, y weight es el peso de este segmento. Por ejemplo, si hay dos segmentos y el primero tiene un peso de 9 mientras que el segundo tiene un peso de 10, la fila se enviará al primer segmento para los restos del rango [0, 9), y al segundo para los restos del rango [9, 19).
La expresión de sharding puede ser cualquier expresión formada por constantes y columnas de tabla que devuelva un entero. Por ejemplo, puede usar la expresión rand() para distribuir los datos aleatoriamente, o UserID para distribuirlos según el resto de dividir el ID del usuario (de este modo, los datos de un mismo usuario residirán en un único segmento, lo que simplifica la ejecución de IN y JOIN por usuario). Si una de las columnas no se distribuye de forma suficientemente uniforme, puede envolverla en una función hash, por ejemplo intHash64(UserID).
Un simple resto de la división es una solución limitada para el sharding y no siempre es adecuada. Funciona para volúmenes de datos medianos y grandes (decenas de servidores), pero no para volúmenes de datos muy grandes (cientos de servidores o más). En este último caso, utilice el esquema de sharding que requiera el dominio en lugar de basarse en las entradas de las tablas distribuidas.
Debe prestar atención al esquema de sharding en los siguientes casos:
- Se usan consultas que requieren unir datos (
INoJOIN) por una clave específica. Si los datos están fragmentados por esta clave, puede usarINoJOINlocal en lugar deGLOBAL INoGLOBAL JOIN, lo cual es mucho más eficiente. - Se usa un gran número de servidores (cientos o más) con un gran número de consultas pequeñas, por ejemplo, consultas de datos de clientes individuales (p. ej., sitios web, anunciantes o socios). Para que las consultas pequeñas no afecten a todo el clúster, tiene sentido ubicar los datos de un único cliente en un único segmento. Como alternativa, puede configurar un sharding de dos niveles: dividir todo el clúster en “capas”, donde una capa puede constar de múltiples segmentos. Los datos de un único cliente se ubican en una sola capa, pero se pueden añadir segmentos a una capa según sea necesario, y los datos se distribuyen aleatoriamente dentro de ella. Se crean tablas distribuidas para cada capa, y se crea una única tabla distribuida compartida para las consultas globales.
Distributed envía por separado cada archivo con datos insertados, pero puede habilitar el envío por lotes de archivos con el ajuste distributed_background_insert_batch. Este ajuste mejora el rendimiento del clúster al aprovechar mejor los recursos del servidor local y de la red. Debe comprobar si los datos se envían correctamente revisando la lista de archivos (datos pendientes de envío) en el directorio de la tabla: /var/lib/clickhouse/data/database/table/. El número de hilos que ejecutan tareas en segundo plano puede establecerse con el ajuste background_distributed_schedule_pool_size.
Si el servidor dejó de estar disponible o sufrió un reinicio brusco (por ejemplo, debido a un fallo de hardware) después de un INSERT en una tabla distribuida, los datos insertados podrían perderse. Si se detecta una parte de datos dañada en el directorio de la tabla, se transfiere al subdirectorio broken y deja de utilizarse.
Lectura de datos
SELECT se envían a todos los segmentos y funcionan independientemente de cómo estén distribuidos los datos entre ellos (incluso pueden distribuirse completamente al azar). Cuando se añade un nuevo segmento, no es necesario transferirle los datos antiguos. En su lugar, puede escribir datos nuevos en él asignándole un mayor peso; los datos se distribuirán de forma ligeramente desigual, pero las consultas seguirán funcionando de forma correcta y eficiente.
Cuando la opción max_parallel_replicas está habilitada, el procesamiento de consultas se paraleliza en todas las réplicas dentro de un mismo segmento. Para obtener más información, consulte la sección max_parallel_replicas.
Para obtener más información sobre cómo se procesan las consultas distribuidas in y global in, consulte esta documentación.
Columnas virtuales
_Shard_num
_shard_num — Contiene el valor shard_num de la tabla system.clusters. Tipo: UInt32.
Como las funciones de tabla
remote y [cluster](../../../sql-reference/table-functions/cluster.md) crean internamente una tabla Distributed temporal, _shard_num` también está disponible allí.- Descripción de las columnas virtuales
- Ajuste
background_distributed_schedule_pool_size - Funciones
shardNum()yshardCount()