Saltar al contenido principal
Este motor ofrece integración con el ecosistema de Azure Blob Storage, lo que permite importar datos en streaming.

CREATE TABLE

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
Parámetros del motor Los parámetros de AzureQueue son los mismos que admite el motor de tabla AzureBlobStorage. Consulte la sección de parámetros aquí. Al igual que con el motor de tabla AzureBlobStorage, los usuarios pueden usar el emulador Azurite para el desarrollo local con Azure Storage. Encontrará más información aquí. Ejemplo
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

Configuración

El conjunto de ajustes admitidos es prácticamente el mismo que para el motor de tabla S3Queue, pero sin el prefijo s3queue_. Consulte la lista completa de ajustes. Para obtener una lista de los ajustes configurados para la tabla, use la tabla system.azure_queue_settings. Disponible a partir de la versión 24.10. A continuación se muestran los ajustes compatibles únicamente con AzureQueue y no aplicables a S3Queue.

after_processing_move_connection_string

Cadena de conexión de Azure Blob Storage a la que mover los archivos procesados correctamente, si el destino es otro contenedor de Azure. Valores posibles:
  • String.
Valor predeterminado: cadena vacía.

after_processing_move_container

Nombre del contenedor al que se moverán los archivos procesados correctamente si el destino es otro contenedor de Azure. Valores posibles:
  • String.
Valor predeterminado: cadena vacía. Ejemplo:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

SELECT en el motor de tabla AzureQueue

Las consultas SELECT están prohibidas de forma predeterminada en las tablas AzureQueue. Esto sigue el patrón común de las colas, en el que los datos se leen una vez y luego se eliminan de la cola. SELECT está prohibido para evitar la pérdida accidental de datos. Sin embargo, en algunos casos puede resultar útil. Para ello, debe establecer la configuración stream_like_engine_allow_direct_select en True. El motor AzureQueue tiene una configuración especial para las consultas SELECT: commit_on_select. Establézcala en False para conservar los datos en la cola después de leerlos, o en True para eliminarlos.

Descripción

SELECT no resulta especialmente útil para la importación en streaming (salvo para tareas de depuración), porque cada archivo solo puede importarse una vez. Es más práctico crear flujos en tiempo real mediante vistas materializadas. Para ello:
  1. Use el motor para crear una tabla que consuma desde la ruta especificada en S3 y trátela como un flujo de datos.
  2. Cree una tabla con la estructura deseada.
  3. Cree una vista materializada que convierta los datos del motor y los inserte en una tabla creada previamente.
Cuando la MATERIALIZED VIEW se conecta al motor, empieza a recopilar datos en segundo plano. Ejemplo:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

Columnas virtuales

  • _path — Ruta del archivo.
  • _file — Nombre del archivo.
Para obtener más información sobre las columnas virtuales, consulte aquí.

Introspección

Habilite el registro de la tabla mediante la configuración de tabla enable_logging_to_queue_log=1. Las capacidades de introspección son las mismas que las del motor de tabla S3Queue, con varias diferencias concretas:
  1. Use system.azure_queue_metadata_cache para el estado en memoria de la cola en las versiones del servidor >= 25.1. Para versiones anteriores, use system.s3queue_metadata_cache (también contendría información de las tablas azure).
  2. Habilite system.azure_queue_log mediante la configuración principal de ClickHouse; por ejemplo:
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
Esta tabla persistente contiene la misma información que system.s3queue_metadata_cache, pero sobre los archivos procesados y fallidos. La tabla tiene la siguiente estructura:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Hostname',
    `event_date` Date COMMENT 'Event date of writing this log row',
    `event_time` DateTime COMMENT 'Event time of writing this log row',
    `database` String COMMENT 'The name of a database where current S3Queue table lives.',
    `table` String COMMENT 'The name of S3Queue table.',
    `uuid` String COMMENT 'The UUID of S3Queue table',
    `file_name` String COMMENT 'File name of the processing file',
    `rows_processed` UInt64 COMMENT 'Number of processed rows',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
    `processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
    `processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
    `exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contiene entradas de logging con la información de los archivos procesados por el motor S3Queue.'

Ejemplo:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

Última modificación el 10 de junio de 2026