Apache Avro é um formato de serialização orientado a linhas que usa codificação binária para o processamento eficiente de dados. O formato AvroConfluent oferece suporte à leitura e à gravação de mensagens codificadas em Avro usando o Confluent Schema Registry (ou serviços compatíveis com a API).
Cada mensagem usa o wire format da Confluent: um byte mágico (0x00), seguido de um ID de schema de 4 bytes em big-endian e, em seguida, do datum binário Avro. Durante a leitura, o ClickHouse resolve o ID do schema consultando o registry. Durante a gravação, o ClickHouse registra o schema derivado das colunas de saída e prefixa cada linha com o ID resultante. Os schemas são armazenados em cache para garantir desempenho ideal.
Mapeamento de tipos de dados
A tabela abaixo mostra todos os tipos de dados compatíveis com o formato Apache Avro e seus respectivos tipos de dados no ClickHouse em consultas INSERT e SELECT.
Tipo de dado Avro INSERT | Tipo de dado ClickHouse | Tipo de dado Avro SELECT |
|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes ou string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
** O tipo Variant aceita implicitamente null como valor de campo; assim, por exemplo, o Avro union(T1, T2, null) será convertido em Variant(T1, T2).
Como resultado, ao gerar Avro a partir do ClickHouse, precisamos sempre incluir o tipo null no conjunto de tipos union do Avro, pois não sabemos, durante a inferência de esquema, se algum valor é de fato null.
*** Tipos lógicos do Avro
Tipos de dados lógicos do Avro não compatíveis:
time-millis
time-micros
duration
| Setting | Description | Default |
|---|
input_format_avro_allow_missing_fields | Se deve usar um valor padrão em vez de gerar um erro quando um campo não é encontrado no schema. | 0 |
input_format_avro_null_as_default | Se deve usar um valor padrão em vez de gerar um erro ao inserir um valor null em uma coluna que não é Nullable. | 0 |
format_avro_schema_registry_url | A URL do Confluent Schema Registry. Para autenticação básica, credenciais codificadas para URL podem ser incluídas diretamente no caminho da URL. | |
format_avro_schema_registry_connection_timeout | Tempo limite de conexão, em segundos, para o cliente HTTP do Schema Registry (usado tanto para buscar schemas quanto para registrá-los). Deve ser maior que 0 e menor que 600 (10 minutos). | 1 |
format_avro_schema_registry_send_timeout | Tempo limite de envio, em segundos, para o cliente HTTP do Schema Registry. Deve ser maior que 0 e menor que 600 (10 minutos). | 1 |
format_avro_schema_registry_receive_timeout | Tempo limite de recebimento, em segundos, para o cliente HTTP do Schema Registry. Deve ser maior que 0 e menor que 600 (10 minutos). | 1 |
output_format_avro_confluent_subject | Para saída: o nome do subject sob o qual o schema é registrado no Schema Registry. Obrigatório ao gravar. | |
output_format_avro_string_column_pattern | Para saída: expressão regular das colunas String a serem serializadas como Avro string (o padrão é bytes). | |
Para ler um tópico do Kafka codificado em Avro usando o motor de tabela Kafka, use a configuração format_avro_schema_registry_url para informar a URL do schema registry.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';
SELECT * FROM topic1_stream;
Para gravar mensagens AvroConfluent em um tópico do Kafka, defina a URL do schema registry e o nome do subject. O schema é registrado automaticamente no schema registry na primeira gravação.
CREATE TABLE topic1_sink
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';
INSERT INTO topic1_sink VALUES ('hello', 'world');
Usando autenticação básica
Se o seu schema registry exigir autenticação básica (por exemplo, se você estiver usando o Confluent Cloud), você pode fornecer credenciais codificadas em URL na configuração format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
Para monitorar o progresso da ingestão e depurar erros no consumer do Kafka, você pode consultar a tabela de sistema system.kafka_consumers. Se a sua implantação tiver várias réplicas (por exemplo, no ClickHouse Cloud), será necessário usar a função de tabela clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
Se você tiver problemas na resolução do schema, poderá usar kafkacat com clickhouse-local para diagnosticar o problema:
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
Última modificação em 10 de junho de 2026