洗牌性能

groupby, join, 和 set_index 等操作具有特殊的性能考量,这与普通 Pandas 不同,原因在于 Dask DataFrame 的并行、大于内存以及分布式特性。

简单情况

首先,对于诸如 mean, sum, std, var, count, nunique 等已知归约操作,常见的 groupby 操作如 df.groupby(columns).reduction() 都非常快速和高效,即使分区没有按照已知的分区整齐划分。这是常见的情况。

此外,如果已知分区,那么当分组列包含索引时,对分组应用任意函数是高效的。

当 Dask DataFrame 与 Pandas DataFrame 进行连接,或者两个 Dask DataFrame 沿着它们的索引进行连接时,Join 操作也非常快速。在这些常见情况下操作时,无需特别考虑。

所以,如果你正在执行常见的 groupby 和 join 操作,那么你可以停止阅读本文。一切都会很好地进行扩展。幸运的是,大多数情况下都是如此。

>>> ddf.groupby(columns).known_reduction()            # Fast and common case
>>> ddf.groupby(columns_with_index).apply(user_fn)    # Fast and common case
>>> ddf.join(pandas_df, on=column)                    # Fast and common case
>>> lhs.join(rhs)                                     # Fast and common case
>>> lhs.merge(rhs, on=columns_with_index)             # Fast and common case

困难情况

在某些情况下,例如对分组应用任意函数(当分组不是基于索引且分区未知时),或者沿着非索引列进行 join,或者显式地将未排序的列设置为索引,我们可能需要触发一个全数据集的洗牌。

>>> ddf.groupby(columns_no_index).apply(user_fn)   # Requires shuffle
>>> lhs.join(rhs, on=columns_no_index)             # Requires shuffle
>>> ddf.set_index(column)                          # Requires shuffle

当我们沿着新的索引需要对数据重新排序时,洗牌是必需的。例如,如果我们有按时间组织的银行记录,而现在我们想按用户 ID 组织它们,那么我们就需要移动大量数据。在 Pandas 中,所有这些数据都适合内存,所以这个操作很容易。现在我们不假设所有数据都适合内存,就必须更小心一些。

通过限制自己使用上面提到的简单情况,可以避免数据的重新排序。

洗牌方法

目前有两种洗牌数据的策略,取决于你是在单机还是分布式集群上:基于磁盘的洗牌和基于网络的洗牌。

基于磁盘的洗牌

在单机上处理大于内存的数据时,我们通过将中间结果写入磁盘来执行洗牌。这使用 partd 项目进行基于磁盘的洗牌。

基于网络的洗牌

在分布式集群上操作时,Dask Worker 可能无法访问共享硬盘。在这种情况下,我们通过将输入分区根据它们最终的目的地切分成许多小块,并在网络中移动这些小块来洗牌数据。

选择方法

Dask 默认使用基于磁盘的洗牌,但如果默认调度器设置为使用 dask.distributed.Client(例如用户将 Client 设置为默认时),它将切换到分布式洗牌算法。

client = Client('scheduler:8786', set_as_default=True)

或者,如果你喜欢避免默认设置,可以使用 dataframe.shuffle.method 配置选项来配置全局洗牌方法。这可以全局设置:

dask.config.set({"dataframe.shuffle.method": "p2p"})

ddf.groupby(...).apply(...)

或者作为上下文管理器:

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    ddf.groupby(...).apply(...)

此外,set_index 也接受 shuffle_method 关键字参数,用于选择基于磁盘或基于任务的洗牌。

ddf.set_index(column, shuffle_method='disk')
ddf.set_index(column, shuffle_method='tasks')
ddf.set_index(column, shuffle_method='p2p')

聚合

Dask 支持 Pandas 的 aggregate 语法,用于在同一组上运行多个归约。直接支持常见的归约,如 max, sum, listmean

>>> ddf.groupby(columns).aggregate(['sum', 'mean', 'max', 'min', list])

Dask 还支持用户定义的归约。为确保适当的性能,归约必须被构造成三个独立的步骤。chunk 步骤独立地应用于每个分区,并对分区内的数据进行归约。aggregate 步骤合并分区内的结果。可选的 finalize 步骤结合从 aggregate 步骤返回的结果,并应返回单个最终列。为了让 Dask 识别归约,它必须作为 dask.dataframe.Aggregation 的实例传递。

例如,sum 可以实现为:

custom_sum = dd.Aggregation('custom_sum', lambda s: s.sum(), lambda s0: s0.sum())
ddf.groupby('g').agg(custom_sum)

名字参数应与现有归约不同,以避免数据损坏。每个函数的参数是已分组的 Series 对象,类似于 df.groupby('g')['value']

许多归约只能通过多个临时变量来实现。要实现这些归约,步骤应该返回元组并期望多个参数。一个均值函数可以实现为:

custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
ddf.groupby('g').agg(custom_mean)

例如,让我们计算 DataFrame 的组范围 (最大值 - 最小值)。

>>> df = pd.DataFrame({
...   'a': ['a', 'b', 'a', 'a', 'b'],
...   'b': [0, 1, 0, 2, 5],
... })
>>> ddf = dd.from_pandas(df, 2)

我们定义构建块来查找每个块的最大值和最小值,然后找到所有块的最大值和最小值。最后,通过计算最大值 Series 和最小值 Series 之间的差来完成:

>>> def chunk(grouped):
...     return grouped.max(), grouped.min()

>>> def agg(chunk_maxes, chunk_mins):
...     return chunk_maxes.max(), chunk_mins.min()

>>> def finalize(maxima, minima):
...     return maxima - minima

最后,我们创建并使用这个聚合:

>>> extent = dd.Aggregation('extent', chunk, agg, finalize=finalize)
>>> ddf.groupby('a').agg(extent).compute()
   b
a
a  2
b  4

要将 dask.dataframe.groupby.SeriesGroupBy.nunique 应用于多列,可以使用:

>>> df['c'] = [1, 2, 1, 1, 2]
>>> ddf = dd.from_pandas(df, 2)
>>> nunique = dd.Aggregation(
...     name="nunique",
...     chunk=lambda s: s.apply(lambda x: list(set(x))),
...     agg=lambda s0: s0.obj.groupby(level=list(range(s0.obj.index.nlevels))).sum(),
...     finalize=lambda s1: s1.apply(lambda final: len(set(final))),
... )
>>> ddf.groupby('a').agg({'b':nunique, 'c':nunique})

要访问 NumPy 函数,请使用 apply 和 lambda 函数,例如 .apply(lambda r: np.sum(r))。这是一个平方和聚合的示例:

>>> dd.Aggregation(name="sum_of_squares", chunk=lambda s: s.apply(lambda r: np.sum(np.power(r, 2))), agg=lambda s: s.sum())