调度器概述

创建 Dask 图后,我们使用调度器来运行它。Dask 当前实现了几种不同的调度器

  • dask.threaded.get:一个由线程池支持的调度器

  • dask.multiprocessing.get:一个由进程池支持的调度器

  • dask.get:一个同步调度器,适合调试

  • distributed.Client.get:一个用于在多台机器上执行图的分布式调度器。这位于外部的 distributed 项目中。

get 函数

所有调度器的入口点都是一个 get 函数。它接受一个 Dask 图以及要计算的一个或多个键

>>> from operator import add

>>> dsk = {'a': 1,
...        'b': 2,
...        'c': (add, 'a', 'b'),
...        'd': (sum, ['a', 'b', 'c'])}

>>> get(dsk, 'c')
3

>>> get(dsk, 'd')
6

>>> get(dsk, ['a', 'b', 'c'])
[1, 2, 3]

使用 compute 方法

使用 Dask 集合时,您很少需要直接与调度器的 get 函数交互。每个集合都有一个默认调度器和一个内置的 compute 方法,用于计算集合的输出

>>> import dask.array as da
>>> x = da.arange(100, chunks=10)
>>> x.sum().compute()
4950

compute 方法接受多个关键字参数

  • scheduler:所需调度器的名称字符串(如 "threads""processes""single-threaded" 等),或者一个 get 函数,或者一个 dask.distributed.Client 对象。它会覆盖集合的默认设置。

  • **kwargs:要传递给调度器 get 函数的额外关键字参数。

另请参阅:配置调度器

compute 函数

您可能希望一次性计算多个 Dask 集合的结果。类似于每个集合上的 compute 方法,还有一个通用的 compute 函数,它接受多个集合并返回多个结果。这会合并每个集合的图,因此中间结果是共享的

>>> y = (x + 1).sum()
>>> z = (x + 1).mean()
>>> da.compute(y, z)    # Compute y and z, sharing intermediate results
(5050, 50.5)

在这里,中间结果 x + 1 只计算了一次,而分别调用 y.compute()z.compute() 则会计算两次。对于共享许多中间结果的大型图,这可以带来显著的性能提升。

compute 函数适用于任何 Dask 集合,并且位于 dask.base 中。为了方便起见,它也被导入到每个集合的顶层命名空间中。

>>> from dask.base import compute
>>> compute is da.compute
True

配置调度器

每个 Dask 集合都有一个默认调度器

  • dask.arraydask.dataframe 默认使用线程调度器

  • dask.bag 默认使用多进程调度器。

大多数情况下,默认设置是很好的选择。但是,有时您可能想使用不同的调度器。有两种方法可以做到这一点。

  1. compute 方法中使用 scheduler 关键字

    >>> x.sum().compute(scheduler='processes')
    
  2. 使用 dask.config.set。这可以作为上下文管理器使用,也可以全局设置调度器。

    # As a context manager
    >>> with dask.config.set(scheduler='processes'):
    ...     x.sum().compute()
    
    # Set globally
    >>> dask.config.set(scheduler='processes')
    >>> x.sum().compute()
    

此外,每个调度器可能会接受一些特定于该调度器的额外关键字参数。例如,多进程调度器和线程调度器都接受一个 num_workers 关键字参数,用于设置要使用的进程或线程数量(默认为核心数量)。这可以在调用 compute 时通过传递关键字参数来设置。

# Compute with 4 threads
>>> x.compute(num_workers=4)

或者,多进程调度器和线程调度器会检查是否使用 dask.config.set 设置了全局池。

>>> from concurrent.futures import ThreadPoolExecutor
>>> with dask.config.set(pool=ThreadPoolExecutor(4)):
...     x.compute()

多进程调度器还支持不同的上下文(“spawn”、“forkserver”、“fork”),您可以使用 dask.config.set 来设置。默认上下文是“spawn”,但您可以设置一个不同的。

>>> with dask.config.set({"multiprocessing.context": "forkserver"}):
...     x.compute()

有关每个调度器单独选项的更多信息,请参阅每个调度器 get 函数的文档字符串。

调试调度器

调试并行代码可能很困难,因为传统的工具(如 pdb)在多个线程或进程中工作得不好。为了解决调试时的这个问题,我们建议使用位于 dask.get 的同步调度器。它会串行运行所有内容,使其能很好地与 pdb 配合使用。

>>> dask.config.set(scheduler='single-threaded')
>>> x.sum().compute()    # This computation runs serially instead of in parallel

共享内存调度器还提供了一组回调函数,可用于诊断和性能分析。您可以在这里了解更多关于调度器回调和诊断的信息。

更多信息

  • 有关共享内存(线程或多进程)调度器设计的信息,请参阅共享内存

  • 有关分布式内存调度器的信息,请参阅distributed