Pular para o conteúdo principal
Este motor fornece integração com o ecossistema do Amazon S3 e permite importação em streaming. Este motor é semelhante aos motores Kafka e RabbitMQ, mas oferece recursos específicos do S3. É importante entender esta observação do PR original da implementação do S3Queue: quando a MATERIALIZED VIEW é associada ao motor, o motor de tabela S3Queue começa a coletar dados em segundo plano.

Criar tabela

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
Antes da versão 24.7, é necessário usar o prefixo s3queue_ para todas as configurações, exceto mode, after_processing e keeper_path.
Parâmetros do motor Os parâmetros de S3Queue são os mesmos aceitos pelo motor de tabela S3. Veja a seção de parâmetros aqui. Exemplo
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
Usando named collections:
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

Configurações

Para obter uma lista das configurações definidas para a tabela, use a tabela system.s3_queue_settings. Disponível a partir da versão 24.10.
Nomes das configurações (24.7+)A partir da versão 24.7, as configurações do S3Queue podem ser especificadas com ou sem o prefixo s3queue_:
  • Sintaxe moderna (24.7+): processing_threads_num, tracked_file_ttl_sec etc.
  • Sintaxe legada (todas as versões): s3queue_processing_threads_num, s3queue_tracked_file_ttl_sec etc.
Ambas as formas são suportadas a partir da versão 24.7. Os exemplos nesta página usam a sintaxe moderna, sem prefixo.

Modo

Valores possíveis:
  • unordered — No modo unordered, o conjunto de todos os arquivos já processados é acompanhado por meio de nós persistentes no ZooKeeper.
  • ordered — No modo ordered, os arquivos são processados em ordem lexicográfica. Isso significa que, se um arquivo chamado ‘BBB’ foi processado em algum momento e, mais tarde, um arquivo chamado ‘AA’ for adicionado ao bucket, ele será ignorado. Apenas o maior nome (em ordem lexicográfica) do arquivo consumido com sucesso e os nomes dos arquivos que serão tentados novamente após uma tentativa de carregamento malsucedida são armazenados no ZooKeeper.
Valor padrão: ordered em versões anteriores à 24.6. A partir da 24.6, não há valor padrão, e a configuração passa a precisar ser especificada manualmente. Para tabelas criadas em versões anteriores, o valor padrão permanecerá Ordered por compatibilidade.

after_processing

Como lidar com o arquivo após o processamento bem-sucedido. Valores possíveis:
  • keep.
  • delete.
  • move.
  • tag.
Valor padrão: keep. Move exige configurações adicionais. No caso de uma movimentação dentro do mesmo bucket, um novo prefixo de caminho deve ser fornecido como after_processing_move_prefix. A movimentação para outro bucket do S3 exige o URI do bucket de destino como after_processing_move_uri, credenciais do S3 como after_processing_move_access_key_id e after_processing_move_secret_access_key. Exemplo:
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
Mover de um contêiner do Azure para outro requer a string de conexão do Blob Storage como after_processing_move_connection_string e o nome do contêiner como after_processing_move_container. Consulte as configurações do AzureQueue. A marcação exige que a chave e o valor da tag sejam fornecidos como after_processing_tag_key e after_processing_tag_value.

after_processing_retries

Número de tentativas da ação solicitada após o processamento antes de desistir. Valores possíveis:
  • Inteiro não negativo.
Valor padrão: 10.

after_processing_move_access_key_id

ID da Access Key do bucket do S3 de destino para o qual mover arquivos processados com sucesso, caso o destino seja outro bucket do S3. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_move_prefix

Prefixo do caminho para o qual os arquivos processados com sucesso serão movidos. Aplica-se a ambos os casos: mover dentro do mesmo bucket e para outro bucket. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_move_secret_access_key

Chave de acesso secreta do bucket do S3 para o qual os arquivos processados com sucesso serão movidos, caso o destino seja outro bucket do S3. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_move_uri

URI do bucket do S3 para o qual os arquivos processados com sucesso serão movidos, se o destino for outro bucket do S3. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_tag_key

