跳转到主要内容
该引擎提供与 Amazon S3 生态的集成,并支持流式导入。该引擎与 KafkaRabbitMQ 引擎类似,但提供了 S3 特有功能。 理解 S3Queue 实现的 original PR 中的这条说明非常重要:当 MATERIALIZED VIEW 关联到该引擎时,S3Queue 表引擎就会开始在后台收集数据。

CREATE 表

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
24.7 之前,除 modeafter_processingkeeper_path 外,所有设置都必须使用 s3queue_ 前缀。
引擎参数 S3Queue 的参数与 S3 表引擎支持的参数相同。请参见此处的参数部分。 示例
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
使用命名集合:
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

设置

要获取为该表配置的设置列表,请使用 system.s3_queue_settings 表。该功能从 24.10 版本开始可用。
设置名称 (24.7+) 从 24.7 版本开始,S3Queue 设置既可以使用 s3queue_ 前缀指定,也可以不使用:
  • 现代语法 (24.7+) :processing_threads_numtracked_file_ttl_sec 等。
  • 旧语法 (所有版本) :s3queue_processing_threads_nums3queue_tracked_file_ttl_sec 等。
24.7+ 同时支持这两种形式。本页中的示例使用不带前缀的现代语法。

模式

可选值:
  • unordered — 在 unordered 模式下,会通过 ZooKeeper 中的持久节点跟踪所有已处理文件的集合。
  • ordered — 在有序模式下,文件按字典序处理。这意味着,如果某个名为 ‘BBB’ 的文件已在某个时间点被处理,之后又有一个名为 ‘AA’ 的文件被添加到存储桶中,那么它会被忽略。ZooKeeper 中只会存储已成功消费文件里名称最大的文件名 (按字典序) ,以及因加载失败而需要重试的文件名。
默认值:在 24.6 之前的版本中为 ordered。从 24.6 开始不再提供默认值,必须手动指定该设置。对于在更早版本中创建的表,出于兼容性考虑,默认值将继续保持为 Ordered

after_processing

成功处理后如何处置文件。 可能的值:
  • keep。
  • delete。
  • move。
  • tag。
默认值:keep 移动需要额外设置。如果是在同一个 bucket 内移动,则必须通过 after_processing_move_prefix 提供新的 path prefix。 移动到另一个 S3 bucket 时,需要通过 after_processing_move_uri 指定目标 bucket URI,并通过 after_processing_move_access_key_idafter_processing_move_secret_access_key 提供 S3 凭证。 示例:
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
从一个 Azure 容器移动到另一个 Azure 容器时,需要将 Blob Storage 连接字符串指定为 after_processing_move_connection_string,并将容器名称指定为 after_processing_move_container。请参阅 AzureQueue 设置 添加标签时,需要通过 after_processing_tag_keyafter_processing_tag_value 提供标签键和值。

after_processing_retries

在放弃之前,对请求的后处理操作进行重试的次数。 可选值:
  • 非负整数。
默认值:10

after_processing_move_access_key_id

如果目标端是另一个 S3 bucket,则用于将处理成功的文件移动到该 bucket 的 Access Key ID。 可能的值:
  • String。
默认值:空字符串。

after_processing_move_prefix

用于移动已成功处理文件后的路径前缀。适用于以下两种情况:在同一个 bucket 内移动,或移动到另一个 bucket。 可能的值:
  • String。
默认值:空字符串。

after_processing_move_secret_access_key

如果目标端是另一个 S3 bucket,则这是用于将已成功处理的文件移动到该bucket的 Secret Access Key。 可能的值:
  • String。
默认值:空字符串。

after_processing_move_uri

如果目标端是另一个 S3 bucket,则用于将已成功处理的文件移动到该 S3 bucket 的 URI。 可能的值:
  • String。
默认值:空字符串。

after_processing_tag_key

after_processing='tag' 时,用于为处理成功的文件添加标签的标签键。 可能的值:
  • String。
默认值:空字符串。

after_processing_tag_value

after_processing='tag' 时,用于为成功处理的文件添加标签的标签值。 可选值:
  • String。
默认值:空字符串。

keeper_path

ZooKeeper 中队列元数据的路径。如果未显式指定,ClickHouse 会根据 s3queue_default_zookeeper_path、数据库 UUID 和表 UUID 构建该路径。绝对路径值 (以 / 开头) 会按原样使用,而相对路径值会追加到已配置的前缀后。诸如 {database}{uuid} 这样的宏会在引擎连接到 ZooKeeper 之前展开。 如需指定辅助 ZooKeeper cluster,请在该值前加上已配置的名称前缀,例如 analytics_keeper:/clickhouse/queue/orders。该名称必须存在于 <auxiliary_zookeepers> 中;否则引擎会报错 Unknown auxiliary ZooKeeper name ...。完整字符串 (包括前缀) 会保留在 SHOW CREATE TABLE 中,因此该语句可以被原样复制。 可能的值:
  • String。
