共享内存

异步调度器接受任何 concurrent.futures.Executor 实例。这包括 Python 标准库中定义的 ThreadPoolExecutorProcessPoolExecutor 实例,以及来自第三方库的任何其他子类。Dask 还定义了自己的 SynchronousExecutor,它只在主线程上运行函数(对调试很有用)。

完整的 Dask get 函数分别存在于 dask.threaded.getdask.multiprocessing.getdask.get 中。

策略

异步调度器维护索引数据结构,这些结构显示哪些任务依赖于哪些数据,哪些数据可用,哪些数据在等待哪些任务完成后才能释放,以及哪些任务当前正在运行。相对于总任务数和可用任务数,它可以在常数时间内更新这些信息。这些索引结构使得 Dask 异步调度器能够在单台机器上扩展到非常多的任务。

Embarrassingly parallel dask flow

为了保持较小的内存占用,我们选择将就绪任务保存在后进先出栈中,以便最近可用的任务获得优先权。这鼓励在开始新链之前完成相关任务链。这也可以在常数时间内查询。

性能

tl;dr 线程调度器开销大致如下

  • 每个任务 200 微秒的开销

  • 10 微秒的启动时间(如果您每次都希望创建一个新的 ThreadPoolExecutor)

  • 随任务数量的常数扩展

  • 随每个任务依赖数量的线性扩展

调度器会引入开销。这种开销有效地限制了我们并行处理的粒度。下面我们将测量异步调度器在使用不同的 apply 函数(线程、同步、多进程)以及在不同负载类型(易并行、密集通信)下的开销。

我们可以做的最快/最简单的测试是使用 IPython 的 timeit magic 函数

In [1]: import dask.array as da

In [2]: x = da.ones(1000, chunks=(2,)).sum()

In [3]: len(x.dask)
Out[3]: 1168

In [4]: %timeit x.compute()
80.9 ms +- 387 us per loop (mean +- std. dev. of 7 runs, 10 loops each)

因此每个任务大约需要 ~90 微秒。其中大约 100 毫秒来自开销

In [5]: x = da.ones(1000, chunks=(1000,)).sum()

In [6]: %timeit x.compute()
1.06 ms +- 3.65 us per loop (mean +- std. dev. of 7 runs, 1,000 loops each)

每次启动 ThreadPoolExecutor 都会有一些开销。这可以通过使用全局或上下文线程池来缓解。

>>> from concurrent.futures import ThreadPoolExecutor
>>> pool = ThreadPoolExecutor()
>>> dask.config.set(pool=pool)  # set global ThreadPoolExecutor

or

>>> with dask.config.set(pool=pool)  # use ThreadPoolExecutor throughout with block
...     ...

现在我们测量任务数量的扩展和图密度的扩展

Adding nodes

随任务数量的线性扩展

随着图中任务数量的增加,我们看到调度开销呈线性增长。每个任务的渐近成本取决于调度器。依赖于某种异步线程池的调度器成本为几毫秒,而单线程调度器成本为几微秒。

Graph depicting how well Dask scales with the number of nodes in the task graph. Graph shows the duration in seconds on the y-axis versus number of edges per task on the x-axis. The time to schedule the entire graph is constant initially, followed by a linear increase after roughly 500 tasks for multiprocessing and threaded schedulers and 10 tasks for async and core schedulers. The inverse is true for the cost per task, with a linear cost decrease, followed by more or less constant cost.

整个图的调度开销(左)与每个任务的调度开销(右)

Adding edges

随边数量的线性扩展

随着每个任务的边数量增加,调度开销也呈线性增长。

注意:无论是朴素的核心调度器还是多进程调度器都不擅长处理具有非平凡跨任务通信的工作流;它们已从图中移除。

Graph depicting how well Dask scales with the number of edges in the task graph. Graph shows the duration in seconds on the y-axis versus number of edges per task on the x-axis. As the number of edges increases from 0 to 100, the time to schedule the entire graph using the threaded scheduler goes from 2 to 8 seconds whereas using the async scheduler goes from 0 to 3 seconds. The cost per edge decreases up until about 10 edges, after which the cost plateaus for both the threaded and async schedulers, with the async scheduler being consistently faster.

整个图的调度开销(左)与每条边的调度开销(右)

下载调度脚本

已知限制

共享内存调度器有一些显著的限制

  1. 它在单台机器上工作

  2. 线程调度器受 Python 代码的 GIL 限制,因此如果您的操作是纯 Python 函数,您不应该期望多核加速

  3. 多进程调度器必须在 Worker 之间序列化函数,这可能会失败

  4. 多进程调度器必须在 Worker 和中心进程之间序列化数据,这可能会很昂贵

  5. 多进程调度器无法直接在 Worker 进程之间传输数据;所有数据都通过主进程路由。