跳转到主要内容
输入输出别名

描述

Apache Avro 是一种面向行的序列化格式,使用二进制编码来高效处理数据。AvroConfluent 格式支持借助 Confluent Schema Registry (或与其 API 兼容的服务) 读取和写入采用 Avro 编码的消息。 每条消息都使用 Confluent 传输格式:一个魔数字节 (0x00) ,后跟一个 4 字节大端序的 schema ID,再后跟 Avro 二进制数据。读取时,ClickHouse 会通过查询 registry 来解析 schema ID。写入时,ClickHouse 会注册根据输出列推导出的 schema,并将生成的 ID 添加到每一行前面。为获得最佳性能,schema 会被缓存。

数据类型映射

下表列出了 Apache Avro format 支持的所有 data types,以及它们在 INSERTSELECT queries 中对应的 ClickHouse data types
Avro data type INSERTClickHouse data typeAvro data type SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord
** Variant 类型 会隐式地接受 null 作为字段值,因此,例如 Avro 的 union(T1, T2, null) 会被转换为 Variant(T1, T2)。 因此,当从 ClickHouse 生成 Avro 时,我们必须始终在 Avro 的 union 类型集合中包含 null 类型,因为在进行 schema inference 时,我们无法判断是否有某个值实际为 null *** Avro 逻辑类型 不受支持的 Avro 逻辑类型:
  • time-millis
  • time-micros
  • duration

格式设置

SettingDescriptionDefault
input_format_avro_allow_missing_fields当 schema 中找不到某个字段时,是否使用默认值而不是报错。0
input_format_avro_null_as_default当向不可为空的列中插入 null 值时,是否使用默认值而不是报错。0
format_avro_schema_registry_urlConfluent Schema Registry 的 URL。对于基本身份验证,可以将经过 URL 编码的凭据直接包含在 URL 路径中。
format_avro_schema_registry_connection_timeoutSchema Registry HTTP 客户端的连接超时时间 (秒) ,用于 schema 拉取和注册。必须大于 0 且小于 600 (10 分钟) 。1
format_avro_schema_registry_send_timeoutSchema Registry HTTP 客户端的发送超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。1
format_avro_schema_registry_receive_timeoutSchema Registry HTTP 客户端的接收超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。1
output_format_avro_confluent_subject用于输出:schema 在 Schema Registry 中注册时使用的 subject 名称。写入时必填。
output_format_avro_string_column_pattern用于输出:将 String 列序列化为 Avro string 的正则表达式 (默认为 bytes) 。

示例

从 Kafka 读取

要使用 Kafka 表引擎 读取采用 Avro 编码的 Kafka topic,请使用 format_avro_schema_registry_url 设置指定 Schema Registry 的 URL。
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

写入 Kafka

要将 AvroConfluent 消息写入 Kafka topic,需要同时设置 Schema Registry URL 和 subject 名称。首次写入时,schema 会自动注册到 registry。
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

使用基本身份验证

如果您的 Schema Registry 需要使用基本身份验证 (例如您使用的是 Confluent Cloud) ,可以在 format_avro_schema_registry_url 设置中提供经过 URL 编码的凭据。
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

故障排除

要监控摄取进度并排查 Kafka 消费者错误,可以查询 system.kafka_consumers 系统表。如果你的部署包含多个副本 (例如 ClickHouse Cloud) ,则必须使用 clusterAllReplicas 表函数。
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
如果遇到 schema 解析方面的问题,可以使用 kafkacat 配合 clickhouse-local 进行故障排查:
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
最后修改于 2026年6月10日