Dask 最佳实践

开始使用 Dask 的 API 很容易,但要用得则需要一些经验。本页面包含 Dask 最佳实践的建议,并提供了常见 Dask 问题的解决方案。

本文档专门关注 Dask 所有 API 通用的最佳实践。读者可能首先想查看特定 API 的最佳实践文档。

从小处开始

并行性会带来额外的复杂性和开销。对于较大的问题有时是必要的,但通常不是。在将 Dask 这样的并行计算系统添加到您的工作负载之前,您可能想先尝试一些替代方案

  • 使用更好的算法或数据结构:NumPy、pandas、Scikit-learn 可能有针对您尝试做的事情的更快函数。值得咨询专家或再次阅读他们的文档以找到更好的预构建算法。

  • 更好的文件格式:支持随机访问的高效二进制格式通常可以帮助您高效简单地管理大于内存的数据集。请参阅下面的高效存储数据部分。

  • 编译代码:使用 Numba 或 Cython 编译您的 Python 代码可能会使并行化变得不必要。或者您可以使用这些库中提供的多核并行性。

  • 采样:即使您有很多数据,使用所有数据可能也没有太大优势。通过智能采样,您可以从更容易管理的子集中获得相同的洞察力。

  • 分析:如果您尝试加快慢速代码的速度,首先了解其原因非常重要。对代码进行适度的分析投资可以帮助您确定是什么让您慢下来。这些信息可以帮助您更好地决定并行性是否可能有所帮助,或者其他方法是否可能更有效。

使用仪表板

Dask 的仪表板可帮助您了解工作器的状态。此信息可以指导您找到高效的解决方案。在并行和分布式计算中,需要注意新的开销,因此您旧有的直觉可能不再适用。使用仪表板可以帮助您重新学习哪些是快速的,哪些是慢速的,以及如何处理它们。

有关更多信息,请参阅Dask 仪表板文档

避免使用非常大的分区

您的数据块应足够小,以便许多块能同时适合工作器的可用内存。您通常在 Dask DataFrame 中选择分区大小时控制这一点(参见DataFrame 分区),或在 Dask Array 中选择块大小时控制这一点(参见Array 块)。

Dask 可能会在一台机器上并行处理与该机器核心数相同数量的块。因此,如果您有 1 GB 的块和十个核心,那么 Dask 可能会使用至少 10 GB 的内存。此外,Dask 通常会有 2-3 倍数量的块可供处理,以便它总是有事情可做。

如果您有一台配备 100 GB 内存和 10 个核心的机器,您可能想选择大约 1GB 大小的块。每个核心有足够的空间容纳十个块,这为 Dask 提供了健康的余量,同时避免了任务过小的问题。

请注意,您也应避免块大小过小。详细信息请参阅下一节。有关 Dask Arrays 选择块大小的更详细指南,请参阅这篇博文:选择合适的块大小

避免使用非常大的图

Dask 工作负载由任务组成。任务是一个 Python 函数,例如应用于 Python 对象(如 pandas DataFrame 或 NumPy 数组)的 np.sum。如果您正在使用包含许多分区的 Dask 集合,那么您执行的每个操作,例如 x + 1,很可能会生成许多任务,至少与您集合中的分区数量一样多。

每个任务都有一定的开销。大约在 200微秒 到 1毫秒 之间。如果您的计算有数千个任务,这没问题,开销大约一秒钟,可能不会困扰您。

然而,当您拥有包含数百万任务的非常大的图时,这可能会变得麻烦,这既是因为开销现在在 10 分钟到几小时的范围内,也是因为处理如此大的图的开销可能会开始压垮调度器。

