API

API

dask.delayed 接口包含一个函数,即 delayed

  • delayed 包装函数

    包装函数。可以作为装饰器使用,或直接用于函数调用(例如 delayed(foo)(a, b, c))。用 delayed 包装的函数的输出是 Delayed 类型的代理对象,其中包含所有用于获取此结果的操作图。

  • delayed 包装对象

    包装对象。用于直接创建 Delayed 代理。

Delayed 对象可以视为代表 dask 任务图中的一个键。一个 Delayed 支持 大多数 Python 操作,每个操作都会创建另一个代表结果的 Delayed 对象

  • 大多数运算符(*, - 等)

  • 元素访问和切片(a[0]

  • 属性访问(a.size

  • 方法调用(a.index(0)

不支持的操作包括

  • 变动运算符(a += 1

  • 变动魔术方法,例如 __setitem__/__setattr__a[0] = 1, a.foo = 1

  • 迭代。(for i in a: ...

  • 用作谓词(if a: ...

特别需要注意后两点,它们意味着 Delayed 对象不能用于控制流,即 Delayed 不能出现在循环或 if 语句中。换句话说,你不能迭代一个 Delayed 对象,也不能将其用作 if 语句条件的一部分,但 Delayed 对象可以用在循环或 if 语句的主体中(即上面的示例是没问题的,但如果 data 是一个 Delayed 对象就不行了)。即使有此限制,许多工作流仍可轻松并行化。

delayed([obj, name, pure, nout, traverse])

包装一个函数或对象以生成一个 Delayed

Delayed(key, dsk[, length, layer])

表示将由 dask 计算的值。

dask.delayed.delayed(obj='__no__default__', name=None, pure=None, nout=None, traverse=True)[source]

包装一个函数或对象以生成一个 Delayed

Delayed 对象充当其包装对象的代理,但所有操作都是通过内部构建 dask 图来延迟执行的。

参数
objobject

要包装的函数或对象

nameDask key, 可选

包装对象在底层图中的键。默认为哈希内容。请注意,这仅影响通过此 delayed 调用包装的对象名称,而 影响延迟函数调用的输出 - 对于后者,请使用下文描述的 dask_key_name=

注意

因为此 name 用作任务图中的键,您应确保它唯一标识 obj。如果您想提供一个仍然唯一的描述性名称,请将描述性名称与 dask.base.tokenize()(用于 array_like)结合使用。更多信息请参见 任务图

purebool, 可选

指示调用结果 Delayed 对象是否是纯操作。如果为 True,则对调用的参数进行哈希以生成确定性键。如果未提供,默认行为是检查全局 delayed_pure 设置,如果未设置则回退到 False

noutint, 可选

调用结果 Delayed 对象返回的输出数量。如果提供,则调用的 Delayed 输出可以迭代为 nout 个对象,从而允许解包结果。默认情况下,对 Delayed 对象的迭代将出错。请注意,nout=1 要求 obj 返回长度为 1 的元组,因此 nout=0 时,obj 应返回一个空元组。

traversebool, 可选

默认情况下,dask 遍历内置的 Python 集合以查找传递给 delayed 的 dask 对象。对于大型集合,这可能会很耗时。如果 obj 不包含任何 dask 对象,请设置 traverse=False 以避免进行此遍历。

示例

应用于函数以延迟执行

>>> from dask import delayed
>>> def inc(x):
...     return x + 1
>>> inc(10)
11
>>> x = delayed(inc, pure=True)(10)
>>> type(x) == Delayed
True
>>> x.compute()
11

可用作装饰器

>>> @delayed(pure=True)
... def add(a, b):
...     return a + b
>>> add(1, 2).compute()
3

delayed 也接受一个可选关键字参数 pure。如果为 False,则后续调用将始终生成不同的 Delayed。这对于非纯函数(例如 timerandom)非常有用。

>>> from random import random
>>> out1 = delayed(random, pure=False)()
>>> out2 = delayed(random, pure=False)()
>>> out1.key == out2.key
False

如果您知道一个函数是纯函数(输出仅依赖于输入,没有全局状态),则可以设置 pure=True。这将尝试对输出应用一致的名称,但如果失败,则会回退到与 pure=False 相同的行为。

>>> @delayed(pure=True)
... def add(a, b):
...     return a + b
>>> out1 = add(1, 2)
>>> out2 = add(1, 2)
>>> out1.key == out2.key
True

除了将 pure 设置为可调用对象的一个属性之外,您还可以使用 delayed_pure 设置来在上下文中进行设置。请注意,这会影响 调用 而不是可调用对象的 创建

>>> @delayed
... def mul(a, b):
...     return a * b
>>> import dask
>>> with dask.config.set(delayed_pure=True):
...     print(mul(1, 2).key == mul(1, 2).key)
True
>>> with dask.config.set(delayed_pure=False):
...     print(mul(1, 2).key == mul(1, 2).key)
False

默认情况下,调用延迟对象的결果 키名由参数哈希确定。要显式设置名称,可以在调用函数时使用 dask_key_name 关键字参数

>>> add(1, 2)   
Delayed('add-3dce7c56edd1ac2614add714086e950f')
>>> add(1, 2, dask_key_name='three')
Delayed('three')

请注意,具有相同键名称的对象被假定为具有相同的结果。如果您显式设置名称,应确保不同结果的键名称不同。

>>> add(1, 2, dask_key_name='three')
Delayed('three')
>>> add(2, 1, dask_key_name='three')
Delayed('three')
>>> add(2, 2, dask_key_name='four')
Delayed('four')

delayed 也可以应用于对象,使其上的操作变为延迟执行

>>> a = delayed([1, 2, 3])
>>> isinstance(a, Delayed)
True
>>> a.compute()
[1, 2, 3]

如果 pure=True,延迟对象的键名称默认会被哈希,如果 pure=False(默认),则随机生成。要显式设置名称,可以使用 name 关键字参数。为确保键是唯一的,应包含 tokenized 的值,或以其他方式确保其唯一性

>>> from dask.base import tokenize
>>> data = [1, 2, 3]
>>> a = delayed(data, name='mylist-' + tokenize(data))
>>> a  
Delayed('mylist-55af65871cb378a4fa6de1660c3e8fb7')

延迟结果充当底层对象的代理。许多运算符都受到支持

>>> (a + [1, 2]).compute()
[1, 2, 3, 1, 2]
>>> a[1].compute()
2

方法和属性访问也有效

>>> a.count(2).compute()
1

请注意,如果方法不存在,直到运行时才会抛出错误

>>> res = a.not_a_real_method() 
>>> res.compute()  
AttributeError("'list' object has no attribute 'not_a_real_method'")

“魔术”方法(例如运算符和属性访问)被假定为纯方法,这意味着后续调用必须返回相同的结果。这种行为无法通过 delayed 调用来覆盖,但可以使用如下所述的其他方式进行修改。

要调用非纯属性或运算符,需要在 pure=False 的延迟函数中使用它

>>> class Incrementer:
...     def __init__(self):
...         self._n = 0
...     @property
...     def n(self):
...         self._n += 1
...         return self._n
...
>>> x = delayed(Incrementer())
>>> x.n.key == x.n.key
True
>>> get_n = delayed(lambda x: x.n, pure=False)
>>> get_n(x).key == get_n(x).key
False

相比之下,方法默认被假定为非纯方法,这意味着后续调用可能会返回不同的结果。要假定纯度,请设置 pure=True。这允许共享任何中间值。

>>> a.count(2, pure=True).key == a.count(2, pure=True).key
True

与函数调用一样,方法调用也遵循全局 delayed_pure 设置并支持 dask_key_name 关键字参数

>>> a.count(2, dask_key_name="count_2")
Delayed('count_2')
>>> import dask
>>> with dask.config.set(delayed_pure=True):
...     print(a.count(2).key == a.count(2).key)
True
class dask.delayed.Delayed(key, dsk, length=None, layer=None)[source]

表示将由 dask 计算的值。

等同于 dask 图中单个键的输出。