Chave da tag usada para marcar arquivos processados com sucesso, se after_processing='tag'. Valores possíveis:
  • String.
Valor padrão: string vazia.

after_processing_tag_value

Valor da tag a ser atribuído aos arquivos processados com sucesso, se after_processing='tag'. Valores possíveis:
  • String.
Valor padrão: string vazia.

keeper_path

Caminho para os metadados da fila no ZooKeeper. Se não for especificado explicitamente, o ClickHouse constrói o caminho a partir de s3queue_default_zookeeper_path, do UUID do banco de dados e do UUID da tabela. Valores absolutos (iniciados com /) são usados como estão, enquanto valores relativos são acrescentados ao prefixo configurado. Macros como {database} ou {uuid} são expandidas antes de o engine se conectar ao ZooKeeper. Para apontar para um cluster ZooKeeper auxiliar, prefixe o valor com o nome configurado, por exemplo analytics_keeper:/clickhouse/queue/orders. O nome deve existir em <auxiliary_zookeepers>; caso contrário, o engine retorna Unknown auxiliary ZooKeeper name .... A string completa (incluindo o prefixo) é preservada em SHOW CREATE TABLE para que a instrução possa ser replicada literalmente. Valores possíveis:
  • String.
Valor padrão: /.

loading_retries

Tenta recarregar o arquivo até o número de vezes especificado. Por padrão, não há novas tentativas. Valores possíveis:
  • Inteiro positivo.
Valor padrão: 0.

processing_threads_num

Número de threads para executar o processamento. Aplica-se apenas ao modo Unordered. Valor padrão: número de CPUs ou 16.

parallel_inserts

Por padrão, processing_threads_num gerará um INSERT, portanto ele apenas baixará arquivos e fará o parse usando múltiplas threads. Mas isso limita o paralelismo, então, para obter melhor throughput, use parallel_inserts=true; isso permitirá inserir dados em paralelo (mas tenha em mente que isso resultará em um número maior de partes de dados geradas para a família MergeTree).
Os INSERTs serão gerados de acordo com as configurações max_process*_before_commit.
Valor padrão: false.

enable_logging_to_s3queue_log

Ativa o logging em system.s3queue_log. Valor padrão: 0.

polling_min_timeout_ms

Especifica o tempo mínimo, em milissegundos, que o ClickHouse aguarda antes de fazer a próxima tentativa de polling. Valores possíveis:
  • Inteiro positivo.
Valor padrão: 1000.

polling_max_timeout_ms

Define o tempo máximo, em milissegundos, que o ClickHouse aguarda antes de iniciar a próxima tentativa de polling. Valores possíveis:
  • Inteiro positivo.
Valor padrão: 10000.

polling_backoff_ms

Determina o tempo de espera adicional acrescentado ao intervalo de polling anterior quando nenhum arquivo novo é encontrado. A próxima verificação ocorre após a soma do intervalo anterior com esse valor de backoff, ou o intervalo máximo, o que for menor. Valores possíveis:
  • Inteiro positivo.
Valor padrão: 0.

tracked_files_limit

Permite limitar o número de nós do Zookeeper caso o modo ‘unordered’ seja usado; não tem efeito no modo ‘ordered’. Se o limite for atingido, os arquivos processados há mais tempo serão excluídos do nó do ZooKeeper e processados novamente. Possíveis valores:
  • Inteiro positivo.
Valor padrão: 1000.

tracked_file_ttl_sec

Número máximo de segundos para manter arquivos processados no nó do ZooKeeper (mantidos para sempre por padrão) no modo ‘unordered’; não tem efeito no modo ‘ordered’. Após o número especificado de segundos, o arquivo será importado novamente. Valores possíveis:
  • Inteiro positivo.
Valor padrão: 0.

cleanup_interval_min_ms

Para o modo ‘Ordered’. Define um limite mínimo para o intervalo de reagendamento de uma tarefa em segundo plano, responsável por manter o TTL dos arquivos monitorados e o limite máximo de arquivos monitorados. Valor padrão: 10000.

