调度

所有大规模的 Dask 集合,例如 Dask ArrayDask DataFrameDask Bag,以及细粒度 API,例如 delayedfutures,都会生成任务图。任务图中的每个节点都是一个普通的 Python 函数,节点之间的边是普通的 Python 对象,这些对象由一个任务创建作为输出,并在另一个任务中用作输入。Dask 生成这些任务图后,需要将它们并行地在硬件上执行。这就是任务调度器的工作。存在不同的任务调度器,每个调度器都会接收一个任务图并计算出相同的结果,但性能特性不同。

Dask 有两类任务调度器

  1. 单机调度器:此调度器在本地进程或线程池上提供基本功能。它是最早开发的调度器,也是默认调度器。它使用简单且开销小,但只能在单机上使用,无法扩展。

  2. 分布式调度器:此调度器更复杂,提供更多功能,但设置起来也需要更多精力。它可以在本地运行,也可以分布式地运行在集群上。


Dask is composed of three parts. "Collections" create "Task Graphs" which are then sent to the "Scheduler" for execution. There are two types of schedulers that are described in more detail below.

对于不同的计算任务,您可能会发现特定的调度器设置能带来更好的性能。本文档旨在帮助您理解如何选择和配置不同的调度器,并提供何时选择特定调度器的指导。

本地线程

import dask
dask.config.set(scheduler='threads')  # overwrite default with threaded scheduler

线程调度器使用本地的 concurrent.futures.ThreadPoolExecutor 执行计算。它轻量且无需设置。它引入的任务开销非常小(每个任务约 50 微秒),并且由于所有事情都发生在同一个进程中,任务之间传输数据没有任何成本。然而,由于 Python 的全局解释器锁 (GIL),此调度器仅在您的计算主要由非 Python 代码主导时提供并行性,这主要是在处理 NumPy 数组、Pandas DataFrame 中的数值数据或使用生态系统中任何其他基于 C/C++/Cython 的项目时。

线程调度器是 Dask ArrayDask DataFrameDask Delayed 的默认选择。但是,如果您的计算主要由处理纯 Python 对象(如字符串、字典或列表)主导,那么您可能需要尝试以下基于进程的调度器(我们目前推荐在本地机器上使用分布式调度器)。

本地进程

注意

下文描述的分布式调度器在今天通常是更好的选择。我们鼓励读者在本节之后继续阅读。

提示

在独立的 Python 脚本中使用多进程调度器时,请务必包含一个 if __name__ == "__main__": 块。更多详情请参阅独立的 Python 脚本

import dask
dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler

多进程调度器使用本地的 concurrent.futures.ProcessPoolExecutor 执行计算。它使用轻量且无需设置。每个任务及其所有依赖项都会发送到本地进程执行,然后结果再发送回主进程。这意味着它能够绕过 GIL 的问题,即使在主要由纯 Python 代码主导的计算(例如处理字符串、字典和列表的计算)中也能提供并行性。

然而,在远程进程之间来回移动数据可能会带来性能损失,尤其是在进程之间传输的数据量很大时。当工作流程相对线性,不涉及大量的任务间数据传输,并且输入和输出都较小(如文件名和计数)时,多进程调度器是一个很好的选择。

这在基本的数据摄取工作负载中很常见,例如 Dask Bag 中常见的工作负载,其中多进程调度器是默认设置。

>>> import dask.bag as db
>>> db.read_text('*.json').map(json.loads).pluck('name').frequencies().compute()
{'alice': 100, 'bob': 200, 'charlie': 300}

对于更复杂的工作负载,其中大型中间结果可能被多个下游任务依赖,我们通常建议在本地机器上使用分布式调度器。分布式调度器在移动大型中间结果方面更智能。

单线程

import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler

单线程同步调度器在本地线程中执行所有计算,完全没有并行性。这对于调试和性能分析特别有价值,因为在使用线程或进程时,这些操作会更加困难。

例如,在使用 IPython 或 Jupyter notebooks 时,使用并行 Dask 调度器时,%debug%pdb%prun 等 magic 命令可能无法正常工作(它们并非设计用于并行计算环境)。但是,如果您遇到异常并希望进入调试器,您可能希望在单线程调度器下重新运行计算,以便这些工具能够正常工作。

Dask 分布式 (本地)

提示

在独立的 Python 脚本中使用本地分布式调度器时,请务必包含一个 if __name__ == "__main__": 块。更多详情请参阅独立的 Python 脚本

from dask.distributed import Client
client = Client()
# or
client = Client(processes=False)

Dask 分布式调度器既可以在集群上设置,也可以在个人电脑上本地运行。尽管名字叫“分布式”,但在本地机器上使用它通常很实用,原因如下:

  1. 它提供了对异步 API 的访问,特别是 Futures

  2. 它提供了一个诊断仪表盘,可以提供有关性能和进度的宝贵见解

  3. 它能更智能地处理数据局部性,因此在需要多个进程的工作负载上,它比多进程调度器效率更高

您可以在这些文档中阅读更多关于在单机上使用 Dask 分布式调度器的信息。

Dask 分布式 (集群)

您也可以在分布式集群上运行 Dask。根据您的集群类型,有多种方法可以进行设置。我们建议参考如何部署 Dask 集群以获取更多信息。

配置

您可以使用 dask.config.set(scheduler...) 命令配置全局默认调度器。这可以全局设置

dask.config.set(scheduler='threads')

x.compute()

或作为上下文管理器

with dask.config.set(scheduler='threads'):
    x.compute()

或在单次计算调用中

x.compute(scheduler='threads')

每个调度器都可能支持特定于该调度器的额外关键字参数。例如,基于池的单机调度器允许您提供自定义线程池或指定所需的工作进程数。

from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
    x.compute()

with dask.config.set(num_workers=4):
    x.compute()

请注意,Dask 还支持自定义的 concurrent.futures.Executor 子类,例如来自 lokyReusablePoolExecutor

from loky import get_reusable_executor
with dask.config.set(scheduler=get_reusable_executor()):
    x.compute()

其他库如 ipyparallelmpi4py 也提供了 concurrent.futures.Executor 子类,也可以使用。

独立的 Python 脚本

在独立的 Python 脚本中运行 Dask 调度器时需要注意一些事项。具体来说,当使用单机多进程调度器或本地分布式调度器时,Dask 会创建额外的 Python 进程。作为 Python 正常子进程初始化的一部分,Python 会在创建的每个子进程中导入脚本的内容(这对于任何创建子进程的 Python 代码都是如此,不仅仅是在 Dask 中)。这种导入初始化可能导致子进程递归创建其他子进程,最终引发错误。

常见错误
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:

   if __name__ == '__main__':
         freeze_support()
         ...

The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.

为了避免这类错误,您应该将任何创建子进程的 Dask 代码(例如,所有使用多进程调度器的 compute() 调用,或创建本地分布式集群时)放在 if __name__ == "__main__": 块内。这确保了子进程只在您的脚本作为主程序运行时才被创建。

例如,使用下面的脚本运行 python myscript.py 将会引发错误

# myscript.py

from dask.distributed import Client
client = Client()  # Will raise an error when creating local subprocesses

相反,应该将脚本的内容放在 if __name__ == "__main__": 块内

# myscript.py

if __name__ == "__main__":  # This avoids infinite subprocess creation

   from dask.distributed import Client
   client = Client()

关于此主题的更多详细信息,请参阅Python 的多进程指南