跳转到主要内容
DataStore 提供了全面的聚合和窗口函数支持,充分利用 ClickHouse 强大的 SQL 聚合能力。

基础聚合

内置方法

MethodSQL 对应项描述
sum()SUM()求和值
mean()AVG()平均值
count()COUNT()统计非 NULL 值的数量
min()MIN()最小值
max()MAX()最大值
median()MEDIAN()中位数
std()stddevPop()标准差
var()varPop()方差
nunique()COUNT(DISTINCT)统计唯一值的数量
示例:
from pathlib import Path
Path("sales.csv").write_text("""\
region,product,category,amount,quantity,price,date,order_id
East,Widget,Electronics,5200,10,120,2024-01-15,1001
West,Gadget,Electronics,800,5,160,2024-02-20,1002
East,Gizmo,Home,6500,3,100,2024-03-10,1003
North,Widget,Electronics,4500,6,150,2024-06-18,1004
West,Gadget,Electronics,2000,8,250,2024-09-14,1005
""")

from chdb import datastore as pd

ds = pd.read_csv("sales.csv")

# 单列聚合
total = ds['amount'].sum()
average = ds['amount'].mean()
count = ds['amount'].count()

# 所有聚合操作
print(ds['amount'].sum())    # 总和
print(ds['amount'].mean())   # 平均值
print(ds['amount'].std())    # 标准差
print(ds['amount'].median()) # 中位数
print(ds['amount'].nunique()) # 唯一值数量

GroupBy 聚合

单一聚合

# 分组并聚合
result = ds.groupby('category')['amount'].sum()
result = ds.groupby('region')['sales'].mean()

多个聚合

# 字典语法
result = ds.groupby('category').agg({
    'amount': 'sum',
    'quantity': 'mean',
    'order_id': 'count'
})

# 每列的聚合列表
result = ds.groupby('category').agg({
    'amount': ['sum', 'mean', 'max'],
    'quantity': ['sum', 'count']
})

具名聚合

# 命名聚合(pandas 风格)
result = ds.groupby('region').agg(
    total_amount=('amount', 'sum'),
    avg_quantity=('quantity', 'mean'),
    order_count=('order_id', 'count'),
    max_price=('price', 'max')
)

多个 GroupBy 字段

# 按多列分组
result = ds.groupby(['region', 'category']).agg({
    'amount': 'sum',
    'quantity': 'sum'
})

统计聚合

方法SQL 对应函数说明
quantile(q)quantile(q)第 q 分位数 (0-1)
skew()skewPop()偏度
kurt()kurtPop()峰度
corr()corr()相关性
cov()covar()协方差
sem()-均值的标准误
示例:
# 分位数
q50 = ds['amount'].quantile(0.5)  # 中位数
q95 = ds['amount'].quantile(0.95) # 第95百分位数

# 多个分位数
quantiles = ds['amount'].quantile([0.25, 0.5, 0.75])

# 列之间的相关性
correlation = ds[['sales', 'marketing_spend']].corr()

条件聚合

ClickHouse 特有的按条件聚合函数。
FunctionClickHouseDescription
sum_if(cond)sumIf()在满足条件时求和
count_if(cond)countIf()在满足条件时计数
avg_if(cond)avgIf()在满足条件时求平均值
min_if(cond)minIf()在满足条件时求最小值
max_if(cond)maxIf()在满足条件时求最大值
示例:
from chdb.datastore import F, Field

# 仅对高价值订单求和
high_value_sum = F.sum_if(Field('amount'), Field('amount') > 1000)

# 统计活跃用户数
active_count = F.count_if(Field('status') == 'active')

# 在 groupby 上下文中
result = ds.groupby('region').agg({
    'total': ('amount', 'sum'),
    'high_value': ('amount', F.sum_if(Field('amount') > 1000)),
})

收集类聚合

ClickHouse 特有的值收集函数。
FunctionClickHouseDescription
group_array()groupArray()收集为数组
group_uniq_array()groupUniqArray()将唯一值收集为数组
group_concat(sep)groupConcat()拼接字符串
top_k(n)topK(n)频率最高的前 K 个值
any()any()任意值
any_last()anyLast()最后一个值
first_value()first_value()按顺序的第一个值
last_value()last_value()按顺序的最后一个值
示例:
from chdb.datastore import F, Field

# 按类别收集所有标签
result = ds.groupby('category').agg({
    'all_tags': ('tag', F.group_array()),
    'unique_tags': ('tag', F.group_uniq_array())
})

# 获取各区域排名前 5 的产品
result = ds.groupby('region').agg({
    'top_products': ('product_id', F.top_k(5))
})

窗口函数

排名函数

函数SQL描述
row_number()ROW_NUMBER()按顺序编号的行号
rank()RANK()有空缺的排名
dense_rank()DENSE_RANK()无空缺的排名
ntile(n)NTILE(n)划分为 n 个桶
percent_rank()PERCENT_RANK()百分位排名 (0-1)
cume_dist()CUME_DIST()累积分布
示例:
from chdb.datastore import F, Field

