跳转到主要内容
理解 DataStore 的惰性求值模型,是高效使用它并获得最佳性能的关键。

惰性求值

DataStore 采用惰性求值——操作不会立即执行,而是会先记录下来并编译为优化后的 SQL 查询。只有在实际需要结果时,才会执行。

示例:惰性求值与急切求值

from pathlib import Path
Path("sales.csv").write_text("""\
region,product,category,amount,quantity,price,date,order_id
East,Widget,Electronics,5200,10,120,2024-01-15,1001
West,Gadget,Electronics,800,5,160,2024-02-20,1002
East,Gizmo,Home,6500,3,100,2024-03-10,1003
North,Widget,Electronics,4500,6,150,2024-06-18,1004
West,Gadget,Electronics,2000,8,250,2024-09-14,1005
""")

from chdb import datastore as pd

ds = pd.read_csv("sales.csv")

# 这些操作尚未执行
result = (ds
    .filter(ds['amount'] > 1000)    # 已记录,未执行
    .select('region', 'amount')      # 已记录,未执行
    .groupby('region')               # 已记录,未执行
    .agg({'amount': 'sum'})          # 已记录,未执行
    .sort('sum', ascending=False)    # 已记录,未执行
)

# 仍未执行——仅构建查询计划
print(result.to_sql())
# SELECT region, SUM(amount) AS sum
# FROM file('sales.csv', 'CSVWithNames')
# WHERE amount > 1000
# GROUP BY region
# ORDER BY sum DESC

# 此时才真正执行
df = result.to_df()  # <-- 触发执行

惰性求值的优势

  1. 查询优化:多个操作会编译成一条优化后的 SQL 查询
  2. 过滤器下推:在数据源层面应用过滤器
  3. 列裁剪:只读取所需列
  4. 延迟决策:可在运行时选择执行引擎
  5. 计划检查:可在执行前查看/调试查询

执行触发时机

当需要实际值时,会自动开始执行:

自动触发

触发方式示例说明
print() / repr()print(ds)显示结果
len()len(ds)获取行数
.columnsds.columns获取列名
.dtypesds.dtypes获取列数据类型
.shapeds.shape获取形状
.indexds.index获取行索引
.valuesds.values获取 NumPy 数组
Iterationfor row in ds迭代行
to_df()ds.to_df()转为 pandas
to_pandas()ds.to_pandas()to&#95;df 的别名
to_dict()ds.to_dict()转为 dict
to_numpy()ds.to_numpy()转为数组
.equals()ds.equals(other)比较 DataStore 对象
示例:
# 以下所有操作都会触发执行
print(ds)              # 显示
len(ds)                # 1000
ds.columns             # Index(['name', 'age', 'city'])
ds.shape               # (1000, 3)
list(ds)               # 值列表
ds.to_df()             # pandas DataFrame

保持惰性执行的操作

操作返回值说明
filter()DataStore添加 WHERE 子句
select()DataStore添加列选择
sort()DataStore添加 ORDER BY
groupby()LazyGroupBy准备 GROUP BY
join()DataStore添加 JOIN
ds['col']ColumnExpr列引用
ds[['col1', 'col2']]DataStore列选择
示例:
# 这些操作不会触发执行——它们保持惰性状态
result = ds.filter(ds['age'] > 25)      # 返回 DataStore
result = ds.select('name', 'age')        # 返回 DataStore
result = ds['name']                      # 返回 ColumnExpr
result = ds.groupby('city')              # 返回 LazyGroupBy

三阶段执行

DataStore 操作采用三阶段执行模型:

阶段 1:SQL 查询构建 (惰性)

可用 SQL 表达的操作会先收集起来:
result = (ds
    .filter(ds['status'] == 'active')   # WHERE
    .select('user_id', 'amount')         # SELECT
    .groupby('user_id')                  # GROUP BY
    .agg({'amount': 'sum'})              # SUM()
    .sort('sum', ascending=False)        # ORDER BY
    .limit(10)                           # LIMIT
)
# 以上全部编译为一条 SQL 查询

第 2 阶段:执行点

当触发条件满足时,将执行累积的 SQL:
# 此处触发执行
df = result.to_df()  
# 单个优化后的 SQL 查询现在开始执行

阶段 3:DataFrame 操作 (如有)

如果你在执行后继续链式调用仅 pandas 支持的操作:
# 混合操作
result = (ds
    .filter(ds['amount'] > 100)          # 阶段 1: SQL
    .to_df()                             # 阶段 2: 执行
    .pivot_table(...)                    # 阶段 3: pandas
)

查看执行计划

使用 explain() 查看实际会执行的内容:
Query
ds = pd.read_csv("sales.csv")

query = (ds
    .filter(ds['amount'] > 1000)
    .groupby('region')
    .agg({'amount': ['sum', 'mean']})
)

# 查看执行计划
query.explain()
Response
Pipeline:
  1. Source: file('sales.csv', 'CSVWithNames')
  2. Filter: amount > 1000
  3. GroupBy: region
  4. Aggregate: sum(amount), avg(amount)

