Saltar al contenido principal
Este motor proporciona integración con el ecosistema de Amazon S3 y permite la importación en streaming. Este motor es similar a los motores Kafka y RabbitMQ, pero ofrece características específicas de S3. Es importante entender esta nota del PR original de la implementación de S3Queue: cuando se vincula una MATERIALIZED VIEW al motor, el motor de tabla S3Queue comienza a recopilar datos en segundo plano.

CREATE table

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
Antes de la versión 24.7, es obligatorio usar el prefijo s3queue_ para todos los ajustes, excepto mode, after_processing y keeper_path.
Parámetros del motor Los parámetros de S3Queue son los mismos que admite el motor de tabla S3. Consulte la sección de parámetros aquí. Ejemplo
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
Uso de colecciones nombradas:
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

Configuración

Para obtener una lista de la configuración de la tabla, use la tabla system.s3_queue_settings. Disponible a partir de la versión 24.10.
Nombres de configuración (24.7+)A partir de la versión 24.7, la configuración de S3Queue puede especificarse con o sin el prefijo s3queue_:
  • Sintaxis moderna (24.7+): processing_threads_num, tracked_file_ttl_sec, etc.
  • Sintaxis heredada (todas las versiones): s3queue_processing_threads_num, s3queue_tracked_file_ttl_sec, etc.
Ambas formas son compatibles a partir de la versión 24.7. Los ejemplos de esta página usan la sintaxis moderna, sin prefijo.

Modo

Valores posibles:
  • unordered — En el modo no ordenado, el conjunto de todos los archivos ya procesados se registra mediante nodos persistentes en ZooKeeper.
  • ordered — En el modo ordenado, los archivos se procesan en orden lexicográfico. Esto significa que, si en algún momento se procesó un archivo llamado ‘BBB’ y más tarde se agrega al bucket un archivo llamado ‘AA’, este se ignorará. En ZooKeeper solo se almacenan el nombre máximo (en sentido lexicográfico) del archivo consumido correctamente y los nombres de los archivos que se volverán a intentar tras un intento de carga fallido.
Valor predeterminado: ordered en las versiones anteriores a la 24.6. A partir de la 24.6, no hay valor predeterminado; la configuración pasa a ser obligatoria y debe especificarse manualmente. En las tablas creadas en versiones anteriores, el valor predeterminado seguirá siendo Ordered por compatibilidad.

after_processing

Cómo gestionar el archivo después de procesarlo correctamente. Valores posibles:
  • keep.
  • delete.
  • move.
  • tag.
Valor predeterminado: keep. move requiere configuración adicional. En caso de moverlo dentro del mismo bucket, se debe proporcionar un nuevo prefijo de ruta como after_processing_move_prefix. Para moverlo a otro bucket de S3, se requiere el URI del bucket de destino como after_processing_move_uri y las credenciales de S3 como after_processing_move_access_key_id y after_processing_move_secret_access_key. Ejemplo:
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
Para mover datos de un contenedor de Azure a otro, se requiere la connection string de Blob Storage como after_processing_move_connection_string y el nombre del contenedor como after_processing_move_container. Consulte la configuración de AzureQueue. El etiquetado requiere la clave y el valor de la etiqueta, proporcionados como after_processing_tag_key y after_processing_tag_value.

after_processing_retries

Número de reintentos para la acción solicitada después del procesamiento, antes de desistir. Valores posibles:
  • Entero no negativo.
Valor predeterminado: 10.

after_processing_move_access_key_id

ID de la clave de acceso para el bucket de S3 al que se moverán los archivos procesados correctamente, si el destino es otro bucket de S3. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

after_processing_move_prefix

Prefijo de ruta al que se moverán los archivos procesados correctamente. Se aplica tanto al moverlos dentro del mismo bucket como a otro bucket. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

after_processing_move_secret_access_key

Clave de acceso secreta del bucket de S3 al que se moverán los archivos procesados correctamente, si el destino es otro bucket de S3. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

after_processing_move_uri

URI del bucket de S3 al que se moverán los archivos procesados correctamente, si el destino es otro bucket de S3. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

