跳转到主要内容
由于 ClickHouse 的数据存储结构和复制机制,从 Postgres 复制到 ClickHouse 的更新和删除操作会在 ClickHouse 中产生重复行。本文将介绍其原因,以及在 ClickHouse 中处理重复数据的策略。

数据如何复制?

PostgreSQL 逻辑解码

ClickPipes 使用 Postgres Logical Decoding 来实时捕获 Postgres 中发生的变更。Postgres 中的 Logical Decoding 过程使 ClickPipes 这类客户端能够以便于人类阅读的格式接收这些变更,也就是一系列 INSERT、UPDATE 和 DELETE。

ReplacingMergeTree

ClickPipes 使用 ReplacingMergeTree 引擎将 Postgres 表映射到 ClickHouse。ClickHouse 在仅追加型工作负载下表现最佳,因此不建议频繁执行 UPDATE。这正是 ReplacingMergeTree 特别适用的场景。 使用 ReplacingMergeTree 时,更新会表示为插入该行的一个较新版本 (_peerdb_version) ;而删除则表示为插入该行的一个较新版本,并将 _peerdb_is_deleted 标记为 true。ReplacingMergeTree 引擎会在后台对数据进行去重和合并,并为给定主键 (id) 保留该行的最新版本,从而能够以版本化插入的方式高效处理 UPDATE 和 DELETE。 下面是 ClickPipes 在 ClickHouse 中创建表时执行的 CREATE TABLE 语句示例。
CREATE TABLE users
(
    `id` Int32,
    `reputation` String,
    `creationdate` DateTime64(6),
    `displayname` String,
    `lastaccessdate` DateTime64(6),
    `aboutme` String,
    `views` Int32,
    `upvotes` Int32,
    `downvotes` Int32,
    `websiteurl` String,
    `location` String,
    `accountid` Int32,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
PRIMARY KEY id
ORDER BY id;

示例说明

下图通过一个基础示例说明了如何使用 ClickPipes 在 PostgreSQL 和 ClickHouse 之间同步 users 表。 步骤 1 展示了 PostgreSQL 中 2 行数据的初始快照,以及 ClickPipes 将这 2 行数据初始加载到 ClickHouse 的过程。可以看到,这两行数据都会原样复制到 ClickHouse。 步骤 2 展示了对 users 表执行的三个操作:插入一行新数据、更新一行现有数据,以及删除另一行数据。 步骤 3 展示了 ClickPipes 如何将 INSERT、UPDATE 和 DELETE 操作以带版本的插入方式复制到 ClickHouse。UPDATE 会表现为 ID 2 对应行的一个新版本,而 DELETE 会表现为 ID 1 的一个新版本,并通过 _is_deleted 标记为 true。因此,与 PostgreSQL 相比,ClickHouse 中会多出三行数据。 因此,运行类似 SELECT count(*) FROM users; 这样的简单查询时,ClickHouse 和 PostgreSQL 中的结果可能会不同。根据 ClickHouse merge 文档,过期的行版本最终会在合并过程中被丢弃。但是,合并发生的时机无法预测,这意味着在合并完成之前,ClickHouse 中的查询可能会返回不一致的结果。 如何确保 ClickHouse 和 PostgreSQL 中的查询结果完全一致?

使用 FINAL 关键字去重

在 ClickHouse 查询中,推荐的数据去重方式是使用 FINAL 修饰符。 这样可确保只返回去重后的行。 下面来看看如何将它应用到三种不同的查询中。 请注意以下查询中的 WHERE 子句,它用于过滤掉已删除的行。
  • 简单计数查询:统计 posts 的数量。
这是你可以运行的最简单查询,用于检查同步是否已顺利完成。这两个查询应返回相同的计数结果。
-- PostgreSQL
SELECT count(*) FROM posts;

-- ClickHouse 
SELECT count(*) FROM posts FINAL WHERE _peerdb_is_deleted=0;
  • 使用 JOIN 的简单聚合:累计浏览量最高的前 10 位用户。
这是一个针对单表进行聚合的示例。如果这里存在重复数据,会严重影响 sum 函数的结果。
-- PostgreSQL 
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts p
LEFT JOIN users u ON u.id = p.owneruserid
WHERE p.owneruserid > 0
GROUP BY user_id, display_name
ORDER BY viewcount DESC
LIMIT 10;

-- ClickHouse 
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts AS p
FINAL
LEFT JOIN users AS u
FINAL ON (u.id = p.owneruserid) AND (u._peerdb_is_deleted = 0)
WHERE (p.owneruserid > 0) AND (p._peerdb_is_deleted = 0)
GROUP BY
    user_id,
    display_name
ORDER BY viewcount DESC
LIMIT 10

FINAL 设置

你无需在查询中为每个表名都添加 FINAL 修饰符,而是可以使用 FINAL 设置,将其自动应用到查询中的所有表。 此设置既可按查询应用,也可应用于整个会话。
-- 按查询设置 FINAL
SELECT count(*) FROM posts SETTINGS FINAL = 1;

-- 为会话设置 FINAL
SET final = 1;
SELECT count(*) FROM posts; 

ROW policy

隐藏冗余的 _peerdb_is_deleted = 0 过滤器,一个简单的方法是使用 ROW policy。 下面是一个示例:创建行策略,从而在对表 votes 的所有查询中排除已删除的行。
-- 对所有用户应用行策略
CREATE ROW POLICY cdc_policy ON votes FOR SELECT USING _peerdb_is_deleted = 0 TO ALL;
行策略会应用于一组用户和角色。在此示例中,它会应用于所有用户和角色。也可以将其调整为仅应用于特定用户或角色。

像在 Postgres 中那样查询

将分析型数据集从 PostgreSQL 迁移到 ClickHouse 时,通常需要修改应用程序中的查询,以适应数据处理和查询执行上的差异。 本节将介绍如何在保持原有查询不变的情况下对数据进行去重。

视图

视图 是隐藏查询中 FINAL 关键字的一个好方法,因为它们不存储任何数据,只是在每次访问时从另一张表读取。 下面是一个示例,展示如何在 ClickHouse 中为数据库中的每张表创建包含 FINAL 关键字并过滤已删除行的视图。
CREATE VIEW posts_view AS SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW users_view AS SELECT * FROM users FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW votes_view AS SELECT * FROM votes FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW comments_view AS SELECT * FROM comments FINAL WHERE _peerdb_is_deleted=0;
然后,我们可以使用与在 PostgreSQL 中相同的查询语句来查询这些视图。
-- 浏览量最高的帖子
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM posts_view
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10

可刷新materialized view

另一种方法是使用可刷新materialized view,它支持按计划执行查询,对行去重,并将结果存储到目标表中。每次按计划刷新时,目标表都会被最新的查询结果替换。 这种方法的主要优势在于,带有 FINAL 关键字的查询只会在刷新时执行一次,因此后续针对目标表的查询无需再使用 FINAL。 不过,它的缺点是目标表中的数据只能保持在最近一次刷新的状态。也就是说,对于许多用例来说,几分钟到几小时的刷新间隔可能已经足够。
-- 创建去重后的 posts 表 
CREATE TABLE deduplicated_posts AS posts;

-- 创建 Materialized view 并设置为每小时刷新一次
CREATE MATERIALIZED VIEW deduplicated_posts_mv REFRESH EVERY 1 HOUR TO deduplicated_posts AS 
SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0 
然后,您就可以像平常一样查询 deduplicated_posts 表了。
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM deduplicated_posts
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10;
最后修改于 2026年6月10日