任务图

在内部,Dask 将算法编码为任务图,通常表示为字典。这种图格式可以独立于 dask 集合使用。然而,直接使用 dask 图的情况很少见,除非您打算使用 Dask 开发新模块。即使如此,dask.delayed 通常是更好的选择。如果您是核心开发者,那么您应该从这里开始。

动机

通常,人们编写程序,然后由编译器/解释器解释执行(例如,pythonjavacclang)。有时,人们不满意这些编译器/解释器选择如何解释和执行他们的程序。在这种情况下,人们常常将代码的分析、优化和执行直接纳入代码本身。

通常,对并行执行的需求导致责任从编译器转移到人类开发者。在这种情况下,我们通常将程序结构显式地表示为程序内部的数据。

用户空间并行执行的一种常见方法是任务调度。在任务调度中,我们将程序分解成许多中等大小的任务或计算单元,通常是对非平凡数据量的函数调用。我们将这些任务表示为图中的节点,如果一个任务依赖于另一个任务产生的数据,则在节点之间添加边。我们调用任务调度器以尊重这些数据依赖关系并尽可能利用并行性(以便可以同时运行多个独立任务)的方式执行此图。


_images/map-reduce-task-scheduling.svg

任务调度有多种方法,包括完全并行(embarrassingly parallel)、MapReduce 和全任务调度。


存在许多解决方案。这是并行执行框架中的一种常见方法。任务调度逻辑通常隐藏在其他大型框架(如 Luigi、Storm、Spark、IPython Parallel 等)内部,因此经常被重新发明。

示例

考虑以下简单程序

def inc(i):
    return i + 1

def add(a, b):
    return a + b

x = 1
y = inc(x)
z = add(y, 10)

我们将其编码为一个字典,方式如下

d = {'x': DataNode(None, 1),
     'y': Task('y', inc, TaskRef('x')),
     'z': Task('z', add, TaskRef('y'), 10)}

由以下 Dask 图表示

A simple dask dictionary

虽然不如我们原始代码令人愉悦,但这种表示可以由其他 Python 代码(而不仅仅是 CPython 解释器)进行分析和执行。我们不建议用户以这种方式编写代码,但这是一种适合自动化系统的目标表示。此外,在非玩具示例中,执行时间可能比 incadd 要长得多,这使得额外的复杂性是合理的。

调度器

Dask 库目前包含一些调度器来执行这些图。每个调度器的工作方式不同,提供不同的性能保证,并在不同的上下文环境中运行。这些实现并非特殊,其他人可以轻松编写更适合其他应用或架构的不同调度器。生成 dask 图的系统(如 Dask Array、Dask Bag 等)可以为应用程序和硬件利用适当的调度器。

任务预期

当任务提交给 Dask 执行时,对该任务会做一些假设。

不要原地修改数据

通常不建议使用会原地改变未来状态的任务。原地修改存储在 Dask 中的数据可能会产生意想不到的后果。例如,考虑一个涉及 Numpy 数组的工作流

from dask.distributed import Client
import numpy as np

client = Client()
x = client.submit(np.arange, 10)  # [0, 1, 2, 3, ...]

def f(arr):
    arr[arr > 5] = 0  # modifies input directly without making a copy
    arr += 1          # modifies input directly without making a copy
    return arr

y = client.submit(f, x)

在上面的例子中,Dask 会原地更新 Numpy 数组 x 的值。虽然效率高,但这种行为可能会产生意想不到的后果,特别是如果其他任务需要使用 x,或者如果 Dask 由于工作器故障需要多次重新运行此计算时。

避免占用 GIL

一些包装外部 C/C++ 代码的 Python 函数可能会占用 GIL,从而阻止其他 Python 代码在后台运行。这是个问题,因为 Dask 工作器在运行您的函数时,它们还需要在后台相互通信。

如果您包装外部代码,请尝试释放 GIL。如果您使用任何常见的代码包装解决方案(如 Cython、Numba、ctypes 等),这通常很容易实现。