after_processing_tag_key

Clave de la etiqueta que se aplicará a los archivos procesados correctamente, si after_processing='tag'. Posibles valores:
  • String.
Valor predeterminado: cadena vacía.

after_processing_tag_value

Valor de etiqueta que se asignará a los archivos procesados correctamente si after_processing='tag'. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

keeper_path

Ruta a los metadatos de la cola en ZooKeeper. Si no se especifica explícitamente, ClickHouse construye la ruta a partir de s3queue_default_zookeeper_path, el UUID de la base de datos y el UUID de la tabla. Los valores absolutos (que empiezan por /) se usan tal como se proporcionan, mientras que los valores relativos se agregan al prefijo configurado. Las macros como {database} o {uuid} se expanden antes de que el motor se conecte a ZooKeeper. Para apuntar a un clúster de ZooKeeper auxiliar, anteponga al valor el nombre configurado, por ejemplo analytics_keeper:/clickhouse/queue/orders. El nombre debe existir en <auxiliary_zookeepers>; de lo contrario, el motor informa Unknown auxiliary ZooKeeper name .... La cadena completa (incluido el prefijo) se conserva en SHOW CREATE TABLE para que la instrucción pueda replicarse literalmente. Posibles valores:
  • String.
Valor predeterminado: /.

loading_retries

Reintenta la carga de archivos hasta el número de veces especificado. De forma predeterminada, no hay reintentos. Valores posibles:
  • Entero positivo.
Valor predeterminado: 0.

processing_threads_num

Número de hilos para realizar el procesamiento. Se aplica solo al modo Unordered. Valor predeterminado: número de CPU o 16.

parallel_inserts

De forma predeterminada, processing_threads_num generará un INSERT, por lo que solo descargará archivos y los analizará en varios hilos. Pero esto limita el paralelismo, así que, para obtener un mayor rendimiento, use parallel_inserts=true; esto permitirá insertar datos en paralelo (pero tenga en cuenta que dará lugar a un mayor número de partes de datos generadas para la familia MergeTree).
Los INSERT se generarán de acuerdo con la configuración de max_process*_before_commit.
Valor predeterminado: false.

enable_logging_to_s3queue_log

Activa el registro en system.s3queue_log. Valor predeterminado: 0.

polling_min_timeout_ms

Especifica el tiempo mínimo, en milisegundos, que ClickHouse espera antes de realizar un nuevo intento de sondeo. Valores posibles:
  • Entero positivo.
Valor predeterminado: 1000.

polling_max_timeout_ms

Define el tiempo máximo, en milisegundos, que ClickHouse espera antes de iniciar el siguiente intento de sondeo. Valores posibles:
  • Entero positivo.
Valor predeterminado: 10000.

polling_backoff_ms

Determina el tiempo de espera adicional que se agrega al intervalo de sondeo anterior cuando no se encuentran archivos nuevos. El siguiente sondeo se realiza tras la suma del intervalo anterior y este valor de backoff, o el intervalo máximo, lo que sea menor. Valores posibles:
  • Entero positivo.
Valor predeterminado: 0.

tracked_files_limit

Permite limitar el número de nodos de ZooKeeper si se usa el modo ‘unordered’; no tiene efecto en el modo ‘ordered’. Si se alcanza el límite, los archivos procesados más antiguos se eliminarán del nodo de ZooKeeper y se volverán a procesar. Valores posibles:
  • Entero positivo.
Valor predeterminado: 1000.

tracked_file_ttl_sec

Número máximo de segundos durante los que se almacenan los archivos procesados en el nodo de ZooKeeper (de forma predeterminada, se almacenan indefinidamente) en el modo ‘unordered’; no tiene efecto en el modo ‘ordered’. Una vez transcurrido el número de segundos especificado, el archivo se volverá a importar. Valores posibles:
  • Entero positivo.
Valor predeterminado: 0.

cleanup_interval_min_ms

