| 名称 | Logo | 类型 | 状态 | 描述 |
|---|
| Apache Kafka |  | 流式 | 稳定版本 | 配置 ClickPipes,开始从 Apache Kafka 向 ClickHouse Cloud 摄取流式数据。 |
| Confluent Cloud |  | 流式 | 稳定版本 | 通过我们的直接集成,充分发挥 Confluent 与 ClickHouse Cloud 的组合优势。 |
| Redpanda |  | 流式 | 稳定版本 | 配置 ClickPipes,开始从 Redpanda 向 ClickHouse Cloud 摄取流式数据。 |
| AWS MSK |  | 流式 | 稳定版本 | 配置 ClickPipes,开始从 AWS MSK 向 ClickHouse Cloud 摄取流式数据。 |
| Azure Event Hubs |  | 流式 | 稳定版本 | 配置 ClickPipes,开始从 Azure Event Hubs 向 ClickHouse Cloud 摄取流式数据。 |
| WarpStream |  | 流式 | 稳定版本 | 配置 ClickPipes,开始从 WarpStream 向 ClickHouse Cloud 摄取流式数据。 |
支持的格式如下:
ClickPipes 当前支持以下标准 ClickHouse 数据类型:
- 基础数值类型 - [U]Int8/16/32/64、Float32/64 和 BFloat16
- 大整数类型 - [U]Int128/256
- Decimal 类型
- Boolean
- String
- FixedString
- Date、Date32
- DateTime、DateTime64 (仅支持 UTC 时区)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- Time、Time64
- JSON
- 所有 ClickHouse LowCardinality 类型
- Map,其键和值可使用上述任意类型 (包括 Nullable)
- Tuple 和 Array,其元素可使用上述任意类型 (包括 Nullable,仅支持一层深度)
- SimpleAggregateFunction 类型 (适用于 AggregatingMergeTree 或 SummingMergeTree 目标端)
ClickPipes 在以下情况下支持 Variant 类型:
- Avro Union 类型。如果你的 Avro schema 包含一个由多个非 NULL 类型组成的 union,ClickPipes 会推断出
合适的 Variant 类型。除此之外,Avro 数据不支持 Variant 类型。
- JSON 字段。你可以为源数据流中的任何 JSON 字段
手动指定 Variant 类型 (例如
Variant(String, Int64, DateTime)) 。不支持复杂子类型 (Array/Map/Tuple) 。此外,由于 ClickPipes 判定
应使用哪种 Variant 子类型的方式所限,在 Variant 定义中只能使用一种整数类型或 datetime 类型——例如,不支持 Variant(Int64, UInt32)。
ClickPipes 在以下情况下支持 JSON 类型:
- Avro Record 和 Protobuf Message 字段始终可以映射到 JSON 列。
- 如果 Avro 字段实际包含 JSON String 对象,则 Avro String 和 Bytes 字段可以映射到 JSON 列。
- 如果 Protobuf 字段实际包含 JSON String 对象,则 Protobuf String 和 Bytes kind 可以映射到 JSON 列。
- 始终为 JSON 对象的 JSON 字段可以映射到目标端 JSON 列。
请注意,你需要手动将目标端列更改为所需的 JSON 类型,包括任何固定或跳过的路径。
ClickPipes 支持所有 Avro 基本类型和复杂类型,以及除 local-timestamp-millis 和 local_timestamp-micros 之外的所有 Avro 逻辑类型。Avro record 类型会转换为 Tuple,array 类型会转换为 Array,map 类型会转换为 Map (仅支持字符串键) 。通常可使用此处列出的转换方式。我们建议对 Avro 数值类型使用精确的类型匹配,因为 ClickPipes 在类型转换时不会检查是否发生溢出或精度损失。
或者,所有 Avro 类型也都可以插入到 String 列中,此时会表示为有效的 JSON 字符串。
Avro 中的 Nullable 类型通过 (T, null) 或 (null, T) 形式的 union schema 定义,其中 T 是基础 Avro 类型。在 schema inference 期间,这类 union 会被映射为 ClickHouse 的 Nullable 列。请注意,ClickHouse 不支持
Nullable(Array)、Nullable(Map) 或 Nullable(Tuple) 类型。这些类型的 Avro null union 会被映射为对应的非 Nullable 版本 (Avro Record 类型会映射为 ClickHouse 的命名 Tuple) 。对于这些类型,Avro 中的 “null” 会被 insert 为:
- 对于 null Avro array,insert 为空 Array
- 对于 null Avro Map,insert 为空 Map
- 对于 null Avro Record,insert 为所有字段均使用默认值/零值的命名 Tuple
ClickPipes 支持所有 Protobuf 2 和 3 类型,唯一的例外是早已弃用的 proto 2 group 类型。基础类型转换使用
以下映射:
还支持所有基础类型的 Array、Map 和 Nullable 变体。
| Protobuf 类型 | ClickHouse 类型 |
|---|
bool | UInt8 |
float | Float32 |
double | Float64 |
int32, sint32, sfixed32 | Int32 |
int64, sint64, sfixed64 | Int64 |
uint32, fixed32 | UInt32 |
uint64, fixed64 | UInt64 |
string, bytes | String |
enum | Enum |
repeated T | Array(T) |
message | Tuple |
对于数值类型,建议使用精确匹配,以避免溢出或精度损失。
还支持以下知名类型:
| 知名类型 | ClickHouse 类型 |
|---|
google.protobuf.Timestamp | DateTime, DateTime64 |
google.protobuf.Duration | Time, Time64 |
google.protobuf.StringValue, google.protobuf.BytesValue | Nullable(String) |
google.protobuf.Int32Value, google.protobuf.SInt32Value, google.protobuf.SFixed32Value | Nullable(Int32) |
google.protobuf.Int64Value, google.protobuf.SInt64Value, google.protobuf.SFixed64Value | Nullable(Int64) |
google.protobuf.UInt32Value, google.protobuf.Fixed32Value | Nullable(UInt32) |
google.protobuf.UInt64Value, google.protobuf.Fixed64Value | Nullable(UInt64) |
google.protobuf.FloatValue | Nullable(Float32) |
google.protobuf.DoubleValue | Nullable(Float64) |
google.protobuf.BoolValue | Nullable(UInt8) |
在 schema inference 过程中,Protobuf oneof 字段默认会映射为具名 Tuple,其中最多只有一个字段会包含非默认
值。这些字段也可以自动映射为 Variant 列,此时生效的值将采用
已设置的组成字段的类型。或者,也可以将每个组成字段手动映射到各自独立的 ClickHouse 列;由于 oneof
字段彼此互斥,因此每条记录中始终只会有一列被赋值。
如果为 ClickPipe 定义的顶层 Protobuf schema 只包含一个 repeated 字段,且该字段本身是一个 protobuf Message,则 schema inference 和列映射将基于该“被包含”的 Message 字段。Kafka 消息会按此类消息列表进行处理,而单条 Kafka 消息会展开为多行 ClickHouse 数据。
以下虚拟列适用于兼容 Kafka 的流式数据源。创建新的目标端时,可以使用 Add Column 按钮将虚拟列添加到目标表中。
| 名称 | 描述 | 推荐数据类型 |
|---|
_key | Kafka 消息键 | String |
_timestamp | Kafka 时间戳 (毫秒精度) | DateTime64(3) |
_partition | Kafka 分区 | Int32 |
_offset | Kafka 偏移量 | Int64 |
_topic | Kafka topic | String |
_header_keys | 记录请求头中各个键组成的并行数组 | Array(String) |
_header_values | 记录请求头中各个值组成的并行数组 | Array(String) |
_raw_message | 完整的 Kafka 消息 | String |
请注意,_raw_message 列仅建议用于 JSON 数据。
对于只需要 JSON 字符串的使用场景 (例如使用 ClickHouse JsonExtract* 函数来
填充下游 materialized view) ,删除所有“非虚拟”列可能会提升 ClickPipes 性能。