Dask Delayed
目录
Dask Delayed¶
有时问题不适合 dask.array
或 dask.dataframe
这样的集合类型。在这种情况下,用户可以使用更简单的 dask.delayed
接口来并行化自定义算法。这允许您通过对普通 Python 代码进行轻微的注释,直接创建任务图。
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
示例¶
访问 https://examples.dask.org.cn/delayed.html 查看并运行使用 Dask Delayed 的示例。
有时我们面临可以并行化的问题,但它们不适合 Dask Array 或 Dask DataFrame 这样的高级抽象。考虑以下示例:
def inc(x):
return x + 1
def double(x):
return x * 2
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = sum(output)
这个问题显然存在并行性(许多 inc
, double
和 add
函数可以独立评估),但尚不清楚如何将其转换为数组或 DataFrame 计算。按照目前的写法,这段代码在单个线程中顺序运行。然而,我们看到其中很多部分都可以并行执行。
Dask 的 delayed
函数装饰你的函数,使它们
|
包装一个函数或对象,以生成一个 |
我们通过将函数包装在 delayed
中来稍微修改代码。这会延迟函数的执行,并生成一个 Dask 任务图
import dask
output = []
for x in data:
a = dask.delayed(inc)(x)
b = dask.delayed(double)(x)
c = dask.delayed(add)(a, b)
output.append(c)
total = dask.delayed(sum)(output)
我们使用 dask.delayed
函数来包装我们想要转换为任务的函数调用。此时,inc
、double
、add
或 sum
的调用都尚未发生。相反,对象 total
是一个 Delayed
结果,它包含整个计算的任务图。查看该图,我们看到了明显的并行执行机会。Dask 调度器 将利用这种并行性,通常会提高性能(尽管在这个例子中不会,因为这些函数本身已经非常小且快速)。
total.visualize() # see image to the right
现在我们可以计算这个延迟结果,以并行执行任务图
>>> total.compute()
45
装饰器¶
将 delayed 函数用作装饰器也很常见。以下是将我们的原始问题重写为并行代码的示例:
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def double(x):
return x * 2
@dask.delayed
def add(x, y):
return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
a = inc(x)
b = double(x)
c = add(a, b)
output.append(c)
total = dask.delayed(sum)(output)
最佳实践¶
常见问题和建议列表,请参阅 Delayed 最佳实践。
间接依赖¶
有时您可能希望向任务添加一个依赖项,但该任务并不直接将该依赖项的结果作为输入。例如,当一个任务依赖于另一个任务的副作用时。在这种情况下,您可以使用 dask.graph_manipulation.bind
。
import dask
from dask.graph_manipulation import bind
DATA = []
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add_data(x):
DATA.append(x)
@dask.delayed
def sum_data(x):
return sum(DATA) + x
a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)
f.compute()
sum_data
只有在预期项目都已添加到 DATA 后才会对其进行操作。bind
也可以与通过函数参数传递的直接依赖项一起使用。
执行¶
默认情况下,Dask Delayed 使用线程调度器以避免数据传输成本。如果您的代码未能很好地释放 GIL(计算主要由纯 Python 代码组成,或计算封装了外部代码且持有 GIL),您应该考虑在本地机器或集群上使用多进程调度器或dask.distributed 调度器。