使用 Expression 系统进行查询规划

注意

本文档面向 Dask 开发者和贡献者,不适合最终用户。有关高级用户指南,请参阅优化器

Expression 系统最初是为 Dask DataFrame 开发的,并在 dask-expr 项目中实现。

Expr 对象

Expression 系统围绕 Expr 类构建。此类用于表示可以在 Dask DataFrame 上执行的计算。Expr 类设计为可被继承,每个子类代表一种特定类型的计算。例如,存在用于算术运算、逻辑运算等的子类。

构造

Expression 系统以 Expr 类为核心,该类表示在 Dask DataFrame 上的计算。此类设计用于继承;每个子类对应于一种特定的计算类型(例如,算术、逻辑运算、过滤、连接)。

值得注意的是,自定义初始化方法(__init__)在基类及其子类中均被禁止。这一设计决策反映了对性能的担忧,因为 expression 对象可能会被频繁创建和重新创建,构造器中的自定义逻辑可能会引入不必要的开销。

取而代之的是,expression 类使用由两个属性定义的数据类式接口:

  • _parameters: 参数名称列表

  • _defaults: 可选参数的默认值字典

传递给构造器的参数存储在 operands 属性中,并进行最小的输入验证。例如:

>>> class MyExpr(Expr):
        _parameters = ["param1", "param2"]
        _defaults = {"param2": None}

>>> expr = MyExpr(1, 2, 3)
>>> expr.param1
1

>>> expr.param2
2

>>> expr.operands
[1, 2, 3]

名称和令牌 (Tokens)

每个 expression 都通过唯一的名称标识,该名称由以下部分组成:

  • 前缀(通常是类名或其变体)

  • 通过 ^dask.base.tokenize() 对其操作数进行哈希处理生成的令牌

这种令牌化机制实现了:

  • 图中等价 expression 的去重

  • 检测优化步骤之间的变化

  • 对某些 expression 类型强制执行单例模式

继承自 ^dask.dataframe._expr.SingletonExpr 的 expression(大多数 DataFrame expression 的默认设置)保证通过名称唯一。然而,这种令牌化系统引入了几个挑战:

  • 性能:由于递归遍历和 Dask 的分派机制,令牌化过程较慢。

  • 确定性:没有注册 __dask_tokenize__ 的对象会退回到 (云) pickling,这可能既慢又不确定。

  • 跨解释器行为:令牌在不同的解释器或机器之间不一致,使得客户端-调度器交互复杂化。

为了解决这个问题,每个 expression 在构造时计算并缓存其名称和令牌。这些值被存储和序列化,以确保 pickle 往返的稳定性。令牌可以通过 _determ_tokendeterministic_token 访问。

缓存和单例 (Singletons)

尽管努力使 expression 保持无状态,但在实践中,许多属性都是按需计算并通过 functools.cached_property 进行缓存。这延迟了计算,但也使得何时以及如何评估状态的推理复杂化。

缓存的属性通常与 expression 一起序列化,除非 _pickle_functools_cache 设置为 False。

为了在重复优化(这会重新创建 expression)期间保留缓存值,大多数类继承自 ^dask.dataframe._expr.SingletonExpr。这确保了具有相同名称的实例返回先前创建的缓存版本。这使得 expression 实际上成为不可变的单例——并且不得原地修改。

优化流程

Expression 形成一个有向图结构:当一个 expression 作为另一个 expression 的操作数传递时,它就成为一个依赖项。虽然最初是一个树形结构,但通过名称去重很快将其转换为有向无环图 (DAG)——这是优化的关键属性。

优化器当前执行以下五个步骤:

  • 简化

  • 重写/调整

  • 降级 (Lower)

  • 简化 (再次)

  • 融合 (Fuse)

简化

简化将 expression 重写为更优化但语义等价的形式。一个常见的例子是将投影或过滤下推到图的下游,以便提早减少计算量。

简化的约束(运行时不强制执行):

  • 分区数 (npartitions) 不得增加。

  • 不得发生具有副作用的计算(例如,计算 divisions)。

重写/调整

此步骤基于启发式方法实现性能调优。通常,这旨在获得更高效的中间分区。

两个例子:

  • 基于列投影融合 I/O 操作(例如,FusedIO

  • 选择合适的 split_out 来平衡分区

此步骤不改变 expression 的逻辑含义,但会调整与执行相关的参数。

降级 (Lowering)

在此阶段,抽象操作被转换为具体的执行策略。

例如:

如果输入 DataFrame 已经进行了适当的分区,逻辑上的 Merge 可能会变成 BlockwiseMerge

在不太有利的情况下,可能会转而选择通用的 HashJoinP2P

这标志着从逻辑计划到物理计划的转变,类似于传统的查询规划器。

降级后,对生成的 expression 应用第二次简化步骤。

融合 (Fuse)

线性链式的分块任务被合并为一个单独的任务,从而最大限度地减少调度器开销。

优化过程中的图遍历

每个优化器步骤都会遍历 expression 图,直到不再发现任何更改。遍历通常遵循以下模式(以 Expr.simplify 为例):

  1. 调用 simplify_once,它会:

  2. 在自身(当前 expression)上调用 _simplify_down。这个向下遍历过程:
    • 只能访问当前节点及其操作数

    • 可能会返回一个新的(优化后的)expression 或 None

  3. 如果返回一个新的 expression,检查其名称是否已更改(因为名称未更改意味着没有实质性改变)。

  4. 然后对每个依赖项调用 _simplify_up,传入父节点和依赖项映射。这个向上遍历过程:
    • 可以访问跨分支的上下文(例如,同级节点、共享父节点)

    • 返回父 expression 的替代项

最后,遍历通过在每个依赖项上调用 simplify_once 来递归深入。

注意

收敛与记忆化

如果没有保护措施,这种递归遍历可能会无限循环或导致指数级膨胀。保护措施包括:

  • 按 expression 名称进行记忆化

  • 检测重复的子图

尽管有这些措施,病态情况偶尔也会发生(例如 dask-expr#835)。

Expression 作为客户端-调度器接口

我们将 expression 直接提交给调度器,而不是传输低级任务图。这减少了开销,但也带来了一些复杂性:

  • distributed.Client 在提交前需要最终的任务键。

  • 令牌化在不同的解释器之间是非确定性的。

优化会改变键——因此必须在提交前运行,以锁定键名并填充缓存。

某些 expression(例如 ReadParquet)需要进行 I/O 来收集元数据,如分区统计信息。这些步骤必须在客户端执行,而不是在调度器上,并且目前在降级阶段处理。

对旧版 HighLevelGraph (HLG) 的支持

HighLevelGraph 是一种旧版表示形式,Dask 数组、包和延迟对象仍在使用它。尽管其目标是延迟图的具体化,但许多代码路径会触发意外地转换为低级图。

主要问题:

  • 低级优化通常会强制过早地具体化。

  • HLG 缺乏对集合类型和所需优化的了解。

  • HLG 不编码计算后行为(例如,如何组合分区)。

为了弥补这一差距,HLGExpr 类封装了一个 HLG 并实现了完整的 expression 接口。具体化被延迟到调度器显式调用 __dask_graph__ 时,此时会发生低级优化。这确保了具体化和执行保持解耦。

自定义 Expression 和集合 (Collections)