调度
目录
调度¶
所有大型 Dask 集合(如 Dask Array、Dask DataFrame 和 Dask Bag)以及细粒度 API(如 delayed 和 futures)都会生成任务图,图中的每个节点都是一个普通的 Python 函数,节点之间的边是由一个任务创建并作为输出、在另一个任务中用作输入的普通 Python 对象。Dask 生成这些任务图后,需要在并行硬件上执行它们。这是一项 任务调度器 的职责。存在不同的任务调度器,每个调度器都会使用一个任务图并计算出相同的结果,但性能特性不同。
Dask 有两类任务调度器
单机调度器:此调度器在本地进程或线程池上提供基本功能。它是首先创建的调度器,也是默认调度器。它使用简单且成本低廉,但只能在单机上使用且不可扩展。
分布式调度器:此调度器更复杂,提供更多功能,但也需要更多精力进行设置。它可以在本地或跨集群分布式运行。
对于不同的计算,您可能会发现使用特定的调度器设置能获得更好的性能。本文档旨在帮助您理解如何选择和配置不同的调度器,并提供何时使用哪种调度器更合适的指导方针。
本地线程¶
import dask
dask.config.set(scheduler='threads') # overwrite default with threaded scheduler
线程调度器使用本地 concurrent.futures.ThreadPoolExecutor
执行计算。它轻量且无需设置。它引入的任务开销非常小(每个任务约 50 微秒),并且由于所有操作都在同一进程中发生,因此任务之间传输数据没有成本。然而,由于 Python 的全局解释器锁 (GIL),此调度器仅在计算主要由非 Python 代码主导时(例如在 NumPy 数组、Pandas DataFrame 中操作数值数据,或使用生态系统中任何其他基于 C/C++/Cython 的项目时)提供并行性。
线程调度器是 Dask Array、Dask DataFrame 和 Dask Delayed 的默认选择。但是,如果您的计算主要由处理纯 Python 对象(如字符串、字典或列表)主导,那么您可能想尝试下面基于进程的调度器之一(我们目前推荐在本地机器上使用分布式调度器)。
本地进程¶
注意
下文描述的分布式调度器如今通常是更好的选择。我们建议读者在本节之后继续阅读。
提示
在使用多进程调度器运行独立 Python 脚本时,请务必包含一个 if __name__ == "__main__":
块。有关更多详细信息,请参阅 独立 Python 脚本。
import dask
dask.config.set(scheduler='processes') # overwrite default with multiprocessing scheduler
多进程调度器使用本地 concurrent.futures.ProcessPoolExecutor
执行计算。它使用轻便且无需设置。每个任务及其所有依赖项都会被发送到一个本地进程执行,然后结果被发送回主进程。这意味着它能够绕过 GIL 的问题,即使在主要由纯 Python 代码主导的计算中(例如处理字符串、字典和列表的计算)也能提供并行性。
然而,将数据发送到远程进程并传回可能会引入性能损失,尤其是在进程之间传输的数据较大时。多进程调度器非常适用于工作流程相对线性、不涉及大量任务间数据传输,并且输入和输出都较小(例如文件名和计数)的情况。
这在基本数据摄取工作负载中很常见,例如在 Dask Bag 中常见的情况,多进程调度器是其默认设置。
>>> import dask.bag as db
>>> db.read_text('*.json').map(json.loads).pluck('name').frequencies().compute()
{'alice': 100, 'bob': 200, 'charlie': 300}
对于更复杂的工作负载,其中较大的中间结果可能被多个下游任务依赖,我们通常建议在本地机器上使用分布式调度器。分布式调度器在处理大型中间结果的移动方面更智能。
单线程¶
import dask
dask.config.set(scheduler='synchronous') # overwrite default with single-threaded scheduler
单线程同步调度器在本地线程中执行所有计算,完全没有并行性。这对于调试和性能分析特别有价值,因为在使用线程或进程时它们会变得更加困难。
例如,在使用 IPython 或 Jupyter Notebook 时,当使用并行 Dask 调度器时,%debug
、%pdb
或 %prun
等魔术命令将无法很好地工作(它们并非设计用于并行计算环境)。但是,如果您遇到异常并想进入调试器,您可能希望在单线程调度器下重新运行计算,此时这些工具将正常工作。
Dask Distributed (本地)¶
提示
在使用本地分布式调度器运行独立 Python 脚本时,请务必包含一个 if __name__ == "__main__":
块。有关更多详细信息,请参阅 独立 Python 脚本。
from dask.distributed import Client
client = Client()
# or
client = Client(processes=False)
Dask 分布式调度器可以 设置在集群上 或在个人机器上本地运行。尽管其名称为“分布式”,但在本地机器上使用它通常很实用,原因如下:
它提供了对异步 API 的访问,特别是 Futures
它提供了一个诊断仪表盘,可以提供有关性能和进度的宝贵见解
它更复杂地处理数据局部性,因此在需要多个进程的工作负载上,它比多进程调度器更高效
您可以在 这些文档 中阅读更多关于在单机上使用 Dask 分布式调度器的信息。
Dask Distributed (集群)¶
您也可以在分布式集群上运行 Dask。有多种方法可以根据您的集群进行设置。我们建议参考 如何部署 Dask 集群 获取更多信息。
配置¶
您可以通过使用 dask.config.set(scheduler...)
命令配置全局默认调度器。这可以全局设置
dask.config.set(scheduler='threads')
x.compute()
或作为上下文管理器
with dask.config.set(scheduler='threads'):
x.compute()
或在单个计算调用中
x.compute(scheduler='threads')
每个调度器可能支持特定于该调度器的额外关键字参数。例如,基于池的单机调度器允许您提供自定义池或指定所需的 worker 数量
from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
x.compute()
with dask.config.set(num_workers=4):
x.compute()
请注意,Dask 还支持自定义 concurrent.futures.Executor
子类,例如来自 loky 的 ReusablePoolExecutor
from loky import get_reusable_executor
with dask.config.set(scheduler=get_reusable_executor()):
x.compute()
其他库如 ipyparallel 和 mpi4py 也提供了 concurrent.futures.Executor
子类,同样可以使用。
独立 Python 脚本¶
在独立 Python 脚本中运行 Dask 调度器时需要注意一些事项。具体来说,当使用单机多进程调度器或本地分布式调度器时,Dask 会创建额外的 Python 进程。作为 Python 正常子进程初始化的一部分,Python 会在创建的每个子进程中导入脚本内容(这适用于任何创建子进程的 Python 代码,不仅限于 Dask)。这种导入初始化可能导致子进程递归地创建其他子进程,最终引发错误。
常见的错误
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
为了避免这类错误,您应该将任何创建子进程的 Dask 代码(例如,所有使用多进程调度器的 compute()
调用,或创建本地分布式集群时)放在一个 if __name__ == "__main__":
块中。这确保了只有当脚本作为主程序运行时才会创建子进程。
例如,使用以下脚本运行 python myscript.py
将会引发错误
# myscript.py
from dask.distributed import Client
client = Client() # Will raise an error when creating local subprocesses
相反,应该将脚本内容放在 if __name__ == "__main__":
块中
# myscript.py
if __name__ == "__main__": # This avoids infinite subprocess creation
from dask.distributed import Client
client = Client()
有关此主题的更多详细信息,请参阅 Python 的多进程指南。