# 添加行号
ds['row_num'] = F.row_number().over(order_by='date')

# 在分组内排名
ds['rank'] = F.rank().over(
    partition_by='category',
    order_by='sales'
)

# 密集排名(无空缺)
ds['dense_rank'] = F.dense_rank().over(
    partition_by='region',
    order_by=('revenue', 'desc')
)

# 划分为四分位数
ds['quartile'] = F.ntile(4).over(order_by='score')

值函数

函数SQL描述
lag(n)LAG(col, n)前一行的值
lead(n)LEAD(col, n)后一行的值
first_value()FIRST_VALUE()窗口中的第一个值
last_value()LAST_VALUE()窗口中的最后一个值
nth_value(n)NTH_VALUE(col, n)窗口中的第 n 个值
示例:
# 前一个值和后一个值
ds['prev_price'] = F.lag('price', 1).over(order_by='date')
ds['next_price'] = F.lead('price', 1).over(order_by='date')

# 分区中的第一个值和最后一个值
ds['first_order'] = F.first_value('amount').over(
    partition_by='customer_id',
    order_by='date'
)

累计函数

方法描述
cumsum()累计和
cummax()累计最大值
cummin()累计最小值
cumprod()累计乘积
diff(n)与前 n 行的差值
pct_change(n)与前 n 行相比的百分比变化
示例:
# 累计计算
ds['running_total'] = ds['amount'].cumsum()
ds['running_max'] = ds['amount'].cummax()

# 按分组计算
ds['group_cumsum'] = ds.groupby('category')['amount'].cumsum()

# 环比
ds['daily_diff'] = ds['sales'].diff(1)
ds['pct_change'] = ds['sales'].pct_change(1)

滚动窗口

# 滚动窗口聚合
ds['rolling_avg'] = ds['price'].rolling(window=7).mean()
ds['rolling_sum'] = ds['amount'].rolling(window=30).sum()
ds['rolling_std'] = ds['value'].rolling(window=10).std()

# 扩展窗口
ds['expanding_max'] = ds['price'].expanding().max()
ds['expanding_sum'] = ds['amount'].expanding().sum()

F 命名空间

F 命名空间用于访问 ClickHouse 函数。

Import

from chdb.datastore import F, Field

使用 F 函数

# 聚合
F.sum(Field('amount'))
F.avg(Field('price'))
F.count(Field('id'))

# 统计
F.quantile(Field('value'), 0.95)
F.stddev_pop(Field('score'))
F.corr(Field('x'), Field('y'))

# 条件
F.sum_if(Field('amount'), Field('status') == 'completed')
F.count_if(Field('is_active'))

# 字符串
F.length(Field('name'))
F.upper(Field('text'))

# 日期/时间
F.to_year(Field('date'))
F.date_diff('day', Field('start'), Field('end'))

# 数组
F.array_sum(Field('values'))
F.array_avg(Field('scores'))

# 数学
F.abs(Field('delta'))
F.round(Field('price'), 2)
F.floor(Field('value'))
F.ceil(Field('value'))

结合窗口函数使用 F

# 定义窗口帧
window = F.window(
    partition_by='category',
    order_by='date',
    rows_between=(-7, 0)  # 当前行及前7行
)

ds['rolling_avg'] = F.avg(Field('price')).over(window)

常见聚合方式

每组 Top N

# 按销售额列出每个类别排名前 3 的产品
result = (ds
    .assign(rank=F.row_number().over(
        partition_by='category',
        order_by=('sales', 'desc')
    ))
    .filter(ds['rank'] <= 3)
)

累计值

# 销售额累计总和
ds['running_total'] = F.sum('amount').over(
    order_by='date',
    rows_between=(None, 0)  # 截至当前行的所有行
)

移动平均

# 7天移动平均
ds['ma_7'] = F.avg('price').over(
    order_by='date',
    rows_between=(-6, 0)
)

同比对比

# 同比比较
ds['prev_year_sales'] = F.lag('sales', 12).over(
    partition_by='product_id',
    order_by='month'
)
ds['yoy_growth'] = (ds['sales'] - ds['prev_year_sales']) / ds['prev_year_sales']

百分位排名

# 按客户总消费额排名
ds['spend_percentile'] = F.percent_rank().over(order_by='total_spend')

聚合方法汇总

类别方法
基础sum, mean, count, min, max, median
统计std, var, quantile, skew, kurt, corr, cov
条件sum_if, count_if, avg_if, min_if, max_if
收集group_array, group_uniq_array, group_concat, top_k
排名row_number, rank, dense_rank, ntile, percent_rank
值函数lag, lead, first_value, last_value, nth_value
累计cumsum, cummax, cummin, cumprod, diff, pct_change
滚动rolling().mean/sum/std/..., expanding().mean/sum/...
最后修改于 2026年6月10日