Перейти к основному содержанию
Apache Beam — это открытая унифицированная модель программирования, которая позволяет разработчикам определять и выполнять как батч-, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в поддержке широкого спектра сценариев обработки данных — от операций ETL (извлечение, преобразование, загрузка) до сложной обработки событий и Real-time аналитики. Эта интеграция использует официальный коннектор JDBC ClickHouse в качестве базового слоя для вставки.

Пакет интеграции

Пакет интеграции, необходимый для работы Apache Beam с ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций для множества популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO находится в репозитории Apache Beam.

Настройка пакета ClickHouse для Apache Beam

Установка пакета

Добавьте следующую зависимость в систему управления пакетами:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-clickhouse</artifactId>
    <version>${beam.version}</version>
</dependency>
Рекомендуемая версия BeamКоннектор ClickHouseIO рекомендуется использовать с Apache Beam версии 2.59.0 и выше. Более ранние версии могут не полностью поддерживать функциональность коннектора.
Артефакты доступны в официальном репозитории Maven.

Пример кода

В следующем примере CSV-файл input.csv считывается в виде PCollection, преобразуется в объект Row (с использованием заданной схемы) и вставляется в локальный экземпляр ClickHouse с помощью ClickHouseIO:

package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class Main {

    public static void main(String[] args) {
        // Создание объекта Pipeline.
        Pipeline p = Pipeline.create();

        Schema SCHEMA =
                Schema.builder()
                        .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                        .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                        .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                        .build();

        // Применение преобразований к конвейеру.
        PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));

        PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<Row> out) {

                String[] values = line.split(",");
                Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                out.output(row);
            }
        })).setRowSchema(SCHEMA);

        rows.apply("Write to ClickHouse",
                        ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

        // Запуск конвейера.
        p.run().waitUntilFinish();
    }
}

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий массив
байтов фиксированной длины, который находится в
org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Конфигурацию ClickHouseIO.Write можно настроить с помощью следующих функций-сеттеров:
Функция-сеттер параметраТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное количество повторных попыток для неудачных вставок.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная суммарная длительность задержки для повторных попыток.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная длительность задержки перед первой повторной попыткой.
withInsertDistributedSync(Boolean sync)trueЕсли true, синхронизирует операции вставки для distributed таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли true, для операций вставки включается дедупликация.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

Учитывайте следующие ограничения при использовании коннектора:
  • На данный момент поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в таблицу ReplicatedMergeTree или в таблицу Distributed, построенную поверх ReplicatedMergeTree. Без репликации вставка в обычную таблицу MergeTree может приводить к появлению дубликатов, если вставка завершается ошибкой, а затем успешно повторяется. Однако каждый блок вставляется атомарно, а размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается за счёт использования контрольных сумм вставленных блоков. Подробнее о дедупликации см. в разделах Deduplication и Deduplicate insertion config.
  • Коннектор не выполняет никаких DDL-операторов; поэтому целевая таблица должна существовать до вставки.
Последнее изменение 10 июня 2026 г.