跳转到主要内容
这是关于从 PostgreSQL 迁移到 ClickHouse 的指南的 第 1 部分。本文通过一个实际示例,演示如何采用实时复制 (CDC,变更数据捕获) 的方法高效完成迁移。文中介绍的许多概念同样适用于将数据从 PostgreSQL 手动批量迁移到 ClickHouse 的场景。

数据集

作为用于展示从 Postgres 迁移到 ClickHouse 典型流程的示例数据集,我们使用此处记录的 Stack Overflow 数据集。该数据集包含 Stack Overflow 从 2008 年到 2024 年 4 月期间产生的所有 postvoteusercommentbadge。该数据的 PostgreSQL schema 如下所示: 用于在 PostgreSQL 中创建这些表的 DDL 命令可在此处获取。 该 schema 虽然未必是最优的,但用到了多种常见的 PostgreSQL 特性,包括主键、外键、分区和索引。 我们将把这些概念逐一迁移到 ClickHouse 中的对应实现。 对于希望将该数据集导入 PostgreSQL 实例以测试迁移步骤的用户,我们提供了 pg_dump 格式的数据下载,以及配套的 DDL,后续的数据加载命令如下所示:
# users
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/users.sql.gz
gzip -d users.sql.gz
psql < users.sql

# posts
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/posts.sql.gz
gzip -d posts.sql.gz
psql < posts.sql

# posthistory
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/posthistory.sql.gz
gzip -d posthistory.sql.gz
psql < posthistory.sql

# comments
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/comments.sql.gz
gzip -d comments.sql.gz
psql < comments.sql

# votes
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/votes.sql.gz
gzip -d votes.sql.gz
psql < votes.sql

# badges
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/badges.sql.gz
gzip -d badges.sql.gz
psql < badges.sql

# postlinks
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/postlinks.sql.gz
gzip -d postlinks.sql.gz
psql < postlinks.sql
虽然对于 ClickHouse 来说,这个数据集很小,但对于 Postgres 来说,规模已经相当可观。上述数据只是其中一个子集,涵盖了 2024 年前 3 个月的数据。
虽然我们的示例结果使用的是完整数据集,以展示 Postgres 与 ClickHouse 之间的性能差异,但下文记录的所有步骤同样适用于这个较小的子集,功能上完全一致。想要将完整数据集加载到 Postgres 的用户,请参见这里。由于上述 schema 中定义了外键约束,PostgreSQL 的完整数据集仅包含满足引用完整性的行。如有需要,也可以直接将无此类约束的 Parquet 版本 轻松加载到 ClickHouse 中。

数据迁移

实时复制 (CDC (变更数据捕获) )