默认值:/

loading_retries

文件加载最多可重试指定次数。默认情况下,不会重试。 可能的值:
  • 正整数。
默认值:0

processing_threads_num

用于执行处理操作的线程数。仅适用于 Unordered 模式。 默认值:CPU 数量或 16。

parallel_inserts

默认情况下,processing_threads_num 只会产生一个 INSERT,因此只会用多线程下载文件并进行解析。 但这会限制并行度,因此为了获得更好的吞吐量,建议使用 parallel_inserts=true,这样就可以并行写入数据 (但请注意,这会导致 MergeTree 家族生成更多的数据分区片段) 。
INSERT 会根据 max_process*_before_commit 设置来创建。
默认值:false

enable_logging_to_s3queue_log

启用向 system.s3queue_log 写入日志。 默认值:0

polling_min_timeout_ms

指定 ClickHouse 在发起下一次轮询之前的最短等待时间,单位为毫秒。 可能的值:
  • 正整数。
默认值:1000

polling_max_timeout_ms

定义 ClickHouse 在发起下一次轮询前最多等待多长时间,单位为毫秒。 可能的值:
  • 正整数。
默认值:10000

polling_backoff_ms

用于确定在未发现新文件时,在上一次轮询间隔基础上额外增加的等待时间。下一次轮询会在上一次间隔与该 backoff 值之和,或最大间隔 (取较小者) 之后进行。 可能的值:
  • 正整数。
默认值:0

tracked_files_limit

如果使用“unordered”模式,该参数可用于限制 Zookeeper 节点数量;对于“ordered”模式则不生效。 达到限制后,最早处理的文件会从 ZooKeeper 节点中删除,并被重新处理。 可能的值:
  • 正整数。
默认值:1000

tracked_file_ttl_sec

在“unordered”模式下,用于指定已处理文件在 ZooKeeper 节点中保留的最长时长 (默认永久保留) ;在“ordered”模式下,此设置无效。 超过指定秒数后,文件将被重新导入。 可能的值:
  • 正整数。
默认值:0

cleanup_interval_min_ms

用于“Ordered”模式。定义后台任务重新调度间隔的最小下限;该任务负责维护已跟踪文件的生存时间 (TTL) 和已跟踪文件集合的最大数量。 默认值:10000

cleanup_interval_max_ms

用于“Ordered”模式。定义后台任务重新调度间隔的最大上限,该任务负责维护已跟踪文件的生存时间 (TTL) 以及已跟踪文件的最大数量。 默认值:30000

buckets

适用于“Ordered”模式。自 24.6 起可用。如果 S3Queue 表存在多个副本,且每个副本都使用 Keeper 中相同的元数据目录,那么 buckets 的值至少应等于副本数量。如果同时还使用了 processing_threads 设置,则通常还应进一步增大 buckets 的值,因为它决定了 S3Queue 处理的实际并行度。

use_persistent_processing_nodes

默认情况下,S3Queue 表始终使用临时处理节点。如果 ZooKeeper 会话在 S3Queue 将已处理文件提交到 ZooKeeper 之前过期,而它又已经开始处理这些文件,就可能导致数据重复。此设置会强制 server 在 Keeper 会话过期时消除出现重复数据的可能性。

persistent_processing_nodes_ttl_seconds

如果服务器发生非正常终止,并且启用了 use_persistent_processing_nodes,则可能会遗留未被移除的处理中节点。此设置定义了一个时间段,超过该时间后,可以安全地清理这些处理中节点。 默认值:3600 (1 小时) 。

S3 相关设置

该引擎支持所有与 S3 相关的设置。有关 S3 设置的更多信息,请参见这里

S3 基于角色的访问

S3Queue 表引擎支持基于角色的访问。 有关如何配置可访问您的 bucket 的角色,请参阅此处的文档。 配置好角色后,可以通过 extra_credentials 参数传入 roleARN,如下所示:
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

S3Queue ordered 模式