Para el modo ‘Ordered’. Define un límite mínimo para el intervalo de reprogramación de una tarea en segundo plano, encargada de mantener el TTL de los archivos rastreados y el número máximo de archivos rastreados. Valor predeterminado: 10000.

cleanup_interval_max_ms

Para el modo ‘Ordered’. Define el límite máximo del intervalo de reprogramación de una tarea en segundo plano, encargada de mantener el TTL de los archivos rastreados y el número máximo de archivos rastreados. Valor predeterminado: 30000.

buckets

Para el modo ‘Ordered’. Disponible desde la versión 24.6. Si hay varias réplicas de la tabla S3Queue y cada una trabaja con el mismo directorio de metadatos en Keeper, el valor de buckets debe ser como mínimo igual al número de réplicas. Si también se usa la configuración processing_threads, conviene aumentar aún más el valor de la configuración buckets, ya que define el paralelismo real del procesamiento de S3Queue.

use_persistent_processing_nodes

De forma predeterminada, la tabla S3Queue siempre ha usado nodos de procesamiento efímeros, lo que podría dar lugar a datos duplicados si la sesión de ZooKeeper expira antes de que S3Queue confirme en ZooKeeper los archivos procesados, pero después de haber comenzado a procesarlos. Esta configuración obliga al servidor a eliminar la posibilidad de duplicados en caso de que expire una sesión de Keeper.

persistent_processing_nodes_ttl_seconds

En caso de una finalización no controlada del servidor, es posible que, si use_persistent_processing_nodes está habilitado, queden nodos de procesamiento sin eliminar. Esta configuración define un período de tiempo durante el cual estos nodos de procesamiento pueden eliminarse de forma segura. Valor predeterminado: 3600 (1 hora).

Ajustes relacionados con S3

El motor admite todos los ajustes relacionados con S3. Para obtener más información sobre los ajustes de S3, consulte aquí.

Acceso basado en roles de S3

El motor de tabla S3Queue admite acceso basado en roles. Consulta la documentación aquí para conocer los pasos necesarios para configurar un rol con acceso a tu bucket. Una vez configurado el rol, se puede pasar un roleARN mediante el parámetro extra_credentials, como se muestra a continuación:
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

Modo ordenado de S3Queue

El modo de procesamiento de S3Queue permite almacenar menos metadatos en ZooKeeper, pero tiene la limitación de que los archivos añadidos más tarde deben tener nombres alfanuméricamente mayores. El modo ordered de S3Queue, al igual que unordered, admite la configuración (s3queue_)processing_threads_num (el prefijo s3queue_ es opcional), que permite controlar el número de hilos que procesarán localmente en el server los archivos de S3. Para el modo ordered sin particionado, ClickHouse puede reanudar el listado de S3 desde la última clave procesada para evitar volver a listar todo el historial del prefijo. En el modo ordenado con buckets, el punto de reanudación se elige de forma conservadora como la menor clave procesada entre todos los buckets para evitar omitir archivos sin procesar. Esta optimización de reanudación del listado se usa solo para colas respaldadas por S3 en modo ordenado sin particionado (no para AzureQueue ni cuando se establece partitioning_mode). Además, el modo ordered también introduce otra configuración llamada (s3queue_)buckets, que significa “hilos lógicos”. Esto significa que, en un escenario distribuido con varios servers que tienen réplicas de la table S3Queue, esta configuración define el número de unidades de procesamiento. Por ejemplo, cada hilo de procesamiento en cada réplica de S3Queue intentará bloquear un bucket determinado para procesarlo; cada bucket se asigna a ciertos archivos mediante el hash del nombre del archivo. Por lo tanto, en un escenario distribuido se recomienda encarecidamente que la configuración (s3queue_)buckets sea al menos igual al número de réplicas, o mayor. No hay inconveniente en que el número de buckets sea mayor que el número de réplicas. El escenario más óptimo sería que la configuración (s3queue_)buckets fuera igual al producto de number_of_replicas y (s3queue_)processing_threads_num. No se recomienda usar la configuración (s3queue_)processing_threads_num antes de la versión 24.6. La configuración (s3queue_)buckets está disponible a partir de la versión 24.6.

