Dask Delayed

有时问题不适合 dask.arraydask.dataframe 这样的集合类型。在这种情况下,用户可以使用更简单的 dask.delayed 接口来并行化自定义算法。这允许您通过对普通 Python 代码进行轻微的注释,直接创建任务图。

>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
A Dask Delayed task graph with two "inc" functions combined using an "add" function resulting in an output node.

示例

访问 https://examples.dask.org.cn/delayed.html 查看并运行使用 Dask Delayed 的示例。

有时我们面临可以并行化的问题,但它们不适合 Dask Array 或 Dask DataFrame 这样的高级抽象。考虑以下示例:

def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)

这个问题显然存在并行性(许多 inc, doubleadd 函数可以独立评估),但尚不清楚如何将其转换为数组或 DataFrame 计算。按照目前的写法,这段代码在单个线程中顺序运行。然而,我们看到其中很多部分都可以并行执行。

Dask 的 delayed 函数装饰你的函数,使它们延迟执行。它不会立即执行你的函数,而是推迟执行,将函数及其参数放入任务图中。

delayed([obj, name, pure, nout, traverse])

包装一个函数或对象,以生成一个 Delayed 对象。

我们通过将函数包装在 delayed 中来稍微修改代码。这会延迟函数的执行,并生成一个 Dask 任务图

import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

我们使用 dask.delayed 函数来包装我们想要转换为任务的函数调用。此时,incdoubleaddsum 的调用都尚未发生。相反,对象 total 是一个 Delayed 结果,它包含整个计算的任务图。查看该图,我们看到了明显的并行执行机会。Dask 调度器 将利用这种并行性,通常会提高性能(尽管在这个例子中不会,因为这些函数本身已经非常小且快速)。

total.visualize()  # see image to the right
A task graph with many nodes for "inc" and "double" that combine with "add" nodes. The output of the "add" nodes finally aggregate with a "sum" node.

现在我们可以计算这个延迟结果,以并行执行任务图

>>> total.compute()
45

装饰器

将 delayed 函数用作装饰器也很常见。以下是将我们的原始问题重写为并行代码的示例:

import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

实时

有时您希望在执行过程中创建和销毁工作,从其他任务启动任务等。对于这种情况,请参阅 Futures 接口。

最佳实践

常见问题和建议列表,请参阅 Delayed 最佳实践

间接依赖

有时您可能希望向任务添加一个依赖项,但该任务并不直接将该依赖项的结果作为输入。例如,当一个任务依赖于另一个任务的副作用时。在这种情况下,您可以使用 dask.graph_manipulation.bind

import dask
from dask.graph_manipulation import bind

DATA = []

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add_data(x):
    DATA.append(x)

@dask.delayed
def sum_data(x):
    return sum(DATA) + x

a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()

sum_data 只有在预期项目都已添加到 DATA 后才会对其进行操作。bind 也可以与通过函数参数传递的直接依赖项一起使用。

执行

默认情况下,Dask Delayed 使用线程调度器以避免数据传输成本。如果您的代码未能很好地释放 GIL(计算主要由纯 Python 代码组成,或计算封装了外部代码且持有 GIL),您应该考虑在本地机器或集群上使用多进程调度器或dask.distributed 调度器。