您可以通过以下方式构建更小的图:

  • 增加您的块大小:如果您有 1,000 GB 数据并使用 10 MB 的块,那么您有 100,000 个分区。对此类集合的每个操作将生成至少 100,000 个任务。

    然而,如果您将块大小增加到 1 GB 甚至几 GB,那么您可以将开销降低几个数量级。这要求您的工作器拥有远超 1 GB 的内存,但这对于较大的工作负载是典型的。

  • 将操作融合在一起:Dask 会自己做一些融合,但您可以帮助它。如果您有一个非常复杂的操作,包含几十个子操作,也许您可以将其打包到一个单独的 Python 函数中,并使用诸如 da.map_blocksdd.map_partitions 之类的函数。

    一般来说,您能将越多的管理工作移到您的函数中越好。这样 Dask 调度器就不需要考虑所有细粒度的操作了。

  • 分解您的计算:对于非常大的工作负载,您可能还想尝试一次将较小的块发送给 Dask。例如,如果您正在处理 PB 级数据但发现 Dask 只能处理 100 TB,也许您可以将计算分解成十个部分,然后逐个提交它们。

学习定制技巧

高层 Dask 集合(array、DataFrame、bag)包含遵循 NumPy 和 pandas 标准 Python API 的常见操作。然而,许多 Python 工作负载很复杂,可能需要这些高层 API 中未包含的操作。

幸运的是,有许多选项可以支持自定义工作负载

  • 所有集合都有一个 map_partitionsmap_blocks 函数,它将用户提供的函数应用于集合中的每个 pandas DataFrame 或 NumPy 数组。由于 Dask 集合由普通的 Python 对象组成,因此通常很容易将自定义函数映射到数据集的分区上而无需太多修改。

    df.map_partitions(my_custom_func)
    
  • 更复杂的 map_* 函数。有时您的自定义行为并不是高度并行化的,而是需要更高级的通信。例如,您可能需要在某个分区和下一个分区之间传递少量信息,或者您可能想构建一个自定义聚合。

    Dask 集合也包含用于这些目的的方法。

  • 对于更复杂的工作负载,您可以将集合转换为单独的块,并使用 Dask Delayed 按您喜欢的方式排列这些块。每个集合通常都有一个 to_delayed 方法。

map_partitions(func, *args[, meta, ...])

对每个 DataFrame 分区应用 Python 函数。

map_overlap(func, df, before, after, *args)

对每个分区应用一个函数,与相邻分区共享行。

groupby.Aggregation(name, chunk, agg[, finalize])

用户定义的 groupby-aggregation。

blockwise(func, out_ind, *args[, name, ...])

张量操作:广义内积和外积

map_blocks(func, *args[, name, token, ...])

将函数映射到 dask 数组的所有块上。

map_overlap(func, *args[, depth, boundary, ...])

将函数映射到具有一些重叠的数组块上

reduction(x, chunk, aggregate[, axis, ...])

归约操作的通用版本

高效存储数据

随着您的计算能力提高,您可能会发现数据访问和 I/O 占用了总时间的更大一部分。此外,并行计算通常会为您的数据存储方式增加新的限制,特别是在提供与您计划计算方式一致的数据块的随机访问方面。

例如

  • 对于压缩,您可能会发现您会放弃 gzip 和 bz2,转而采用 lz4、snappy 和 Z-Standard 等提供更好性能和随机访问的新系统。

  • 对于存储格式,您可能会发现您想要自描述格式,这些格式针对随机访问、元数据存储和二进制编码进行了优化,例如 ParquetORCZarrHDF5GeoTIFF

  • 在云上工作时,您可能会发现一些较旧的格式(如 HDF5)可能效果不佳。

  • 您可能想以与常见查询很好地对齐的方式对数据进行分区或分块。在 Dask DataFrame 中,这可能意味着选择一个列进行排序以实现快速选择和连接。对于 Dask Array,这可能意味着选择与您的访问模式和算法对齐的块大小。

进程、线程和虚拟机大小

如果您主要使用 Numpy、pandas、Scikit-learn、Numba 和其他释放 GIL 的库进行数值计算,那么主要使用线程。如果您处理文本数据或列表、字典等 Python 集合,那么主要使用进程。

