Dask 最佳实践

Dask 的 API 易于上手,但要 用好 则需要一些经验。本页包含 Dask 最佳实践建议,并提供常见 Dask 问题的解决方案。

本文档专门关注所有 Dask API 共享的最佳实践。读者可能首先希望查阅某个特定 API 的最佳实践文档。

从小处着手

并行带来了额外的复杂性和开销。有时它对于更大的问题是必需的,但通常并非如此。在将像 Dask 这样的并行计算系统添加到您的工作负载之前,您可能需要先尝试一些替代方案

  • 使用更好的算法或数据结构:NumPy、pandas、Scikit-learn 可能具有更快的功能来完成您正在尝试做的事情。咨询专家或再次通读它们的文档以找到更好的预构建算法可能是值得的。

  • 更好的文件格式:支持随机访问的高效二进制格式通常可以帮助您高效且简单地管理大于内存的数据集。请参阅下面的高效存储数据部分。

  • 编译后的代码:使用 Numba 或 Cython 编译您的 Python 代码可能会使并行性变得不必要。或者您可以使用这些库中可用的多核并行性。

  • 采样:即使您有很多数据,使用全部数据可能也没有太大优势。通过智能采样,您也许能够从更易于管理的子集中得出相同的洞察。

  • 性能分析:如果您试图加速慢代码,首先理解它为何慢是很重要的。对代码进行适度的性能分析投资可以帮助您识别是什么减慢了您的速度。这些信息可以帮助您更好地决定并行性是否可能有所帮助,或者其他方法是否可能更有效。

使用控制面板

Dask 的控制面板可以帮助您了解工作节点的状态。这些信息可以指导您找到高效的解决方案。在并行和分布式计算中,需要注意新的开销,因此您旧的直觉可能不再适用。使用控制面板可以帮助您重新学习什么是快什么是慢以及如何处理它们。

有关更多信息,请参阅Dask 控制面板文档

避免非常大的分区

您的数据块应该足够小,以便许多块可以同时放入工作节点的可用内存中。您通常在 Dask DataFrame 中选择分区大小(参见 DataFrame 分区)或在 Dask Array 中选择块大小(参见 Array 块)时控制这一点。

Dask 可能会在一台机器上并行处理与该机器核心数一样多的块。因此,如果您有 1 GB 的块和十个核心,那么 Dask 可能会使用至少 10 GB 的内存。此外,Dask 通常会有 2-3 倍的可用块来处理,以便它总有事情可做。

如果您的机器有 100 GB 内存和 10 个核心,那么您可能希望选择 1 GB 范围的块。您每个核心有十个块的空间,这为 Dask 提供了健康的余量,同时又不会使任务过小。

请注意,您也需要避免过小的块大小。详细信息请参阅下一节。有关选择 Dask 数组块大小的更详细指南,请参阅这篇关于选择合适的块大小的博客文章。

避免非常大的图

Dask 工作负载由任务组成。任务是一个 Python 函数,例如应用于 pandas DataFrame 或 NumPy 数组等 Python 对象的 np.sum。如果您处理具有许多分区的 Dask 集合,那么您执行的每个操作,例如 x + 1,很可能会生成许多任务,至少与集合中的分区一样多。

每个任务都会带来一些开销。这大约在 200 微秒到 1 毫秒之间。如果您的计算有数千个任务,这没问题,大约会有一秒钟的开销,这可能不会让您烦恼。

然而,当您有非常大的图(包含数百万个任务)时,这可能会变得麻烦,因为此时开销在 10 分钟到数小时之间,而且处理如此大的图的开销可能会开始使调度器不堪重负。

