连接

DataFrame 连接是一种常见且耗时的计算,在不同情况下可以受益于各种优化。了解数据的布局以及您尝试实现的目标,可以极大地影响性能。本文档页面将介绍各种不同的选项及其性能影响。

大型到大型非排序连接

在最坏的情况下,您有两个大型表,每个表都有许多分区,并且您希望沿一个可能未排序的列将它们连接起来。

这可能会很慢。在这种情况下,Dask DataFrame 需要移动所有数据,以便连接列中具有匹配值的行位于同一分区中。这种大规模移动可能会产生通信成本,并且可能需要大量内存。如果找不到足够的内存,Dask 将不得不将数据读写到磁盘,这可能会导致其他性能开销。

这些问题是可以解决的,但会比许多其他操作慢得多。如果可能,最好避免它们。

大型到小型连接

许多连接或合并计算都结合了一个大型表和一个小型表。如果小型表是单分区 Dask DataFrame,或者仅仅是普通的 Pandas DataFrame,那么计算可以以高度并行的方式进行,大型 DataFrame 的每个分区都与单个小型表连接。这相对于 Pandas 连接几乎没有额外的开销。

如果您的小型表可以轻松地放入内存,那么您可能希望使用 repartition 方法确保它是一个单分区。

import dask
large = dask.datasets.timeseries(freq="10s", npartitions=10)
small = dask.datasets.timeseries(freq="1D", dtypes={"z": int})

small = small.repartition(npartitions=1)
result = large.merge(small, how="left", on=["timestamp"])

排序连接

Pandas 的 merge API 支持 left_index=right_index= 选项以在索引上执行连接。对于 Dask DataFrames,如果索引具有已知的分区边界(见 分区),这些关键字选项具有特殊意义。在这种情况下,DataFrame 分区会沿着这些分区边界对齐(这通常很快),然后跨分区对进行高度并行的 Pandas 连接。这通常相对较快。

排序或索引连接是解决大型到大型连接问题的好方法。如果您计划重复地对数据集进行连接,那么提前设置索引并可能将数据存储在保持该索引的格式(如 Parquet)中,可能很有价值。

import dask
import dask.dataframe as dd

left = dask.datasets.timeseries(dtypes={"foo": int})

# timeseries returns a dataframe indexed by
# timestamp, we don't need to set_index.

# left.set_index("timestamp")

left.to_parquet("left", overwrite=True)
left = dd.read_parquet("left")

right_one = dask.datasets.timeseries(dtypes={"bar": int})
right_two = dask.datasets.timeseries(dtypes={"baz": int})

result = left.merge(
    right_one, how="left", left_index=True, right_index=True)
result = result.merge(
    right_two, how="left", left_index=True, right_index=True)