调度器概览
目录
调度器概览¶
创建 Dask 图后,我们使用调度器来运行它。Dask 当前实现了几种不同的调度器
dask.threaded.get
: 基于线程池的调度器dask.multiprocessing.get
: 基于进程池的调度器dask.get
: 同步调度器,适用于调试distributed.Client.get
: 分布式调度器,用于在多台机器上执行图。它位于外部的 distributed 项目中。
get
函数¶
所有调度器的入口点都是一个 get
函数。它接受一个 Dask 图以及要计算的键或键列表
>>> from operator import add
>>> dsk = {'a': 1,
... 'b': 2,
... 'c': (add, 'a', 'b'),
... 'd': (sum, ['a', 'b', 'c'])}
>>> get(dsk, 'c')
3
>>> get(dsk, 'd')
6
>>> get(dsk, ['a', 'b', 'c'])
[1, 2, 3]
使用 compute
方法¶
在使用 Dask collections 时,您很少需要直接与调度器 get
函数交互。每个 collection 都有一个默认的调度器,以及一个内置的 compute
方法来计算 collection 的输出
>>> import dask.array as da
>>> x = da.arange(100, chunks=10)
>>> x.sum().compute()
4950
compute
方法接受多个关键字参数
scheduler
: 期望调度器的名称(字符串形式,如"threads"
,"processes"
,"single-threaded"
等)、一个get
函数,或一个dask.distributed.Client
对象。这将覆盖 collection 的默认设置。**kwargs
: 要传递给调度器get
函数的额外关键字参数。
另请参阅: 配置调度器。
compute
函数¶
您可能希望一次性计算多个 Dask collections 的结果。与每个 collection 上的 compute
方法类似,还有一个通用的 compute
函数,它接受多个 collections 并返回多个结果。这会合并每个 collection 的图,从而共享中间结果
>>> y = (x + 1).sum()
>>> z = (x + 1).mean()
>>> da.compute(y, z) # Compute y and z, sharing intermediate results
(5050, 50.5)
在这里,中间结果 x + 1
只计算了一次,而如果分别调用 y.compute()
和 z.compute()
,它会被计算两次。对于共享许多中间结果的大型图来说,这能显著提升性能。
compute
函数适用于任何 Dask collection,并且位于 dask.base
中。为了方便起见,它也被导入到每个 collection 的顶层命名空间中。
>>> from dask.base import compute
>>> compute is da.compute
True
配置调度器¶
Dask collections 各自有一个默认的调度器
dask.array
和dask.dataframe
默认使用 threaded 调度器dask.bag
默认使用 multiprocessing 调度器。
在大多数情况下,默认设置是很好的选择。但是,有时您可能希望使用不同的调度器。有两种方法可以做到这一点。
在
compute
方法中使用scheduler
关键字参数>>> x.sum().compute(scheduler='processes')
使用
dask.config.set
。这既可以用作上下文管理器,也可以全局设置调度器# As a context manager >>> with dask.config.set(scheduler='processes'): ... x.sum().compute() # Set globally >>> dask.config.set(scheduler='processes') >>> x.sum().compute()
此外,每个调度器都可能接受一些特定于该调度器的额外关键字参数。例如,multiprocessing 和 threaded 调度器都接受一个 num_workers
关键字参数,用于设置要使用的进程或线程数量(默认为核心数)。这可以在调用 compute
时通过传递该关键字参数来设置
# Compute with 4 threads
>>> x.compute(num_workers=4)
或者,multiprocessing 和 threaded 调度器会检查是否使用 dask.config.set
设置了全局线程池或进程池
>>> from concurrent.futures import ThreadPoolExecutor
>>> with dask.config.set(pool=ThreadPoolExecutor(4)):
... x.compute()
multiprocessing 调度器还支持 不同的上下文(“spawn”、“forkserver”、“fork”),您可以使用 dask.config.set
进行设置。默认上下文是“spawn”,但您可以设置不同的上下文
>>> with dask.config.set({"multiprocessing.context": "forkserver"}):
... x.compute()
有关每个调度器的单独选项的更多信息,请参阅每个调度器 get
函数的文档字符串。
调试调度器¶
调试并行代码可能很困难,因为 pdb
等传统工具在多线程或多进程环境下效果不佳。为了解决这个问题,在调试时我们建议使用位于 dask.get
的同步调度器。它会串行运行所有内容,使其与 pdb
良好配合
>>> dask.config.set(scheduler='single-threaded')
>>> x.sum().compute() # This computation runs serially instead of in parallel
共享内存调度器还提供一组回调函数,可用于诊断和性能分析。您可以在此处了解更多关于调度器回调函数和诊断的信息。
更多信息¶
有关共享内存(threaded 或 multiprocessing)调度器设计的信息,请参阅共享内存
有关分布式内存调度器信息,请参阅distributed