合并

DataFrame 合并是一种常见且开销较大的计算,在不同情况下受益于各种优化。理解数据的布局以及要完成的任务对性能有很大影响。本文档页面将介绍各种不同的选项及其对性能的影响。

大型到大型非排序合并

在最糟糕的情况下,你有两个大型表,每个表都有许多分区,并且你想沿着一个可能未排序的列将它们进行合并。

这可能很慢。在这种情况下,Dask DataFrame 需要移动所有数据,以便合并列中具有匹配值的行位于同一分区中。这种大规模移动会产生通信开销,并且可能需要大量内存。如果内存不足,Dask 将不得不读写数据到磁盘,这可能会导致其他性能开销。

这些问题是可以解决的,但会比许多其他操作慢得多。如果可能,最好避免它们。

大型到小型合并

许多连接或合并计算将一个大型表与一个小型表结合起来。如果小型表是单分区 Dask DataFrame,或者只是一个普通的 Pandas DataFrame,那么计算可以以一种令人尴尬的并行方式进行,即大型 DataFrame 的每个分区都与那个单分区小型表进行合并。这相对于 Pandas 合并几乎没有额外开销。

如果你的小型表可以轻松地放入内存中,那么你可能需要使用 repartition 方法确保它是一个单分区表。

import dask
large = dask.datasets.timeseries(freq="10s", npartitions=10)
small = dask.datasets.timeseries(freq="1D", dtypes={"z": int})

small = small.repartition(npartitions=1)
result = large.merge(small, how="left", on=["timestamp"])

排序合并

Pandas 的 merge API 支持 left_index=right_index= 选项以在索引上执行合并。对于 Dask DataFrames,如果索引具有已知分区边界(参见 分区),这些关键字选项具有特殊意义。在这种情况下,DataFrame 分区沿这些分区边界对齐(这通常很快),然后在分区对之间发生令人尴尬的并行 Pandas 合并。这通常相对较快。

排序或索引合并是解决大型到大型合并问题的好方法。如果你计划重复对数据集进行合并,那么提前设置索引并可能将数据存储为保留该索引的格式(如 Parquet)是值得的。

import dask
import dask.dataframe as dd

left = dask.datasets.timeseries(dtypes={"foo": int})

# timeseries returns a dataframe indexed by
# timestamp, we don't need to set_index.

# left.set_index("timestamp")

left.to_parquet("left", overwrite=True)
left = dd.read_parquet("left")

right_one = dask.datasets.timeseries(dtypes={"bar": int})
right_two = dask.datasets.timeseries(dtypes={"baz": int})

result = left.merge(
    right_one, how="left", left_index=True, right_index=True)
result = result.merge(
    right_two, how="left", left_index=True, right_index=True)