请参考此指南设置 ClickPipes for PostgreSQL。该指南涵盖多种不同类型的 Postgres 源实例。 采用 ClickPipes 或 PeerDB 的 CDC (变更数据捕获) 方法时,PostgreSQL 数据库中的每个表都会自动复制到 ClickHouse 中。 为了近实时处理更新和删除,ClickPipes 使用 ReplacingMergeTree 引擎将 Postgres 表映射到 ClickHouse;该引擎专为在 ClickHouse 中处理更新和删除而设计。你可以在这里了解更多关于数据如何通过 ClickPipes 复制到 ClickHouse 的信息。需要注意的是,使用 CDC (变更数据捕获) 进行复制时,在复制更新或删除操作时会在 ClickHouse 中产生重复行。有关在 ClickHouse 中处理这些重复行的方法,请参阅使用 FINAL 修饰符的相关技术 下面来看一下使用 ClickPipes 在 ClickHouse 中创建表 users 的方式。
CREATE TABLE users
(
    `id` Int32,
    `reputation` String,
    `creationdate` DateTime64(6),
    `displayname` String,
    `lastaccessdate` DateTime64(6),
    `aboutme` String,
    `views` Int32,
    `upvotes` Int32,
    `downvotes` Int32,
    `websiteurl` String,
    `location` String,
    `accountid` Int32,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
PRIMARY KEY id
ORDER BY id;
设置完成后,ClickPipes 会开始将 PostgreSQL 中的所有数据迁移到 ClickHouse。对于 Stack Overflow 数据集,根据网络状况和部署规模不同,这个过程通常只需几分钟。

手动批量加载并定期更新

采用手动方式时,可通过以下方法完成数据集的初始批量加载:
  • 表函数 - 在 ClickHouse 中使用 Postgres 表函数 从 Postgres SELECT 数据,并将其 INSERT 到 ClickHouse 表中。适用于将数百 GB 以内的数据集进行批量加载。
  • 导出 - 导出为 CSV 或 SQL 脚本文件等中间格式。然后可以通过客户端使用 INSERT FROM INFILE 子句将这些文件加载到 ClickHouse,或者借助对象存储及其相关函数 (如 s3、gcs) 进行加载。
从 PostgreSQL 手动加载数据时,首先需要在 ClickHouse 中创建表。请参阅这篇数据建模文档,其中也使用 Stack Overflow 数据集来优化 ClickHouse 中的表 schema。 PostgreSQL 与 ClickHouse 之间的数据类型可能有所不同。为了确定每个表列对应的类型,可以结合 Postgres 表函数 使用 DESCRIBE 命令。以下命令用于查看 PostgreSQL 中 posts 表的结构,请根据你的环境进行修改:
Query
DESCRIBE TABLE postgresql('<host>:<port>', 'postgres', 'posts', '<username>', '<password>')
SETTINGS describe_compact_output = 1
有关 PostgreSQL 与 ClickHouse 之间数据类型映射的概览,请参阅附录文档 针对该 schema 优化数据类型的步骤,与从其他来源加载数据 (例如 S3 上的 Parquet) 时完全相同。应用这篇使用 Parquet 的替代指南中描述的流程后,将得到如下 schema:
Query
CREATE TABLE stackoverflow.posts
(
   `Id` Int32,
   `PostTypeId` Enum('Question' = 1, 'Answer' = 2, 'Wiki' = 3, 'TagWikiExcerpt' = 4, 'TagWiki' = 5, 'ModeratorNomination' = 6, 'WikiPlaceholder' = 7, 'PrivilegeWiki' = 8),
   `AcceptedAnswerId` UInt32,
   `CreationDate` DateTime,
   `Score` Int32,
   `ViewCount` UInt32,
   `Body` String,
   `OwnerUserId` Int32,
   `OwnerDisplayName` String,
   `LastEditorUserId` Int32,
   `LastEditorDisplayName` String,
   `LastEditDate` DateTime,
   `LastActivityDate` DateTime,
   `Title` String,
   `Tags` String,
   `AnswerCount` UInt16,
   `CommentCount` UInt8,
   `FavoriteCount` UInt8,
   `ContentLicense`LowCardinality(String),
   `ParentId` String,
   `CommunityOwnedDate` DateTime,
   `ClosedDate` DateTime
)
ENGINE = MergeTree
ORDER BY tuple()
COMMENT 'Optimized types'
我们可以用一个简单的 INSERT INTO SELECT 来填充它:从 PostgreSQL 读取数据并插入到 ClickHouse 中。
Query
INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>:<port>', 'postgres', 'posts', '<username>', '<password>')
0 rows in set. Elapsed: 146.471 sec. Processed 59.82 million rows, 83.82 GB (408.40 thousand rows/s., 572.25 MB/s.)
增量加载也可以定期调度。如果 Postgres 表只接收插入操作,并且存在递增的 id 或时间戳,则可以使用上述表函数方法加载增量数据,即可将 WHERE 子句应用于 SELECT。如果能够保证更新始终作用于同一列,这种方法也可用于支持更新。不过,若要支持删除,则需要完全重新加载,而随着表不断增长,这可能会越来越难实现。 我们演示如何使用 CreationDate 进行初始加载和增量加载 (我们假设如果行被更新,它也会随之更新) 。
-- 初始加载
INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>', 'postgres', 'posts', 'postgres', '<password')

INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>', 'postgres', 'posts', 'postgres', '<password') WHERE CreationDate > ( SELECT (max(CreationDate) FROM stackoverflow.posts)
ClickHouse 会将简单的 WHERE 子句 (如 =!=>>=<<= 和 IN) 下推到 PostgreSQL 服务器。因此,只要确保用于识别变更集的列上建有索引,就能提高增量加载的效率。
使用查询复制时,检测 UPDATE 操作的一种可行方法是将 XMIN system column (事务 ID) 用作水位线——该列的变化表明数据已发生变更,因此可将相应变更应用到目标表。采用这种方法的用户应注意,XMIN 值可能会回绕,而且进行比较时需要全表扫描,这会使变更跟踪更加复杂。
点击此处查看第 2 部分
最后修改于 2026年6月10日