Generated SQL:
SELECT region, SUM(amount) AS sum, AVG(amount) AS mean
FROM file('sales.csv', 'CSVWithNames')
WHERE amount > 1000
GROUP BY region
使用 verbose=True 查看更多详细信息:
query.explain(verbose=True)
参见调试:explain(),获取完整文档。

缓存

DataStore 会缓存执行结果,以避免重复查询。

缓存的工作原理

from pathlib import Path
Path("data.csv").write_text("""\
name,age,city,salary,department
Alice,25,NYC,55000,Engineering
Bob,30,LA,65000,Product
Charlie,35,NYC,80000,Engineering
Diana,28,SF,70000,Design
Eve,42,NYC,95000,Product
""")

ds = pd.read_csv("data.csv")
result = ds.filter(ds['age'] > 25)

# 首次访问 - 执行查询
print(result.shape)  # 执行并缓存

# 第二次访问 - 使用缓存
print(result.columns)  # 使用已缓存的结果

# 第三次访问 - 使用缓存
df = result.to_df()  # 使用已缓存的结果

缓存失效

对 DataStore 的修改操作会使缓存失效:
result = ds.filter(ds['age'] > 25)
print(result.shape)  # 执行并缓存

# 新操作会导致缓存失效
result2 = result.filter(result['city'] == 'NYC')
print(result2.shape)  # 重新执行(查询不同)

手动缓存控制

# 清除缓存
ds.clear_cache()

# 禁用缓存
from chdb.datastore.config import config
config.set_cache_enabled(False)

混合使用 SQL 和 Pandas 操作

DataStore 能够智能处理混合使用 SQL 和 pandas 的操作:

与 SQL 兼容的操作

这些可编译为 SQL:
  • filter(), where()
  • select()
  • groupby(), agg()
  • sort(), orderby()
  • limit(), offset()
  • join(), union()
  • distinct()
  • 列运算 (数学运算、比较、字符串方法)

仅 Pandas 支持的操作

这些操作会触发执行,并使用 pandas:
  • 搭配自定义函数的 apply()
  • 带复杂聚合的 pivot_table()
  • stack()unstack()
  • 对已执行 DataFrame 的操作

混合型管道

# SQL 阶段
result = (ds
    .filter(ds['amount'] > 100)      # SQL
    .groupby('category')              # SQL
    .agg({'amount': 'sum'})           # SQL
)

# 执行 + pandas 阶段
result = (result
    .to_df()                          # 执行 SQL
    .pivot_table(...)                 # pandas 操作
)

执行引擎选择

DataStore 可通过不同的引擎执行操作:

自动模式 (默认)

from chdb.datastore.config import config

config.set_execution_engine('auto')  # 默认
# 自动为每个操作选择最佳引擎

强制使用 chDB 引擎

config.set_execution_engine('chdb')
# 所有操作均使用 ClickHouse SQL

强制使用 pandas 引擎

config.set_execution_engine('pandas')
# 所有操作均使用 pandas
详见配置:执行引擎了解详情。

性能影响

好:尽早过滤

# 好的做法:在 SQL 中过滤,然后聚合
result = (ds
    .filter(ds['date'] >= '2024-01-01')  # 尽早减少数据量
    .groupby('category')
    .agg({'amount': 'sum'})
)

不佳:过晚应用过滤器

# 不推荐:聚合所有数据,然后过滤
result = (ds
    .groupby('category')
    .agg({'amount': 'sum'})
    .to_df()
    .query('sum > 1000')  # 聚合后使用 Pandas 过滤
)

推荐:尽早选择所需列

# 推荐:在 SQL 中选择列
result = (ds
    .select('user_id', 'amount', 'date')
    .filter(ds['date'] >= '2024-01-01')
    .groupby('user_id')
    .agg({'amount': 'sum'})
)

好:让 SQL 来完成工作

# 好的做法:在 SQL 中执行复杂聚合
result = (ds
    .groupby('category')
    .agg({
        'amount': ['sum', 'mean', 'count'],
        'quantity': 'sum'
    })
    .sort('sum', ascending=False)
    .limit(10)
)
# 一条 SQL 查询搞定所有操作

# 不好的做法:多条独立查询
sums = ds.groupby('category')['amount'].sum().to_df()
means = ds.groupby('category')['amount'].mean().to_df()
# 两条查询,而非一条

最佳实践总结

  1. 执行前先串联好各项操作 - 先构建完整查询,再统一触发一次
  2. 尽早过滤 - 在源头减少数据
  3. 只选择所需的列 - 列裁剪可提升性能
  4. 使用 explain() 了解执行过程 - 运行前先调试
  5. 让 SQL 处理聚合 - ClickHouse 已针对此进行了优化
  6. 留意执行触发时机 - 避免意外过早执行
  7. 合理使用缓存 - 了解缓存何时会失效
最后修改于 2026年6月10日