Dask Delayed
目录
Dask Delayed¶
有时问题不适合像 dask.array
或 dask.dataframe
这样的集合。在这些情况下,用户可以使用更简单的 dask.delayed
接口来并行化自定义算法。这允许您通过少量注解普通 Python 代码直接创建计算图。
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
示例¶
访问 https://examples.dask.org.cn/delayed.html 查看并运行使用 Dask Delayed 的示例。
有时我们会遇到可以并行化但无法很好地契合 Dask Array 或 Dask DataFrame 等高级抽象的问题。考虑以下示例:
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = sum(output)
这个问题显然存在并行性(许多 inc
、double
和 add
函数可以独立求值),但不清楚如何将其转换为 array 或 DataFrame 计算。按照目前的方式编写,此代码在单个线程中顺序运行。然而,我们看到其中很多部分可以并行执行。
Dask 的 delayed
函数会装饰您的函数,使其能够延迟执行。它不会立即执行您的函数,而是会推迟执行,将函数及其参数放入任务图中。
|
包装一个函数或对象以生成一个 |
我们通过将函数包装在 delayed
中来稍微修改代码。这会延迟函数的执行并生成一个 Dask 计算图。
import dask
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b)
output.append(c)
total = dask.delayed(sum)(output)
我们使用了 dask.delayed
函数来包装我们想要转换为任务的函数调用。目前为止,inc
、double
、add
或 sum
的任何调用都还没有发生。相反,对象 total
是一个 Delayed
结果,它包含了整个计算的任务图。查看该图,我们可以看到明显的并行执行机会。Dask 调度器 将利用这种并行性,通常会提高性能(尽管在此示例中不会,因为这些函数已经非常小且快速)。
total.visualize() # see image to the right
现在我们可以计算这个延迟的结果,以并行执行计算图。
>>> total.compute()
45
装饰器¶
delayed 函数也常被用作装饰器。以下是将我们原始问题重新表述为并行代码的方式:
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def double(x):
return x * 2
@dask.delayed
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = dask.delayed(sum)(output)
最佳实践¶
有关常见问题和建议的列表,请参阅 Delayed 最佳实践。
间接依赖¶
有时您可能希望为某个任务添加依赖,但该任务并不将该依赖的结果作为输入。例如,当一个任务依赖于另一个任务的副作用时。在这些情况下,您可以使用 dask.graph_manipulation.bind
。
import dask
from dask.graph_manipulation import bind
DATA = []
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add_data(x):
DATA.append(x)
@dask.delayed
def sum_data(x):
return sum(DATA) + x
a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()
sum_data
只会在两个预期项都附加到 DATA 后才会对其进行操作。bind
也可以与通过函数参数传递的直接依赖项一起使用。
执行¶
默认情况下,Dask Delayed 使用线程调度器以避免数据传输开销。如果您的代码无法很好地释放 GIL(计算主要由纯 Python 代码组成,或计算包装了外部代码并持有 GIL),您应考虑在本地机器或集群上使用多进程调度器或dask.distributed 调度器。