S3Queue 处理模式可以在 ZooKeeper 中存储更少的元数据,但有一个限制:按时间后添加的文件,其名称按字母数字顺序必须更大。 S3Queueordered 模式和 unordered 模式都支持 (s3queue_)processing_threads_num 设置 (s3queue_ 前缀可选) ,可用于控制在服务器本地处理 S3 文件的线程数。 对于不带分区的 ordered 模式,ClickHouse 可以从上一个已处理的键继续列出 S3 内容,以避免重新列出整个前缀下的历史内容。在按桶划分的 ordered 模式中,为避免跳过未处理的文件,恢复点会保守地选择为所有桶中最小的已处理键。 这种恢复列出优化仅用于不带分区的 ordered 模式下、以 S3 为后端的队列 (不适用于 AzureQueue,也不适用于设置了 partitioning_mode 的情况) 。 此外,ordered 模式还引入了另一个名为 (s3queue_)buckets 的设置,表示“逻辑线程”。也就是说,在分布式场景下,当存在多个带有 S3Queue 表副本的服务器时,该设置定义了处理单元的数量。例如,每个 S3Queue 副本上的每个处理线程都会尝试锁定某个 bucket 进行处理,而每个 bucket 会根据文件名的哈希分配到特定文件。因此,在分布式场景中,强烈建议将 (s3queue_)buckets 设置为至少等于副本数,或更大。桶的数量大于副本数也是完全没问题的。最理想的情况是,(s3queue_)buckets 设置等于 number_of_replicas(s3queue_)processing_threads_num 的乘积。 不建议在 24.6 版本之前使用 (s3queue_)processing_threads_num 设置。 (s3queue_)buckets 设置从 24.6 版本开始可用。

从 S3Queue 表引擎中 SELECT

默认情况下,S3Queue 表禁止执行 SELECT 查询。这遵循了常见的队列模式:数据读取一次后就会从队列中移除。禁止 SELECT 是为了防止意外的数据丢失。 不过,在某些情况下,这样做也可能很有用。为此,你需要将设置 stream_like_engine_allow_direct_select 设为 True。 S3Queue 引擎针对 SELECT 查询有一个特殊设置:commit_on_select。将其设为 False 可在读取后保留队列中的数据,设为 True 则会将其移除。

描述

SELECT 对流式导入并没有太大用处 (调试除外) ,因为每个文件只能导入一次。更实用的做法是使用 materialized views 创建实时处理线程。为此:
  1. 使用该 engine 创建一个表,用于从 S3 中指定路径消费数据,并将其视为数据 stream。
  2. 创建一个具有所需结构的表。
  3. 创建一个 materialized view,将来自该 engine 的数据转换后写入先前创建的表。
MATERIALIZED VIEW 关联到该 engine 后,它就会开始在后台收集数据。 示例:
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

虚拟列

  • _path — 文件路径。
  • _file — 文件名。
  • _size — 文件大小。
  • _time — 文件创建时间。
有关虚拟列的更多信息,请参见此处

路径中的通配符

path 参数可以使用类似 bash 的通配符来指定多个文件。要被处理的文件必须存在,并且与整个路径模式匹配。文件列表是在执行 SELECT 时确定的 (而不是在 CREATE 时) 。
  • * — 匹配任意数量的任意字符,但不包括 /,也可以是空字符串。
  • ** — 匹配任意数量的任意字符,包括 /,也可以是空字符串。
  • ? — 匹配任意单个字符。
  • {some_string,another_string,yet_another_one} — 匹配字符串 'some_string''another_string''yet_another_one' 中的任意一个。
  • {N..M} — 匹配从 N 到 M 范围内的任意数字,包括两个端点。N 和 M 可以带前导零,例如 000..078
带有 {} 的写法与 remote 表函数类似。

限制

  1. 出现重复行可能是由以下原因造成的:
  • 在文件处理过程中,解析在中途发生异常,且已通过 s3queue_loading_retries 启用重试;
  • S3Queue 配置在多个服务器上,并指向 zookeeper 中的同一路径,而某台服务器尚未来得及提交已处理文件时,keeper 会话就已过期,这可能导致另一台服务器接手处理该文件,而该文件可能已经被第一台服务器部分或全部处理;不过,如果设置了 use_persistent_processing_nodes = 1,则从 25.8 版本起不再存在这一问题。
  • 服务器异常终止。
  1. 如果 S3Queue 配置在多个服务器上并指向 zookeeper 中的同一路径,同时使用了 Ordered 模式,则 s3queue_loading_retries 将不起作用。该问题很快会得到修复。

内部信息

如需查看内部信息,可使用无状态表 system.s3queue_metadata_cache 和持久化表 system.s3queue_log
  1. system.s3queue_metadata_cache。此表不是持久化的,用于显示 S3Queue 的内存状态:当前正在处理哪些文件,以及哪些文件已处理或处理失败。
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
示例:

SELECT *
FROM system.s3queue_metadata_cache

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log。持久化表。包含与 system.s3queue_metadata_cache 相同的信息,但针对 processedfailed 文件。
该表的结构如下:
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
要使用 system.s3queue_log,请在服务器配置文件中定义相应配置:
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
示例:
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
最后修改于 2026年6月10日