Перейти к основному содержанию
На этой странице описано, как настроить CDC (фиксация изменений данных) из DynamoDB в ClickHouse с помощью ClickPipes. Эта интеграция состоит из 2 компонентов:
  1. Начальный снимок через S3 ClickPipes
  2. Обновления в реальном времени через Kinesis ClickPipes
Данные будут поступать в таблицу с движком ReplacingMergeTree. Этот движок таблицы обычно используется в сценариях CDC (фиксация изменений данных), так как позволяет применять операции обновления. Подробнее об этом шаблоне можно прочитать в следующих статьях блога:

1. Настройте поток Kinesis

Сначала включите поток Kinesis для таблицы DynamoDB, чтобы отслеживать изменения в реальном времени. Это нужно сделать до создания снимка, чтобы не потерять данные. Руководство AWS доступно здесь.

2. Создайте снимок

Далее мы создадим снимок таблицы DynamoDB. Это можно сделать, экспортировав данные из AWS в S3. Руководство AWS доступно здесь. Выберите “Full export” в формате DynamoDB JSON.

3. Загрузите снимок в ClickHouse

Создайте необходимые таблицы

Данные из снимка DynamoDB будут выглядеть примерно так:
{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}
Обратите внимание, что данные представлены во вложенном формате. Перед загрузкой в ClickHouse их нужно преобразовать в плоскую структуру. Это можно сделать в ClickHouse с помощью функции JSONExtract в materialized view. Нам нужно создать три таблицы:
  1. Таблицу для хранения исходных данных из DynamoDB
  2. Таблицу для хранения итоговых данных в плоской структуре (целевая таблица)
  3. materialized view для преобразования данных в плоскую структуру
Для приведенного выше примера данных DynamoDB таблицы ClickHouse будут выглядеть так:
/* Таблица снимка */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Таблица для итоговых преобразованных данных */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Таблица для итоговых преобразованных данных */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
К целевой таблице предъявляется несколько требований:
  • Эта таблица должна использовать движок ReplacingMergeTree
  • В таблице должен быть столбец version
    • На следующих шагах мы сопоставим поле ApproximateCreationDateTime из потока Kinesis со столбцом version.
  • В качестве ключа сортировки таблица должна использовать ключ партиционирования (задаётся через ORDER BY)
    • Для строк с одинаковым ключом сортировки будет выполняться дедупликация по столбцу version.

Создайте ClickPipe для загрузки снимка

Теперь вы можете создать ClickPipe, чтобы загрузить данные снимка из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe, но используйте следующие настройки:
  • Путь приёма: Вам нужно найти путь к экспортированным JSON‑файлам в S3. Он будет выглядеть примерно так:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Формат: JSONEachRow
  • Таблица: Ваша таблица снимка (например, default.snapshot в примере выше)
После создания данные начнут поступать в таблицу снимка и целевую таблицу. Не нужно ждать завершения загрузки снимка, прежде чем переходить к следующему шагу.

4. Создайте Kinesis ClickPipe

Теперь можно настроить Kinesis ClickPipe, чтобы отслеживать изменения из потока Kinesis в реальном времени. Следуйте руководству по Kinesis ClickPipe здесь, но используйте следующие настройки:
  • Поток: поток Kinesis из шага 1
  • Таблица: ваша целевая таблица (например, default.destination в примере выше)
  • Развернуть объект: true
  • Сопоставления столбцов:
    • ApproximateCreationDateTime: version
    • Сопоставьте остальные поля с соответствующими столбцами целевой таблицы, как показано ниже

5. Очистка (необязательно)

После завершения работы ClickPipe со снимком вы можете удалить таблицу снимка и materialized view.
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
Последнее изменение 10 июня 2026 г.