优化器

注意

Dask DataFrame 从版本 2024.03.0 开始支持查询规划

优化步骤

Dask DataFrame 在执行计算之前会运行多个优化。这些优化旨在提高查询的效率。

这些优化包括以下步骤(此列表不完整)

  • 投影下推

    在每个步骤中只选择必需的列。这减少了需要从存储中读取的数据量,同时也减少了处理过程中的数据量。在查询的最早阶段就丢弃不需要的列。

  • 过滤下推

    尽可能地将过滤器下推,甚至推到 IO 步骤。过滤器在查询的最早阶段执行。

  • 分区剪枝

    分区选择尽可能地下推,甚至推到 IO 步骤。

  • 避免数据混洗 (Shuffle)

    Dask DataFrame 会尽量避免在工作进程之间进行数据混洗 (shuffling)。如果列的布局已知(例如 DataFrame 之前已在同一列上进行过混洗),则可以实现这一点。例如,如果在合并操作后执行 df.groupby(...).apply(...),并且 groupby 操作发生在合并列上,则不会再次混洗数据。

    _images/avoiding-shuffles.svg

    类似地,在相同的 join 列上执行两次连续的 Join/Merge 操作将避免再次混洗数据。优化器会识别出 DataFrame 的分区已经符合预期,因此将操作简化为一次 Shuffle 和一次简单的合并操作。

  • 自动调整分区大小

    IO 层会根据从数据集中选择的列子集自动调整分区数量。非常小的分区会对调度器和昂贵的混洗操作产生负面影响。这通过自动调整分区数量来解决。

    _images/automatic-repartitioning.svg

    选择总大小为 40 MB(对应每个 200 MB 文件)的两列。优化器将分区数量减少了 5 倍。

探索优化后的查询

Dask 在执行计算之前会调用 df.optimize()。此方法应用上述步骤,并返回一个表示优化后查询的新 Dask DataFrame。

通过调用 df.pprint() 可以获得优化后查询的基本表示。这会将查询计划以人类可读的格式打印到命令行/控制台。此方法的优点是它不需要任何额外的依赖项。

pdf = pd.DataFrame({"a": [1, 2, 3] * 5, "b": [1, 2, 3] * 5})
df = dd.from_pandas(pdf, npartitions=2)
df = df.replace(1, 5)[["a"]]

df.optimize().pprint()

Replace: to_replace=1 value=5
    FromPandas: frame='<pandas>' npartitions=2 columns=['a'] pyarrow_strings_enabled=True

通过调用 df.explain() 可以获得更高级、更容易阅读的表示。此方法需要安装 graphviz 包。该方法将返回表示查询计划的图,并从中创建一个图像。

df.explain()
Optimized Query

在两种表示中,我们都可以看到 FromPandas 应用了列投影,只选择了列 a

对于更复杂的查询,explain() 方法更容易理解。