您可以通过以下方式构建更小的图:

  • 增加块大小: 如果您有 1,000 GB 数据并使用 10 MB 的块,那么您将有 100,000 个分区。对此类集合的每次操作将至少生成 100,000 个任务。

    然而,如果您将块大小增加到 1 GB 甚至几 GB,那么您将开销降低了几个数量级。这要求您的工作节点具有远大于 1 GB 的内存,但这对于较大的工作负载来说是典型的。

  • 融合操作: Dask 会自行执行一些融合,但您可以帮助它。如果您有一个非常复杂的操作,包含几十个子操作,也许您可以将其打包到一个 Python 函数中,并使用像 da.map_blocksdd.map_partitions 这样的函数。

    通常,您可以将越多的管理工作移入您的函数中越好。这样 Dask 调度器就不必考虑所有细粒度的操作。

  • 分解计算: 对于非常大的工作负载,您可能还想尝试一次将较小的块发送给 Dask。例如,如果您正在处理 PB 级别的数据,但发现 Dask 只能愉快地处理 100 TB,那么您可以将计算分解为十个部分,然后逐个提交它们。

学习定制技术

高级 Dask 集合(array、DataFrame、bag)包含遵循 NumPy 和 pandas 标准 Python API 的常见操作。然而,许多 Python 工作负载很复杂,可能需要这些高级 API 中未包含的操作。

幸运的是,有许多选项支持定制工作负载

  • 所有集合都有一个 map_partitionsmap_blocks 函数,它将用户提供的函数应用于集合中的每个 pandas DataFrame 或 NumPy 数组。由于 Dask 集合由普通的 Python 对象组成,因此通常很容易在不进行大量修改的情况下,将自定义函数映射到数据集的分区上。

    df.map_partitions(my_custom_func)
    
  • 更复杂的 map_* 函数。有时您的自定义行为不是完全并行,而是需要更高级的通信。例如,您可能需要将少量信息从一个分区传递到下一个分区,或者您可能想构建一个自定义聚合。

    Dask 集合也包含了这些方法。

  • 对于更复杂的工作负载,您可以将集合转换为单个块,并使用 Dask Delayed 按您喜欢的方式排列这些块。每个集合通常都有一个 to_delayed 方法。

map_partitions(func, *args[, meta, ...])

对每个 DataFrame 分区应用 Python 函数。

map_overlap(func, df, before, after, *args)

对每个分区应用函数,并与相邻分区共享行。

groupby.Aggregation(name, chunk, agg[, finalize])

用户定义的 groupby 聚合。

blockwise(func, out_ind, *args[, name, ...])

张量操作:广义内积和外积

map_blocks(func, *args[, name, token, ...])

对 dask 数组的所有块应用函数。

map_overlap(func, *args[, depth, boundary, ...])

对具有一些重叠的数组块应用函数

reduction(x, chunk, aggregate[, axis, ...])

约简的通用版本

高效存储数据

随着您的计算能力的提高,您可能会发现数据访问和 I/O 占用了您总时间的更大比例。此外,并行计算通常会对您存储数据的方式增加新的约束,特别是围绕如何根据您的计算计划提供对数据块的随机访问。

例如

  • 对于压缩,您可能会发现您放弃 gzip 和 bz2,转而使用像 lz4、snappy 和 Z-Standard 这样提供更好性能和随机访问的新系统。

  • 对于存储格式,您可能会发现您需要自描述格式,这些格式针对随机访问、元数据存储和二进制编码进行了优化,例如 ParquetORCZarrHDF5GeoTIFF

  • 在云上工作时,您可能会发现一些较旧的格式(如 HDF5)可能效果不佳。

  • 您可能希望以与常见查询很好地对齐的方式对数据进行分区或分块。在 Dask DataFrame 中,这可能意味着选择要排序的列以实现快速选择和连接。对于 Dask Array,这可能意味着选择与您的访问模式和算法对齐的块大小。

进程、线程和虚拟机大小

如果您主要使用 Numpy、pandas、Scikit-learn、Numba 以及其他释放 GIL 的库进行数值计算,则主要使用线程。如果您处理文本数据或 Python 集合(如列表和字典),则主要使用进程。

