Saltar al contenido principal
Motor Distributed en ClickHouse CloudPara crear un motor de tabla distribuida en ClickHouse Cloud, puede usar las funciones de tabla remote y remoteSecure. La sintaxis Distributed(...) no se puede usar en ClickHouse Cloud.
Las tablas con motor Distributed no almacenan datos propios, sino que permiten el procesamiento distribuido de consultas en varios servidores. La lectura se paraleliza automáticamente. Durante la lectura, se utilizan los índices de las tablas en los servidores remotos, si existen.

Crear una tabla

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

A partir de una tabla

Cuando la tabla distribuida apunta a una tabla del servidor actual, puede adoptar el esquema de esa tabla:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Parámetros de Distributed

ParámetroDescripción
clusterEl nombre del clúster en el archivo de configuración del servidor
databaseEl nombre de una base de datos remota
tableEl nombre de una tabla remota
sharding_key (Opcional)La clave de fragmentación.
Es necesario especificar sharding_key para lo siguiente:
  • Para realizar INSERTs en una tabla distribuida (ya que el motor de tabla necesita sharding_key para determinar cómo dividir los datos). Sin embargo, si está habilitado el ajuste insert_distributed_one_random_shard, los INSERTs no necesitan la clave de fragmentación.
  • Para usar optimize_skip_unused_shards, ya que sharding_key es necesario para determinar qué segmentos se deben consultar
policy_name (Opcional)El nombre de la política; se usará para almacenar archivos temporales para el envío en segundo plano
Véase también

Configuración de Distributed

SettingDescriptionDefault value
fsync_after_insertRealiza fsync de los datos del archivo después de una inserción en segundo plano en Distributed. Garantiza que el SO haya volcado todos los datos insertados a un archivo en el disco del nodo iniciador.false
fsync_directoriesRealiza fsync en los directorios. Garantiza que el SO haya actualizado los metadatos del directorio después de las operaciones relacionadas con las inserciones en segundo plano en una tabla distribuida (después de insertar, después de enviar los datos al segmento, etc.).false
skip_unavailable_shardsSi es true, ClickHouse omite silenciosamente los segmentos no disponibles. Un segmento se marca como no disponible cuando: 1) No se puede acceder al segmento debido a un fallo de conexión. 2) El segmento no se puede resolver mediante DNS. 3) La tabla no existe en el segmento.false
bytes_to_throw_insertSi los bytes comprimidos pendientes de un INSERT en segundo plano superan este valor, se lanzará una excepción. 0: no lanzar ninguna excepción.0
bytes_to_delay_insertSi los bytes comprimidos pendientes de un INSERT en segundo plano superan este valor, la consulta se retrasará. 0: no retrasar.0
max_delay_to_insertRetraso máximo, en segundos, para insertar datos en una tabla distribuida si hay muchos bytes pendientes de envío en segundo plano.60
background_insert_batchLo mismo que distributed_background_insert_batch0
background_insert_split_batch_on_failureLo mismo que distributed_background_insert_split_batch_on_failure0
background_insert_sleep_time_msLo mismo que distributed_background_insert_sleep_time_ms0
background_insert_max_sleep_time_msLo mismo que distributed_background_insert_max_sleep_time_ms0
flush_on_detachVuelca los datos a los nodos remotos en DETACH/DROP/apagado del servidor.true
Configuración de durabilidad (fsync_...):
  • Afecta solo a los INSERT en segundo plano (es decir, distributed_foreground_insert=false) cuando los datos se almacenan primero en el disco del nodo iniciador y, más tarde, se envían en segundo plano a los segmentos.
  • Puede reducir significativamente el rendimiento de INSERT
  • Afecta a la escritura de los datos almacenados dentro de la carpeta de la tabla distribuida en el nodo que aceptó la inserción. Si necesitas garantías de escritura de datos en las tablas MergeTree subyacentes, consulta la configuración de durabilidad (...fsync...) en system.merge_tree_settings
