使用表达式系统进行查询规划

注意

本文档面向 Dask 开发者和贡献者。不面向终端用户。有关高级用户指南,请参阅优化器

表达式系统最初是为 Dask DataFrame 开发的,在 dask-expr 项目中实现。

Expr 对象

表达式系统围绕着 Expr 类构建。这个类用于表示可以在 Dask DataFrame 上执行的计算。Expr 类被设计用于继承,每个子类代表一种特定的计算类型。例如,有用于算术运算、逻辑运算等的子类。

构造

表达式系统以 Expr 类为中心,它表示对 Dask DataFrame 的计算。该类设计用于继承;每个子类对应一种特定的计算类型(例如,算术、逻辑运算、过滤、连接)。

值得注意的是,基础类及其子类均不允许使用自定义初始化方法(__init__)。这一设计决策反映了对性能的担忧,因为表达式对象可能会频繁创建和重新创建,并且构造器中的自定义逻辑可能会引入不必要的开销。

相反,表达式类使用由两个属性定义的数据类式接口

  • _parameters: 参数名称列表

  • _defaults: 可选参数的默认值字典

传递给构造器的参数存储在 operands 属性中,只进行最少的输入验证。示例如下

>>> class MyExpr(Expr):
        _parameters = ["param1", "param2"]
        _defaults = {"param2": None}

>>> expr = MyExpr(1, 2, 3)
>>> expr.param1
1

>>> expr.param2
2

>>> expr.operands
[1, 2, 3]

名称和令牌

每个表达式都通过一个名称唯一标识,该名称由以下部分组成

  • 一个前缀(通常是类名或其变体)

  • 通过 ^dask.base.tokenize() 对其操作数进行哈希处理生成的令牌

这种令牌化机制实现了

  • 图中标示等效表达式并去重

  • 检测优化步骤之间的变化

  • 对某些表达式类型强制执行单例模式

继承自 ^dask.dataframe._expr.SingletonExpr 的表达式(大多数 DataFrame 表达式的默认设置)保证按名称唯一。然而,这种令牌化系统引入了几个挑战

  • 性能:由于递归遍历和 Dask 的分派机制,令牌化速度较慢。

  • 确定性:没有注册 __dask_tokenize__ 的对象会回退到 (cloud)pickling,这可能很慢且不确定。

  • 跨解释器行为:令牌在不同的解释器或机器上不一致,这使得客户端-调度器交互复杂化。

为了解决这个问题,每个表达式在构造时计算并缓存其名称和令牌。这些值被存储并序列化以确保 pickle 往返的稳定性。令牌可通过 _determ_tokendeterministic_token 访问。

缓存和单例

尽管努力使表达式保持无状态,但实际上许多属性都是按需计算并通过 functools.cached_property 缓存的。这推迟了计算,但也使得推理状态何时以及如何被评估变得复杂。

缓存属性通常与表达式一起序列化,除非 _pickle_functools_cache 设置为 False。

为了在重复优化期间(这会重新创建表达式)保留缓存值,大多数类都继承自 ^dask.dataframe._expr.SingletonExpr。这确保同名实例返回先前创建的缓存版本。这使得表达式实际上成为不可变的单例——并且不能原地修改。

优化过程

表达式形成一个有向图结构:当一个表达式作为操作数传递给另一个表达式时,它就成为一个依赖项。虽然这开始时是树形结构,但通过名称去重会迅速将其转换为有向无环图(DAG)——这是优化的关键属性。

优化器目前执行以下五个步骤

  • 简化

  • 重写 / 调优

  • 下推

  • 简化 (再次)

  • 融合

简化

简化将表达式重写为更优化但语义上等效的形式。一个常见的例子是将投影或过滤下推到图的早期阶段,以减少计算。

简化的约束(运行时不强制执行)

  • 分区数 (npartitions) 不能增加。

  • 不得发生带有副作用的计算(例如,计算 divisions)。

重写 / 调优

此步骤基于启发式方法实现性能调优。通常旨在实现更高效的中间分区。

两个例子

  • 基于列投影融合 I/O 操作(例如,FusedIO

  • 选择适当的 split_out 来平衡分区

此步骤不改变表达式的逻辑含义,但会调整与执行相关的参数。

下推

在此阶段,抽象操作被转换为具体的执行策略。

例如

如果输入 DataFrame 已被适当分区,逻辑 Merge 可能会变成 BlockwiseMerge

在不太有利的情况下,可能会选择通用的 HashJoinP2P

这标志着从逻辑计划到物理计划的转变,类似于传统的查询规划器。

下推后,对结果表达式应用第二次简化步骤。

融合

块级任务的线性链被合并为一个任务,从而最大限度地减少调度器开销。

优化期间遍历图

每个优化器步骤都会遍历表达式图,直到没有发现进一步的更改。遍历通常遵循以下模式(以 Expr.simplify 为例)

  1. 调用 simplify_once,它

  2. 在自身(当前表达式)上调用 _simplify_down。这个向下遍历过程
    • 只能访问当前节点及其操作数

    • 可能返回新的(优化后的)表达式或 None

  3. 如果返回新表达式,检查其名称是否已更改(因为名称未更改意味着没有实际更改)。

  4. 然后在每个依赖项上调用 _simplify_up,传入父节点和依赖项的映射。这个向上遍历过程
    • 可以访问跨分支的上下文(例如,同级节点,共享父节点)

    • 返回父表达式的替代项

最后,通过在每个依赖项上调用 simplify_once,遍历递归进入依赖项。

注意

收敛和记忆化

如果没有保护措施,这种递归遍历可能会无限循环或导致指数级膨胀。保护措施包括

  • 按表达式名称进行记忆化

  • 检测重复的子图

尽管如此,病态情况偶尔还是会出现(例如 dask-expr#835)。

表达式作为客户端-调度器接口

我们不传输低级任务图,而是将表达式直接提交给调度器。这减少了开销,但也引入了复杂性

  • distributed.Client 在提交之前需要最终的任务键。

  • 令牌化在不同的解释器之间是非确定性的。

优化会改变键——因此必须在提交之前运行优化,以锁定键名称并填充缓存。

某些表达式(例如,ReadParquet)需要进行 I/O 以收集元数据,如分区统计信息。这些步骤必须在客户端发生,而不是在调度器上,并且目前在下推阶段处理。

遗留的高级图 (HLG) 支持

HighLevelGraph 是一种遗留表示形式,仍被 Dask Array、Bag 和 Delayed 对象使用。尽管其目标是延迟图的具体化,但许多代码路径会触发意外转换为低级图。

关键问题

  • 低级优化经常强制过早具体化。

  • HLG 缺乏对集合类型和所需优化的了解。

  • HLG 不编码后计算行为(例如,如何组合分区)。

为了弥合这一差距,HLGExpr 类包装了一个 HLG 并实现了完整的表达式接口。具体化被延迟到调度器明确调用 __dask_graph__ 时发生,此时进行低级优化。这确保了具体化和执行保持解耦。

自定义表达式和集合