Apache Beam — это открытая унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как батч-, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в поддержке широкого спектра сценариев обработки данных — от операций ETL (извлечение, преобразование, загрузка) до сложной обработки событий и Real-time аналитики.
Эта интеграция использует официальный коннектор JDBC ClickHouse в качестве базового слоя для вставки.
Пакет интеграции, необходимый для работы Apache Beam с ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций для множества популярных систем хранения данных и баз данных.
Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.
Настройка пакета ClickHouse для Apache Beam
Добавьте следующую зависимость в систему управления пакетами:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-clickhouse</artifactId>
<version>${beam.version}</version>
</dependency>
Рекомендуемая версия BeamКоннектор ClickHouseIO рекомендуется использовать с Apache Beam версии 2.59.0 и выше.
Более ранние версии могут не полностью поддерживать функциональность коннектора.
Артефакты доступны в официальном репозитории Maven.
В следующем примере CSV-файл input.csv считывается в виде PCollection, преобразуется в объект Row (с использованием заданной схемы) и вставляется в локальный экземпляр ClickHouse с помощью ClickHouseIO:
package org.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
public class Main {
public static void main(String[] args) {
// Создание объекта Pipeline.
Pipeline p = Pipeline.create();
Schema SCHEMA =
Schema.builder()
.addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
.addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
.addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
.build();
// Применение преобразований к конвейеру.
PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));
PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
@ProcessElement
public void processElement(@Element String line, OutputReceiver<Row> out) {
String[] values = line.split(",");
Row row = Row.withSchema(SCHEMA)
.addValues(values[0], Short.parseShort(values[1]), DateTime.now())
.build();
out.output(row);
}
})).setRowSchema(SCHEMA);
rows.apply("Write to ClickHouse",
ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));
// Запуск конвейера.
p.run().waitUntilFinish();
}
}
Поддерживаемые типы данных
| ClickHouse | Apache Beam | Поддерживается | Примечания |
|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes — это LogicalType, представляющий массив байтов фиксированной длины, который находится в org.apache.beam.sdk.schemas.logicaltypes |
| Schema.TypeName#DECIMAL | ❌ | |
| Schema.TypeName#MAP | ❌ | |
Параметры ClickHouseIO.Write
Конфигурацию ClickHouseIO.Write можно настроить с помощью следующих функций-сеттеров:
| Функция-сеттер параметра | Тип аргумента | Значение по умолчанию | Описание |
|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | Максимальный размер блока строк для вставки. |
withMaxRetries | (int maxRetries) | 5 | Максимальное количество повторных попыток для неудачных вставок. |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | Максимальная суммарная длительность задержки для повторных попыток. |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | Начальная длительность задержки перед первой повторной попыткой. |
withInsertDistributedSync | (Boolean sync) | true | Если true, синхронизирует операции вставки для distributed таблиц. |
withInsertQuorum | (Long quorum) | null | Количество реплик, необходимых для подтверждения операции вставки. |
withInsertDeduplicate | (Boolean deduplicate) | true | Если true, для операций вставки включается дедупликация. |
withTableSchema | (TableSchema schema) | null | Схема целевой таблицы ClickHouse. |
Учитывайте следующие ограничения при использовании коннектора:
- На данный момент поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
- ClickHouse выполняет дедупликацию при вставке в таблицу
ReplicatedMergeTree или в таблицу Distributed, построенную поверх ReplicatedMergeTree. Без репликации вставка в обычную таблицу MergeTree может приводить к появлению дубликатов, если вставка завершается ошибкой, а затем успешно повторяется. Однако каждый блок вставляется атомарно, а размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается за счёт использования контрольных сумм вставленных блоков. Подробнее о дедупликации см. в разделах Deduplication и Deduplicate insertion config.
- Коннектор не выполняет никаких DDL-операторов; поэтому целевая таблица должна существовать до вставки.
Последнее изменение 10 июня 2026 г.