cleanup_interval_max_ms

No modo ‘Ordered’. Define o limite máximo do intervalo de reagendamento de uma tarefa em segundo plano responsável por manter o TTL dos arquivos rastreados e o conjunto máximo de arquivos rastreados. Valor padrão: 30000.

buckets

Para o modo ‘Ordered’. Disponível desde a versão 24.6. Se houver várias réplicas da tabela S3Queue, cada uma usando o mesmo diretório de metadados no Keeper, o valor de buckets precisa ser pelo menos igual ao número de réplicas. Se a configuração processing_threads também for usada, faz sentido aumentar ainda mais o valor da configuração buckets, pois ela define o paralelismo efetivo do processamento da S3Queue.

use_persistent_processing_nodes

Por padrão, a tabela S3Queue sempre usou nós de processamento efêmeros, o que poderia levar à duplicação de dados caso a sessão do ZooKeeper expire depois que o processamento já tiver sido iniciado, mas antes de o S3Queue fazer o commit dos arquivos processados no ZooKeeper. Essa configuração força o servidor a eliminar a possibilidade de duplicações em caso de expiração da sessão do Keeper.

persistent_processing_nodes_ttl_seconds

Em caso de encerramento não gracioso do servidor, é possível que, se use_persistent_processing_nodes estiver habilitado, haja nós de processamento que não foram removidos. Essa configuração define um período durante o qual esses nós de processamento podem ser removidos com segurança. Valor padrão: 3600 (1 hora).

Configurações relacionadas ao S3

O motor oferece suporte a todas as configurações relacionadas ao S3. Para mais informações sobre as configurações do S3, consulte aqui.

Acesso ao S3 baseado em função

O motor de tabela S3Queue oferece suporte a acesso baseado em função. Consulte a documentação aqui para ver as etapas de configuração de uma função para acessar seu bucket. Depois que a função estiver configurada, um roleARN poderá ser passado por meio do parâmetro extra_credentials, como mostrado abaixo:
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

Modo ordenado do S3Queue

O modo de processamento S3Queue permite armazenar menos metadados no ZooKeeper, mas tem a limitação de exigir que arquivos adicionados posteriormente tenham nomes alfanumericamente maiores. O modo ordered do S3Queue, assim como o unordered, oferece suporte à configuração (s3queue_)processing_threads_num (o prefixo s3queue_ é opcional), que permite controlar o número de threads que farão o processamento local dos arquivos S3 no servidor. No modo ordered sem particionamento, o ClickHouse pode retomar a listagem do S3 a partir da última chave processada para evitar listar novamente todo o histórico do prefixo. No modo ordenado com buckets, o ponto de retomada é escolhido de forma conservadora como a menor chave processada entre todos os buckets para evitar pular arquivos ainda não processados. Essa otimização de retomada da listagem é usada apenas para filas com S3 em modo ordenado sem particionamento (não para AzureQueue nem quando partitioning_mode está definido). Além disso, o modo ordered também introduz outra configuração chamada (s3queue_)buckets, que significa “threads lógicas”. Isso significa que, em um cenário distribuído com vários servidores e réplicas da tabela S3Queue, essa configuração define o número de unidades de processamento. Por exemplo, cada thread de processamento em cada réplica S3Queue tentará bloquear um determinado bucket para processamento; cada bucket é atribuído a determinados arquivos por meio do hash do nome do arquivo. Portanto, em um cenário distribuído, é altamente recomendável que a configuração (s3queue_)buckets seja pelo menos igual ao número de réplicas ou maior. Não há problema em ter um número de buckets maior que o número de réplicas. O cenário ideal é que a configuração (s3queue_)buckets seja igual ao produto de number_of_replicas por (s3queue_)processing_threads_num. A configuração (s3queue_)processing_threads_num não é recomendada para uso antes da versão 24.6. A configuração (s3queue_)buckets está disponível a partir da versão 24.6.

SELECT no engine de tabela S3Queue