如果您在线程数很高(远大于 4)的较大机器上,那么无论如何您都应该将事物拆分成至少几个进程。Python 在数值计算方面,每个进程大约使用 4 个线程可以高效运行,但 50 个线程则不能。

这是一条适用于云计算和选择合适的虚拟机实例大小的通用建议。选择“完美”实例有很多细节需要考虑,但一个好的起点是 CPU 与 RAM 的比例为 1:4,每个虚拟机一个工作节点实例。您可以根据您的工作负载进行调整。

有关线程、进程以及如何在 Dask 中配置它们的更多信息,请参阅调度器文档

使用 Dask 加载数据

我们看到一个常见的反模式是人们在客户端(即他们的本地机器)在 Dask 之外创建大型 Python 对象,如 DataFrame 或 Array,然后将它们嵌入到计算中。这意味着 Dask 必须通过网络多次发送这些对象,而不是仅仅传递数据指针。

这会带来很多开销,并显著减慢计算速度,特别是当客户端和调度器之间的网络连接很慢时。它还可能使调度器过载,导致内存不足错误。相反,您应该使用 Dask 方法加载数据,并使用 Dask 来控制结果。

以下是一些需要避免的常见模式和更好的替代方案:

我们正在使用 Dask 读取 Parquet 数据集,然后向其追加一组 pandas DataFrames。我们在将数据发送到 Dask 之前,将 CSV 文件加载到内存中。

ddf = dd.read_parquet(...)

pandas_dfs = []
for fn in filenames:
    pandas_dfs(pandas.read_csv(fn))     # Read locally with pandas
ddf = dd.concat([ddf] + pandas_dfs)     # Give to Dask

相反,我们可以使用 Dask 直接读取 CSV 文件,将所有数据保留在集群上。

ddf = dd.read_parquet(...)
ddf2 = dd.read_csv(filenames)
ddf = dd.concat([ddf, ddf2])

我们正在使用 NumPy 创建一个内存中的数组,然后将其交给 Dask,迫使 Dask 将数组嵌入到任务图中,而不是处理数据指针。

f = h5py.File(...)

x = np.asarray(f["x"])  # Get data as a NumPy array locally
x = da.from_array(x)   # Hand NumPy array to Dask

相反,我们可以使用 Dask 直接读取文件,将所有数据保留在集群上。

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading

我们正在使用 pandas 读取一个大型 CSV 文件,然后使用 delayed 构建一个图来并行化对数据的计算。

@dask.delayed
def process(a, b):
    ...

df = pandas.read_csv("some-large-file.csv")  # Create large object locally
results = []
for item in L:
    result = process(item, df)  # include df in every delayed call
    results.append(result)

相反,我们也可以使用 delayed 来读取数据。这避免了将大型文件嵌入到图中,Dask 只需传递对 delayed 对象的引用即可。

@dask.delayed
def process(a, b):
   ...

df = dask.delayed(pandas.read_csv)("some-large-file.csv")  # Let Dask build object
results = []
for item in L:
   result = process(item, df)  # include pointer to df in every delayed call
   results.append(result)

将大型对象(如 pandas DataFrames 或 Arrays)嵌入到计算中是 Dask 用户常遇到的痛点。这会带来显著的延迟,直到调度器接收并能够开始计算,并在计算期间给调度器带来压力。

转而使用 Dask 加载这些对象可以避免这些问题,并显著提高计算性能。

避免重复调用 compute

调用 compute 将阻塞客户端上的执行,直到 Dask 计算完成。我们经常看到的一种模式是用户在循环中或对略有不同的查询顺序调用 compute

这会阻止 Dask 在集群上并行化不同的计算,并阻止在不同查询之间共享中间结果。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...).compute())

这会在每次迭代到达 compute 调用时暂停执行,一次计算一个查询。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...))  # no compute here
results = dask.compute(*results)

这允许 Dask 只计算计算的共享部分(如上面的 foo 对象)一次,而不是每次调用 compute 时计算一次,并且允许 Dask 对不同的选择进行并行化,而不是顺序运行它们。