SELECT en el motor de tabla S3Queue

Las consultas SELECT están prohibidas de forma predeterminada en las tablas S3Queue. Esto sigue el patrón habitual de las colas, en el que los datos se leen una vez y luego se eliminan de la cola. SELECT está prohibido para evitar la pérdida accidental de datos. Sin embargo, en algunos casos puede ser útil. Para ello, debes establecer el ajuste stream_like_engine_allow_direct_select en True. El motor S3Queue tiene un ajuste especial para las consultas SELECT: commit_on_select. Establécelo en False para conservar los datos en la cola después de leerlos, o en True para eliminarlos.

Descripción

SELECT no es especialmente útil para la importación en streaming (excepto para depuración), porque cada archivo solo puede importarse una vez. Es más práctico crear procesos en tiempo real usando vistas materializadas. Para ello:
  1. Use el motor para crear una tabla que consuma desde la ruta especificada en S3 y trátela como un flujo de datos.
  2. Cree una tabla con la estructura deseada.
  3. Cree una vista materializada que convierta los datos del motor y los inserte en una tabla creada previamente.
Cuando la MATERIALIZED VIEW se conecta al motor, empieza a recopilar datos en segundo plano. Ejemplo:
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

Columnas virtuales

  • _path — Ruta del archivo.
  • _file — Nombre del archivo.
  • _size — Tamaño del archivo.
  • _time — Hora de creación del archivo.
Para obtener más información sobre las columnas virtuales, consulte aquí.

Comodines en la ruta

El argumento path puede especificar varios archivos usando comodines similares a los de bash. Para que un archivo se procese, debe existir y coincidir con el patrón completo de la ruta. La lista de archivos se determina durante SELECT (no en el momento de CREATE).
  • * — Sustituye cualquier cantidad de caracteres, excepto /, incluida la cadena vacía.
  • ** — Sustituye cualquier cantidad de caracteres, incluido /, incluida la cadena vacía.
  • ? — Sustituye cualquier carácter individual.
  • {some_string,another_string,yet_another_one} — Sustituye cualquiera de las cadenas 'some_string', 'another_string', 'yet_another_one'.
  • {N..M} — Sustituye cualquier número dentro del rango de N a M, incluidos ambos extremos. N y M pueden tener ceros a la izquierda; por ejemplo, 000..078.
Las construcciones con {} son similares a la función de tabla remote.

Limitaciones

  1. Las filas duplicadas pueden producirse como resultado de:
  • una excepción durante el análisis sintáctico en mitad del procesamiento del archivo y que los reintentos estén habilitados mediante s3queue_loading_retries;
  • que S3Queue esté configurado en varios servidores que apunten a la misma ruta en ZooKeeper y que la sesión de Keeper expire antes de que uno de los servidores consiga confirmar el archivo procesado, lo que podría hacer que otro servidor asuma el procesamiento del archivo, que podría haber sido procesado parcial o totalmente por el primer servidor; sin embargo, esto ya no ocurre a partir de la versión 25.8 si use_persistent_processing_nodes = 1.
  • terminación anómala del servidor.
  1. Si S3Queue está configurado en varios servidores que apuntan a la misma ruta en ZooKeeper y se usa el modo Ordered, s3queue_loading_retries no funcionará. Esto se corregirá pronto.

Introspección

Para la introspección, use la tabla sin estado system.s3queue_metadata_cache y la tabla persistente system.s3queue_log.
  1. system.s3queue_metadata_cache. Esta tabla no es persistente y muestra el estado en memoria de S3Queue: qué archivos se están procesando actualmente y cuáles ya se procesaron o fallaron.
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Ejemplo:

SELECT *
FROM system.s3queue_metadata_cache

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log. Tabla persistente. Contiene la misma información que system.s3queue_metadata_cache, pero para los archivos processed y failed.
La tabla tiene la siguiente estructura:
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Para usar system.s3queue_log, defina su configuración en el archivo de configuración del servidor:
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
Ejemplo:
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
Última modificación el 10 de junio de 2026