As consultas SELECT são proibidas por padrão em tabelas S3Queue. Isso segue o padrão comum de fila, em que os dados são lidos uma vez e depois removidos da fila. O SELECT é proibido para evitar perda acidental de dados. No entanto, em alguns casos isso pode ser útil. Para isso, você precisa definir a configuração stream_like_engine_allow_direct_select como True. O engine S3Queue tem uma configuração especial para consultas SELECT: commit_on_select. Defina-a como False para preservar os dados na fila após a leitura, ou como True para removê-los.

Descrição

SELECT não é particularmente útil para importação em streaming (exceto para depuração), porque cada arquivo pode ser importado apenas uma vez. É mais prático criar fluxos em tempo real usando visões materializadas. Para fazer isso:
  1. Use o mecanismo para criar uma tabela que consuma do caminho especificado no S3 e trate-a como um fluxo de dados.
  2. Crie uma tabela com a estrutura desejada.
  3. Crie uma visão materializada que converta os dados do mecanismo e os insira em uma tabela criada anteriormente.
Quando a MATERIALIZED VIEW é associada ao mecanismo, ela começa a coletar dados em segundo plano. Exemplo:
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

Colunas virtuais

  • _path — Caminho do arquivo.
  • _file — Nome do arquivo.
  • _size — Tamanho do arquivo.
  • _time — Horário de criação do arquivo.
Para mais informações sobre colunas virtuais, consulte aqui.

Curingas em path

O argumento path pode especificar vários arquivos usando curingas no estilo do bash. Para que um arquivo seja processado, ele deve existir e corresponder ao padrão completo do path. A listagem de arquivos é determinada durante o SELECT (não no momento do CREATE).
  • * — Substitui qualquer quantidade de caracteres, exceto /, incluindo a string vazia.
  • ** — Substitui qualquer quantidade de caracteres, incluindo /, incluindo a string vazia.
  • ? — Substitui qualquer caractere único.
  • {some_string,another_string,yet_another_one} — Substitui qualquer uma das strings 'some_string', 'another_string', 'yet_another_one'.
  • {N..M} — Substitui qualquer número no intervalo de N a M, incluindo ambas as extremidades. N e M podem ter zeros à esquerda, por exemplo, 000..078.
Construções com {} são semelhantes à table function remote.

Limitações

  1. Linhas duplicadas podem ocorrer em decorrência de:
  • uma exceção ocorrer durante o parsing, no meio do processamento do arquivo, e as tentativas de repetição estiverem habilitadas por meio de s3queue_loading_retries;
  • S3Queue estar configurado em vários servidores apontando para o mesmo caminho no ZooKeeper, e a sessão do Keeper expirar antes que um servidor consiga fazer commit do arquivo processado, o que pode levar outro servidor a assumir o processamento do arquivo, que pode já ter sido processado parcial ou totalmente pelo primeiro servidor; no entanto, isso não é true desde a versão 25.8 se use_persistent_processing_nodes = 1.
  • encerramento anormal do servidor.
  1. Se S3Queue estiver configurado em vários servidores apontando para o mesmo caminho no ZooKeeper e o modo Ordered for usado, s3queue_loading_retries não funcionará. Isso será corrigido em breve.

Introspecção

Para fins de introspecção, use a tabela sem estado system.s3queue_metadata_cache e a tabela persistente system.s3queue_log.
  1. system.s3queue_metadata_cache. Esta tabela não é persistente e mostra o estado em memória do S3Queue: quais arquivos estão sendo processados no momento e quais arquivos já foram processados ou falharam.
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Exemplo:

SELECT *
FROM system.s3queue_metadata_cache

Linha 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log. Tabela persistente. Contém as mesmas informações que system.s3queue_metadata_cache, mas para arquivos processed e failed.
A tabela tem a seguinte estrutura:
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Para usar system.s3queue_log, defina a configuração no arquivo de configuração do servidor:
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
Exemplo:
SELECT *
FROM system.s3queue_log

Linha 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
Última modificação em 10 de junho de 2026