Apache Avro — это строко-ориентированный формат сериализации, использующий двоичное кодирование для эффективной обработки данных. Формат AvroConfluent поддерживает чтение и запись сообщений в формате Avro с использованием Confluent Schema Registry (или сервисов с совместимым API).
Каждое сообщение использует формат передачи данных Confluent: магический байт (0x00), затем 4-байтовый идентификатор схемы в порядке байтов big-endian, после чего следует двоичное значение Avro. При чтении ClickHouse определяет идентификатор схемы, обращаясь к реестру. При записи ClickHouse регистрирует схему, полученную из выходных столбцов, и добавляет полученный идентификатор в начало каждой строки. Для оптимальной производительности схемы кэшируются.
Соответствие типов данных
В таблице ниже приведены все типы данных, поддерживаемые форматом Apache Avro, и соответствующие им типы данных ClickHouse в запросах INSERT и SELECT.
Тип данных Avro INSERT | Тип данных ClickHouse | Тип данных Avro SELECT |
|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes или string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
** Тип Variant неявно допускает null в качестве значения поля, поэтому, например, Avro union(T1, T2, null) будет преобразован в Variant(T1, T2).
В результате при генерации Avro из ClickHouse нам всегда нужно включать тип null в набор типов Avro union, поскольку во время вывода схемы мы не знаем, является ли какое-либо из значений фактически null.
*** Логические типы Avro
Неподдерживаемые логические типы данных Avro:
time-millis
time-micros
duration
| Настройка | Описание | По умолчанию |
|---|
input_format_avro_allow_missing_fields | Следует ли использовать значение по умолчанию вместо выдачи ошибки, если поле не найдено в схеме. | 0 |
input_format_avro_null_as_default | Следует ли использовать значение по умолчанию вместо выдачи ошибки при вставке значения null в столбец, не являющийся Nullable. | 0 |
format_avro_schema_registry_url | URL Confluent Schema Registry. Для базовой аутентификации учетные данные в URL-кодировке можно включить непосредственно в URL. | |
format_avro_schema_registry_connection_timeout | Тайм-аут соединения в секундах для HTTP-клиента Schema Registry (используется как для получения схемы, так и для регистрации). Должен быть больше 0 и меньше 600 (10 минут). | 1 |
format_avro_schema_registry_send_timeout | Тайм-аут отправки в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут). | 1 |
format_avro_schema_registry_receive_timeout | Тайм-аут получения в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут). | 1 |
output_format_avro_confluent_subject | Для вывода: имя subject, под которым схема зарегистрирована в Schema Registry. Обязательно при записи. | |
output_format_avro_string_column_pattern | Для вывода: шаблон регулярного выражения для столбцов String, которые нужно сериализовать как Avro string (по умолчанию — bytes). | |
Чтобы читать Kafka топик в кодировке Avro с помощью движка таблицы Kafka, используйте параметр format_avro_schema_registry_url, чтобы указать URL реестра схем.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';
SELECT * FROM topic1_stream;
Чтобы записывать сообщения AvroConfluent в топик Kafka, задайте URL реестра схем и имя subject. Схема автоматически регистрируется в реестре при первой записи.
CREATE TABLE topic1_sink
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';
INSERT INTO topic1_sink VALUES ('hello', 'world');
Использование базовой аутентификации
Если для вашего реестра схем требуется базовая аутентификация (например, если вы используете Confluent Cloud), вы можете указать URL-кодированные учетные данные в параметре format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
field1 String,
field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
Чтобы отслеживать прогресс ингестии и диагностировать ошибки Kafka-консьюмера, можно выполнить запрос к системной таблице system.kafka_consumers. Если в вашем развертывании несколько реплик (например, в ClickHouse Cloud), необходимо использовать табличную функцию clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
Если вы столкнулись с проблемами с разрешением схемы, для диагностики можно использовать kafkacat вместе с clickhouse-local:
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
1 a
2 b
3 c
Последнее изменение 10 июня 2026 г.