优化器

注意

自 2024.03.0 版本起,Dask DataFrame 支持查询规划。

优化步骤

Dask DataFrame 在执行计算之前会运行多项优化。这些优化旨在提高查询效率。

优化包含以下步骤(此列表不完整)

  • 投影下推

    在每一步中只选择必需的列。这减少了需要从存储中读取的数据量,同时也减少了沿途处理的数据量。列会在查询的最早阶段被丢弃。

  • 过滤下推

    尽可能地将过滤器下推,可能推送到 IO 步骤。过滤器会在查询的最早阶段执行。

  • 分区剪枝

    分区选择会尽可能地向下推,可能推送到 IO 步骤。

  • 避免混洗

    Dask DataFrame 会尽量避免在工作节点之间混洗数据。如果列布局已知,例如 DataFrame 之前已经按照相同的列进行过混洗,就可以实现这一点。例如,在合并操作后执行 df.groupby(...).apply(...),如果 groupy 操作是基于合并的列进行的,就不会再次混洗数据。

    _images/avoiding-shuffles.svg

    类似地,在相同的 join 列上执行两次连续的 Join/Merge 操作会避免再次混洗数据。优化器会识别出 DataFrame 的分区方式已经符合预期,从而将操作简化为一次混洗和一次简单的合并操作。

  • 自动调整分区大小

    IO 层会根据从数据集中选择的列子集自动调整分区数量。非常小的分区会对调度器和混洗等昂贵操作产生负面影响。通过自动调整分区数量来解决这个问题。

    _images/automatic-repartitioning.svg

    选择两列,它们在每 200 MB 文件中共有 40 MB 数据。优化器将分区数量减少了 5 倍。

探索优化后的查询

Dask 在执行计算之前会调用 df.optimize()。此方法应用上述步骤并返回一个表示优化后查询的新 Dask DataFrame。

通过调用 df.pprint() 可以获得优化后查询的基本表示。这会将查询计划以人类可读的格式打印到命令行/控制台。此方法的优点是不需要任何额外的依赖项。

pdf = pd.DataFrame({"a": [1, 2, 3] * 5, "b": [1, 2, 3] * 5})
df = dd.from_pandas(pdf, npartitions=2)
df = df.replace(1, 5)[["a"]]

df.optimize().pprint()

Replace: to_replace=1 value=5
    FromPandas: frame='<pandas>' npartitions=2 columns=['a'] pyarrow_strings_enabled=True

通过调用 df.explain() 可以获得更高级、更易于阅读的表示。此方法需要安装 graphviz 包。该方法将返回一个表示查询计划的图并从中创建一个图像。

df.explain()
Optimized Query

我们可以在这两种表示中看到 FromPandas 消耗了列投影,只选择了列 a

对于更复杂的查询,explain() 方法明显更容易理解。