如果您在使用线程数很高(远大于 4)的较大型机器上,那么无论如何您可能都应该将事情分解为至少几个进程。Python 在数值计算方面,每个进程大约 4 个线程可以达到很高的生产力,但 50 个线程则不行。

这条建议也适用于云计算和选择合适的虚拟机实例大小。选择“完美”实例有很多细微之处,但一个好的起点是 CPU 与 RAM 的比例为 1:4,每个虚拟机一个 Worker 实例。您可以根据您的工作负载进行调整。

有关线程、进程以及如何在 Dask 中配置它们的更多信息,请参阅调度器文档

使用 Dask 加载数据

我们看到的一种常见的反模式是人们在客户端(即本地机器)在 Dask 之外创建大型 Python 对象(如 DataFrame 或 Array),然后将其嵌入到计算中。这意味着 Dask 必须通过网络多次发送这些对象,而不是只传递数据的指针。

这会产生很多开销并显著减慢计算速度,特别是如果客户端和调度器之间的网络连接很慢。它也可能使调度器过载,导致内存不足错误。相反,您应该使用 Dask 方法加载数据并使用 Dask 控制结果。

以下是一些应避免的常见模式和更好的替代方案

我们在向 Parquet 数据集追加一组 pandas DataFrames 之前使用 Dask 读取该数据集。我们正在将 CSV 文件加载到内存中,然后再将数据发送给 Dask。

ddf = dd.read_parquet(...)

pandas_dfs = []
for fn in filenames:
    pandas_dfs(pandas.read_csv(fn))     # Read locally with pandas
ddf = dd.concat([ddf] + pandas_dfs)     # Give to Dask

相反,我们可以直接使用 Dask 读取 CSV 文件,将所有数据保留在集群上。

ddf = dd.read_parquet(...)
ddf2 = dd.read_csv(filenames)
ddf = dd.concat([ddf, ddf2])

我们在将一个内存中的数组交给 Dask 之前使用 NumPy 创建它,迫使 Dask 将数组嵌入到任务图中,而不是处理数据的指针。

f = h5py.File(...)

x = np.asarray(f["x"])  # Get data as a NumPy array locally
x = da.from_array(x)   # Hand NumPy array to Dask

相反,我们可以直接使用 Dask 读取文件,将所有数据保留在集群上。

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading

我们在使用 delayed 构建图来并行计算数据之前,使用 pandas 读取一个大型 CSV 文件。

@dask.delayed
def process(a, b):
    ...

df = pandas.read_csv("some-large-file.csv")  # Create large object locally
results = []
for item in L:
    result = process(item, df)  # include df in every delayed call
    results.append(result)

相反,我们也可以使用 delayed 读取数据。这避免了将大型文件嵌入到图中,Dask 可以只传递对 delayed 对象的引用。

@dask.delayed
def process(a, b):
   ...

df = dask.delayed(pandas.read_csv)("some-large-file.csv")  # Let Dask build object
results = []
for item in L:
   result = process(item, df)  # include pointer to df in every delayed call
   results.append(result)

将 pandas DataFrames 或 Arrays 等大型对象嵌入到计算中是 Dask 用户经常遇到的痛点。它会显著增加调度器接收并开始计算的延迟,并在计算过程中给调度器带来压力。

相反,使用 Dask 加载这些对象可以避免这些问题,并显著提高计算性能。

避免重复调用 compute

调用 compute 将阻塞客户端的执行,直到 Dask 计算完成。我们经常看到的一种模式是用户在循环中或对略有不同的查询顺序调用 compute

这会阻止 Dask 在集群上并行执行不同的计算,也阻止了在不同查询之间共享中间结果。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...).compute())

这会在每次迭代到达 compute 调用时暂停执行,一次计算一个查询。

foo = ...
results = []
for i in range(...):
     results.append(foo.select(...))  # no compute here
results = dask.compute(*results)

这使得 Dask 只计算计算的共享部分(例如上面的 foo 对象)一次,而不是每次调用 compute 都计算一次,并且允许 Dask 并行执行不同的选择操作,而不是顺序运行它们。