跳转到主要内容
该引擎提供与 Azure Blob 存储 生态的集成,可实现流式数据导入。

创建表

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
引擎参数 AzureQueue 的参数与 AzureBlobStorage 表引擎支持的参数相同。请参阅此处的参数部分。 AzureBlobStorage 表引擎类似,用户可以使用 Azurite 模拟器在本地进行 Azure Storage 开发。更多详情请参阅此处 示例
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

设置

支持的设置基本与 S3Queue 表引擎相同,只是不带 s3queue_ 前缀。请参阅完整设置列表。 如需查看为该表配置的设置列表,请使用 system.azure_queue_settings 表。该功能自 24.10 起可用。 以下设置仅适用于 AzureQueue,不适用于 S3Queue。

after_processing_move_connection_string

如果目标端是另一个 Azure 容器,则用于将已成功处理的文件移动到该容器的 Azure Blob 存储连接字符串。 可能的值:
  • String。
默认值:空字符串。

after_processing_move_container

如果目标端是另一个 Azure 容器,则此参数指定成功处理后的文件要移动到的容器名称。 可能的值:
  • String。
默认值:空字符串。 示例:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

对 AzureQueue 表引擎执行 SELECT

默认情况下,AzureQueue 表禁止执行 SELECT 查询。这遵循了常见的队列模式:数据读取一次后,就会从队列中移除。禁用 SELECT 是为了防止意外的数据丢失。 不过,在某些场景下,这样做也可能很有用。要启用此功能,需要将设置 stream_like_engine_allow_direct_select 设为 True。 AzureQueue 引擎还为 SELECT 查询提供了一个特殊设置:commit_on_select。将其设为 False 可在读取后保留队列中的数据,设为 True 则会将其移除。

描述

SELECT 对流式导入并不是特别有用 (调试除外) ,因为每个文件只能导入一次。更实用的做法是使用 materialized view 创建实时处理链路。为此:
  1. 使用该引擎创建一个表,用于从 S3 中指定路径消费数据,并将其视为数据流。
  2. 创建一个具有所需结构的表。
  3. 创建一个 materialized view,将来自该引擎的数据转换后写入前面创建的表中。
MATERIALIZED VIEW 关联到该引擎时,它就会开始在后台收集数据。 示例:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

虚拟列

  • _path — 文件路径。
  • _file — 文件名。
有关虚拟列的更多信息,见此处

内部信息

通过表设置 enable_logging_to_queue_log=1 为该表启用日志记录。 内部信息相关功能与 S3Queue 表引擎 相同,但有以下几个明显区别:
  1. 对于服务器版本 >= 25.1,使用 system.azure_queue_metadata_cache 作为队列的内存状态。对于更早的版本,使用 system.s3queue_metadata_cache (其中也会包含 azure 表的信息) 。
  2. 通过 ClickHouse 主配置启用 system.azure_queue_log,例如:
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
这个持久化表包含与 system.s3queue_metadata_cache 相同的信息,但对应的是已处理和处理失败的文件。 该表的结构如下:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Hostname',
    `event_date` Date COMMENT 'Event date of writing this log row',
    `event_time` DateTime COMMENT 'Event time of writing this log row',
    `database` String COMMENT 'The name of a database where current S3Queue table lives.',
    `table` String COMMENT 'The name of S3Queue table.',
    `uuid` String COMMENT 'The UUID of S3Queue table',
    `file_name` String COMMENT 'File name of the processing file',
    `rows_processed` UInt64 COMMENT 'Number of processed rows',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
    `processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
    `processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
    `exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contains logging entries with the information files processes by S3Queue engine.'

示例:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

最后修改于 2026年6月10日