使用 Expression 系统进行查询规划
目录
使用 Expression 系统进行查询规划¶
注意
本文档面向 Dask 开发者和贡献者,不适合最终用户。有关高级用户指南,请参阅优化器。
Expression 系统最初是为 Dask DataFrame 开发的,并在 dask-expr 项目中实现。
Expr 对象¶
Expression 系统围绕 Expr 类构建。此类用于表示可以在 Dask DataFrame 上执行的计算。Expr 类设计为可被继承,每个子类代表一种特定类型的计算。例如,存在用于算术运算、逻辑运算等的子类。
构造¶
Expression 系统以 Expr 类为核心,该类表示在 Dask DataFrame 上的计算。此类设计用于继承;每个子类对应于一种特定的计算类型(例如,算术、逻辑运算、过滤、连接)。
值得注意的是,自定义初始化方法(__init__
)在基类及其子类中均被禁止。这一设计决策反映了对性能的担忧,因为 expression 对象可能会被频繁创建和重新创建,构造器中的自定义逻辑可能会引入不必要的开销。
取而代之的是,expression 类使用由两个属性定义的数据类式接口:
_parameters
: 参数名称列表_defaults
: 可选参数的默认值字典
传递给构造器的参数存储在 operands 属性中,并进行最小的输入验证。例如:
>>> class MyExpr(Expr):
_parameters = ["param1", "param2"]
_defaults = {"param2": None}
>>> expr = MyExpr(1, 2, 3)
>>> expr.param1
1
>>> expr.param2
2
>>> expr.operands
[1, 2, 3]
名称和令牌 (Tokens)¶
每个 expression 都通过唯一的名称标识,该名称由以下部分组成:
前缀(通常是类名或其变体)
通过
^dask.base.tokenize()
对其操作数进行哈希处理生成的令牌
这种令牌化机制实现了:
图中等价 expression 的去重
检测优化步骤之间的变化
对某些 expression 类型强制执行单例模式
继承自 ^dask.dataframe._expr.SingletonExpr
的 expression(大多数 DataFrame expression 的默认设置)保证通过名称唯一。然而,这种令牌化系统引入了几个挑战:
性能:由于递归遍历和 Dask 的分派机制,令牌化过程较慢。
确定性:没有注册
__dask_tokenize__
的对象会退回到 (云) pickling,这可能既慢又不确定。跨解释器行为:令牌在不同的解释器或机器之间不一致,使得客户端-调度器交互复杂化。
为了解决这个问题,每个 expression 在构造时计算并缓存其名称和令牌。这些值被存储和序列化,以确保 pickle 往返的稳定性。令牌可以通过 _determ_token
或 deterministic_token
访问。
缓存和单例 (Singletons)¶
尽管努力使 expression 保持无状态,但在实践中,许多属性都是按需计算并通过 functools.cached_property
进行缓存。这延迟了计算,但也使得何时以及如何评估状态的推理复杂化。
缓存的属性通常与 expression 一起序列化,除非 _pickle_functools_cache
设置为 False。
为了在重复优化(这会重新创建 expression)期间保留缓存值,大多数类继承自 ^dask.dataframe._expr.SingletonExpr
。这确保了具有相同名称的实例返回先前创建的缓存版本。这使得 expression 实际上成为不可变的单例——并且不得原地修改。
优化流程¶
Expression 形成一个有向图结构:当一个 expression 作为另一个 expression 的操作数传递时,它就成为一个依赖项。虽然最初是一个树形结构,但通过名称去重很快将其转换为有向无环图 (DAG)——这是优化的关键属性。
优化器当前执行以下五个步骤:
简化
重写/调整
降级 (Lower)
简化 (再次)
融合 (Fuse)
简化¶
简化将 expression 重写为更优化但语义等价的形式。一个常见的例子是将投影或过滤下推到图的下游,以便提早减少计算量。
简化的约束(运行时不强制执行):
分区数 (npartitions) 不得增加。
不得发生具有副作用的计算(例如,计算 divisions)。
重写/调整¶
此步骤基于启发式方法实现性能调优。通常,这旨在获得更高效的中间分区。
两个例子:
基于列投影融合 I/O 操作(例如,
FusedIO
)选择合适的
split_out
来平衡分区
此步骤不改变 expression 的逻辑含义,但会调整与执行相关的参数。
降级 (Lowering)¶
在此阶段,抽象操作被转换为具体的执行策略。
例如:
如果输入 DataFrame 已经进行了适当的分区,逻辑上的 Merge
可能会变成 BlockwiseMerge
。
在不太有利的情况下,可能会转而选择通用的 HashJoinP2P
。
这标志着从逻辑计划到物理计划的转变,类似于传统的查询规划器。
降级后,对生成的 expression 应用第二次简化步骤。
融合 (Fuse)¶
线性链式的分块任务被合并为一个单独的任务,从而最大限度地减少调度器开销。
优化过程中的图遍历¶
每个优化器步骤都会遍历 expression 图,直到不再发现任何更改。遍历通常遵循以下模式(以 Expr.simplify 为例):
调用
simplify_once
,它会:- 在自身(当前 expression)上调用
_simplify_down
。这个向下遍历过程: 只能访问当前节点及其操作数
可能会返回一个新的(优化后的)expression 或 None
- 在自身(当前 expression)上调用
如果返回一个新的 expression,检查其名称是否已更改(因为名称未更改意味着没有实质性改变)。
- 然后对每个依赖项调用
_simplify_up
,传入父节点和依赖项映射。这个向上遍历过程: 可以访问跨分支的上下文(例如,同级节点、共享父节点)
返回父 expression 的替代项
- 然后对每个依赖项调用
最后,遍历通过在每个依赖项上调用 simplify_once
来递归深入。
注意
收敛与记忆化
如果没有保护措施,这种递归遍历可能会无限循环或导致指数级膨胀。保护措施包括:
按 expression 名称进行记忆化
检测重复的子图
尽管有这些措施,病态情况偶尔也会发生(例如 dask-expr#835)。
Expression 作为客户端-调度器接口¶
我们将 expression 直接提交给调度器,而不是传输低级任务图。这减少了开销,但也带来了一些复杂性:
distributed.Client 在提交前需要最终的任务键。
令牌化在不同的解释器之间是非确定性的。
优化会改变键——因此必须在提交前运行,以锁定键名并填充缓存。
某些 expression(例如 ReadParquet)需要进行 I/O 来收集元数据,如分区统计信息。这些步骤必须在客户端执行,而不是在调度器上,并且目前在降级阶段处理。
对旧版 HighLevelGraph (HLG) 的支持¶
HighLevelGraph 是一种旧版表示形式,Dask 数组、包和延迟对象仍在使用它。尽管其目标是延迟图的具体化,但许多代码路径会触发意外地转换为低级图。
主要问题:
低级优化通常会强制过早地具体化。
HLG 缺乏对集合类型和所需优化的了解。
HLG 不编码计算后行为(例如,如何组合分区)。
为了弥补这一差距,HLGExpr
类封装了一个 HLG 并实现了完整的 expression 接口。具体化被延迟到调度器显式调用 __dask_graph__
时,此时会发生低级优化。这确保了具体化和执行保持解耦。