Перейти к основному содержанию
ВходВыходПсевдоним

Описание

Apache Avro — это строко-ориентированный формат сериализации, использующий двоичное кодирование для эффективной обработки данных. Формат AvroConfluent поддерживает чтение и запись сообщений в формате Avro с использованием Confluent Schema Registry (или сервисов с совместимым API). Каждое сообщение использует формат передачи данных Confluent: магический байт (0x00), затем 4-байтовый идентификатор схемы в порядке байтов big-endian, после чего следует двоичное значение Avro. При чтении ClickHouse определяет идентификатор схемы, обращаясь к реестру. При записи ClickHouse регистрирует схему, полученную из выходных столбцов, и добавляет полученный идентификатор в начало каждой строки. Для оптимальной производительности схемы кэшируются.

Соответствие типов данных

В таблице ниже приведены все типы данных, поддерживаемые форматом Apache Avro, и соответствующие им типы данных ClickHouse в запросах INSERT и SELECT.
Тип данных Avro INSERTТип данных ClickHouseТип данных Avro SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes или string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord
** Тип Variant неявно допускает null в качестве значения поля, поэтому, например, Avro union(T1, T2, null) будет преобразован в Variant(T1, T2). В результате при генерации Avro из ClickHouse нам всегда нужно включать тип null в набор типов Avro union, поскольку во время вывода схемы мы не знаем, является ли какое-либо из значений фактически null. *** Логические типы Avro Неподдерживаемые логические типы данных Avro:
  • time-millis
  • time-micros
  • duration

Настройки формата

НастройкаОписаниеПо умолчанию
input_format_avro_allow_missing_fieldsСледует ли использовать значение по умолчанию вместо выдачи ошибки, если поле не найдено в схеме.0
input_format_avro_null_as_defaultСледует ли использовать значение по умолчанию вместо выдачи ошибки при вставке значения null в столбец, не являющийся Nullable.0
format_avro_schema_registry_urlURL Confluent Schema Registry. Для базовой аутентификации учетные данные в URL-кодировке можно включить непосредственно в URL.
format_avro_schema_registry_connection_timeoutТайм-аут соединения в секундах для HTTP-клиента Schema Registry (используется как для получения схемы, так и для регистрации). Должен быть больше 0 и меньше 600 (10 минут).1
format_avro_schema_registry_send_timeoutТайм-аут отправки в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).1
format_avro_schema_registry_receive_timeoutТайм-аут получения в секундах для HTTP-клиента Schema Registry. Должен быть больше 0 и меньше 600 (10 минут).1
output_format_avro_confluent_subjectДля вывода: имя subject, под которым схема зарегистрирована в Schema Registry. Обязательно при записи.
output_format_avro_string_column_patternДля вывода: шаблон регулярного выражения для столбцов String, которые нужно сериализовать как Avro string (по умолчанию — bytes).

Примеры

Чтение из Kafka

Чтобы читать Kafka топик в кодировке Avro с помощью движка таблицы Kafka, используйте параметр format_avro_schema_registry_url, чтобы указать URL реестра схем.
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Запись в Kafka

Чтобы записывать сообщения AvroConfluent в топик Kafka, задайте URL реестра схем и имя subject. Схема автоматически регистрируется в реестре при первой записи.
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

Использование базовой аутентификации

Если для вашего реестра схем требуется базовая аутентификация (например, если вы используете Confluent Cloud), вы можете указать URL-кодированные учетные данные в параметре format_avro_schema_registry_url.
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

Устранение неполадок

Чтобы отслеживать прогресс ингестии и диагностировать ошибки Kafka-консьюмера, можно выполнить запрос к системной таблице system.kafka_consumers. Если в вашем развертывании несколько реплик (например, в ClickHouse Cloud), необходимо использовать табличную функцию clusterAllReplicas.
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
Если вы столкнулись с проблемами с разрешением схемы, для диагностики можно использовать kafkacat вместе с clickhouse-local:
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
Последнее изменение 10 июня 2026 г.