Para la configuración de límites de inserción (..._insert), consulta también:
Ejemplo
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
Los datos se leerán de todos los servidores del clúster logs, desde la tabla default.hits ubicada en cada servidor del clúster. Los datos no solo se leen, sino que también se procesan parcialmente en los servidores remotos (en la medida de lo posible). Por ejemplo, en una consulta con GROUP BY, los datos se agregarán en los servidores remotos y los estados intermedios de las funciones de agregación se enviarán al servidor solicitante. Después, los datos se agregarán de nuevo. En lugar del nombre de la base de datos, puede usar una expresión constante que devuelva una cadena. Por ejemplo: currentDatabase().

Clústeres

Los clústeres se configuran en el archivo de configuración del servidor:
<remote_servers>
    <logs>
        <!-- Secret entre servidores por clúster para consultas distribuidas
             predeterminado: sin secret (no se realizará autenticación)

             Si se establece, las consultas distribuidas serán validadas en los segmentos, por lo que al menos:
             - dicho clúster debe existir en el segmento,
             - dicho clúster debe tener el mismo secret.

             Además (y lo que es más importante), el initial_user se
             usará como usuario actual para la consulta.
        -->
        <!-- <secret></secret> -->
        
        <!-- Opcional. Indica si se permiten las consultas DDL distribuidas (cláusula ON CLUSTER) para este clúster. Predeterminado: true (permitido). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- Opcional. Peso del segmento al escribir datos. Predeterminado: 1. -->
            <weight>1</weight>
            <!-- Opcional. Nombre del segmento. Debe ser no vacío y único entre los segmentos del clúster. Si no se especifica, estará vacío. -->
            <name>shard_01</name>
            <!-- Opcional. Indica si se deben escribir datos en una sola réplica. Predeterminado: false (escribir datos en todas las réplicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Opcional. Prioridad de la réplica para el balanceo de carga (véase también la configuración load_balancing). Predeterminado: 1 (a menor valor, mayor prioridad). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
Aquí se define un clúster con el nombre logs que consta de dos segmentos, cada uno de los cuales contiene dos réplicas. Los segmentos son los servidores que contienen distintas partes de los datos (para leer todos los datos, debe acceder a todos los segmentos). Las réplicas son servidores duplicados (para leer todos los datos, puede acceder a los datos desde cualquiera de las réplicas). Los nombres de los clústeres no deben contener puntos. Los parámetros host, port y, opcionalmente, user, password, secure, compression, bind_host se especifican para cada servidor:
ParámetroDescripciónValor predeterminado
hostLa dirección del servidor remoto. Puede usar el dominio o una dirección IPv4 o IPv6. Si especifica el dominio, el servidor realiza una solicitud DNS al iniciarse y el resultado se conserva mientras el servidor esté en ejecución. Si la solicitud DNS falla, el servidor no se inicia. Si cambia el registro DNS, reinicie el servidor.-
portEl puerto TCP para la actividad de mensajería (tcp_port en la configuración, normalmente establecido en 9000). No debe confundirse con http_port.-
userNombre del usuario para conectarse a un servidor remoto. Este usuario debe tener permisos de acceso para conectarse al servidor especificado. El acceso se configura en el archivo users.xml. Para obtener más información, consulte la sección Permisos de acceso.default
passwordLa contraseña para conectarse a un servidor remoto (no enmascarada).
secureIndica si se debe usar una conexión SSL/TLS segura. Normalmente también requiere especificar el puerto (el puerto seguro predeterminado es 9440). El servidor debe escuchar en <tcp_port_secure>9440</tcp_port_secure> y estar configurado con los certificados correctos.false
compressionUtiliza compresión de datos.true
bind_hostLa dirección de origen que se usará al conectarse al servidor remoto desde este nodo. Solo se admiten direcciones IPv4. Está pensado para casos de uso avanzados de despliegue en los que es necesario establecer la dirección IP de origen que usan las consultas distribuidas de ClickHouse.-
Al especificar réplicas, se seleccionará una de las réplicas disponibles para cada uno de los segmentos al leer. Puede configurar el algoritmo de equilibrio de carga (la preferencia por la réplica a la que se accede); consulte la opción load_balancing. Si no se establece la conexión con el servidor, se realizará un intento de conexión con un tiempo de espera corto. Si la conexión falla, se seleccionará la siguiente réplica, y así sucesivamente con todas las réplicas. Si el intento de conexión falla para todas las réplicas, se repetirá de la misma manera varias veces. Esto favorece la resiliencia, pero no proporciona tolerancia a fallos completa: un servidor remoto puede aceptar la conexión, pero no funcionar o funcionar mal. Puede especificar solo uno de los segmentos (en este caso, el procesamiento de consultas debería llamarse remote, en lugar de distributed) o cualquier número de segmentos. En cada segmento, puede especificar desde una hasta cualquier número de réplicas. Puede especificar un número diferente de réplicas para cada segmento. Puede especificar tantos clústeres como desee en la configuración. Para ver sus clústeres, use la tabla system.clusters. El motor Distributed permite trabajar con un clúster como si fuera un servidor local. Sin embargo, la configuración del clúster no puede especificarse dinámicamente; debe configurarse en el archivo de configuración del servidor. Normalmente, todos los servidores de un clúster tendrán la misma configuración de clúster (aunque no es obligatorio). Los clústeres del archivo de configuración se actualizan sobre la marcha, sin reiniciar el servidor. Si necesita enviar una consulta a un conjunto desconocido de segmentos y réplicas cada vez, no necesita crear una tabla Distributed; use en su lugar la función de tabla remote. Consulte la sección Funciones de tabla.

Escritura de datos

Hay dos métodos para escribir datos en un clúster: Primero, puede definir en qué servidores escribir cada dato y realizar la escritura directamente en cada segmento. En otras palabras, ejecutar sentencias INSERT directamente en las tablas remotas del clúster a las que apunta la tabla distribuida. Esta es la solución más flexible, ya que puede usar cualquier esquema de sharding, incluso uno no trivial debido a los requisitos del dominio. También es la solución más óptima, ya que los datos pueden escribirse en distintos segmentos de forma completamente independiente. Segundo, puede ejecutar sentencias INSERT en una tabla distribuida. En este caso, la tabla distribuirá por sí misma los datos insertados entre los servidores. Para poder escribir en una tabla distribuida, debe tener configurado el parámetro sharding_key (excepto si solo hay un segmento). Cada segmento puede tener un <weight> definido en el archivo de configuración. De forma predeterminada, el peso es 1. Los datos se distribuyen entre los segmentos en una cantidad proporcional al peso del segmento. Se suman todos los pesos de los segmentos y, después, el peso de cada segmento se divide por el total para determinar la proporción correspondiente. Por ejemplo, si hay dos segmentos y el primero tiene un peso de 1 mientras que el segundo tiene un peso de 2, al primero se le enviará un tercio (1 / 3) de las filas insertadas y al segundo se le enviarán dos tercios (2 / 3). Cada segmento puede tener definido el parámetro internal_replication en el archivo de configuración. Si este parámetro se establece en true, la operación de escritura selecciona la primera réplica en buen estado y escribe los datos en ella. Úselo si las tablas subyacentes a la tabla distribuida son tablas replicadas (por ejemplo, cualquiera de los motores de tabla Replicated*MergeTree). Una de las réplicas de la tabla recibirá la escritura, y esta se replicará automáticamente en las demás réplicas. Si internal_replication se establece en false (el valor predeterminado), los datos se escriben en todas las réplicas. En este caso, la tabla distribuida replica por sí misma los datos. Esto es peor que usar tablas replicadas porque no se comprueba la consistencia de las réplicas y, con el tiempo, contendrán datos ligeramente distintos. Para seleccionar el segmento al que se envía una fila de datos, se analiza la expresión de sharding y se toma el resto de dividirla por el peso total de los segmentos. La fila se envía al segmento que corresponde al semiintervalo de restos desde prev_weights hasta prev_weights + weight, donde prev_weights es el peso total de los segmentos con el número más pequeño, y weight es el peso de este segmento. Por ejemplo, si hay dos segmentos y el primero tiene un peso de 9 mientras que el segundo tiene un peso de 10, la fila se enviará al primer segmento para los restos del rango [0, 9), y al segundo para los restos del rango [9, 19). La expresión de sharding puede ser cualquier expresión formada por constantes y columnas de tabla que devuelva un entero. Por ejemplo, puede usar la expresión rand() para distribuir los datos aleatoriamente, o UserID para distribuirlos según el resto de dividir el ID del usuario (de este modo, los datos de un mismo usuario residirán en un único segmento, lo que simplifica la ejecución de IN y JOIN por usuario). Si una de las columnas no se distribuye de forma suficientemente uniforme, puede envolverla en una función hash, por ejemplo intHash64(UserID). Un simple resto de la división es una solución limitada para el sharding y no siempre es adecuada. Funciona para volúmenes de datos medianos y grandes (decenas de servidores), pero no para volúmenes de datos muy grandes (cientos de servidores o más). En este último caso, utilice el esquema de sharding que requiera el dominio en lugar de basarse en las entradas de las tablas distribuidas. Debe prestar atención al esquema de sharding en los siguientes casos:
  • Se usan consultas que requieren unir datos (IN o JOIN) por una clave específica. Si los datos están fragmentados por esta clave, puede usar IN o JOIN local en lugar de GLOBAL IN o GLOBAL JOIN, lo cual es mucho más eficiente.
  • Se usa un gran número de servidores (cientos o más) con un gran número de consultas pequeñas, por ejemplo, consultas de datos de clientes individuales (p. ej., sitios web, anunciantes o socios). Para que las consultas pequeñas no afecten a todo el clúster, tiene sentido ubicar los datos de un único cliente en un único segmento. Como alternativa, puede configurar un sharding de dos niveles: dividir todo el clúster en “capas”, donde una capa puede constar de múltiples segmentos. Los datos de un único cliente se ubican en una sola capa, pero se pueden añadir segmentos a una capa según sea necesario, y los datos se distribuyen aleatoriamente dentro de ella. Se crean tablas distribuidas para cada capa, y se crea una única tabla distribuida compartida para las consultas globales.
Los datos se escriben en segundo plano. Cuando se insertan en la tabla, el bloque de datos simplemente se escribe en el sistema de archivos local. Los datos se envían a los servidores remotos en segundo plano lo antes posible. La periodicidad del envío de datos se controla mediante los ajustes distributed_background_insert_sleep_time_ms y distributed_background_insert_max_sleep_time_ms. El motor Distributed envía por separado cada archivo con datos insertados, pero puede habilitar el envío por lotes de archivos con el ajuste distributed_background_insert_batch. Este ajuste mejora el rendimiento del clúster al aprovechar mejor los recursos del servidor local y de la red. Debe comprobar si los datos se envían correctamente revisando la lista de archivos (datos pendientes de envío) en el directorio de la tabla: /var/lib/clickhouse/data/database/table/. El número de hilos que ejecutan tareas en segundo plano puede establecerse con el ajuste background_distributed_schedule_pool_size. Si el servidor dejó de estar disponible o sufrió un reinicio brusco (por ejemplo, debido a un fallo de hardware) después de un INSERT en una tabla distribuida, los datos insertados podrían perderse. Si se detecta una parte de datos dañada en el directorio de la tabla, se transfiere al subdirectorio broken y deja de utilizarse.

Lectura de datos

Al consultar una tabla distribuida, las consultas SELECT se envían a todos los segmentos y funcionan independientemente de cómo estén distribuidos los datos entre ellos (incluso pueden distribuirse completamente al azar). Cuando se añade un nuevo segmento, no es necesario transferirle los datos antiguos. En su lugar, puede escribir datos nuevos en él asignándole un mayor peso; los datos se distribuirán de forma ligeramente desigual, pero las consultas seguirán funcionando de forma correcta y eficiente. Cuando la opción max_parallel_replicas está habilitada, el procesamiento de consultas se paraleliza en todas las réplicas dentro de un mismo segmento. Para obtener más información, consulte la sección max_parallel_replicas. Para obtener más información sobre cómo se procesan las consultas distribuidas in y global in, consulte esta documentación.

Columnas virtuales

_Shard_num

_shard_num — Contiene el valor shard_num de la tabla system.clusters. Tipo: UInt32.
Como las funciones de tabla remote y [cluster](../../../sql-reference/table-functions/cluster.md) crean internamente una tabla Distributed temporal, _shard_num` también está disponible allí.
Véase también
Última modificación el 10 de junio de 2026