规范

Dask 是一种规范,用于使用普通的 Python 数据结构(即字典、元组、函数和任意 Python 值)编码图——特别是具有数据依赖关系的任务有向无环图。

定义

一个 Dask 图 是一个将 映射到 计算 的字典

{'x': (x := DataNode(None, 1)),
'y': (y := DataNode(None, 2)),
'z': (z := Task("z", add, x.ref(), y.ref())),
'w': (w := Task("w", sum, List(x.ref(), y.ref(), z.ref()))),
'v': List(Task(None, sum, List(w.ref(), z.ref())), 2)}
{'x': 1,
'y': 2,
'z': (add, 'y', 'x'),
'w': (sum, ['x', 'y', 'z']),
'v': [(sum, ['w', 'z']), 2]}

一个 是一个 str、int、float 或它们的元组

'x'
('x', 2, 3)

一个 任务 是 Dask 可以执行的计算,并使用 dask.Task 类来表示。任务表示应由单个工作进程运行的原子工作单元。示例

def add(x, y):
    return x + y

t = Task("t", add, 1, 2)
assert t() == 3

t2 = Task("t2", add, t.ref(), 2)
assert t2({"t": 3}) == 5

注意

任务的遗留表示是一个元组,其第一个元素是可调用函数。其余元素是该函数的参数。遗留表示已被弃用,并将在 Dask 的未来版本中移除。请改用 dask.Task 类。

一个 计算 可以是以下之一

  1. 一个指向 Dask 图中另一个键的 Alias,例如 Alias('new', 'x')

  2. 一个字面值作为 DataNode,例如 DataNode(None, 1)

  3. 一个 Task,例如 Task("t", add, 1, 2)

  4. 一个包含 计算List,例如 List(1, TaskRef('x'), Task(None, inc, TaskRef("x"))]

因此,以下所有内容都是有效的 计算

DataNode(None, np.array([...]))
Task("t", add, 1, 2)
Task("t", add, TaskRef('x'), 2)
Task("t", add, Task(None, inc, TaskRef('x')), 2)
Task("t", sum, [1, 2])
Task("t", sum, [TaskRef('x'), Task(None, inc, TaskRef('x'))])
Task("t", np.dot, np.array([...]), np.array([...]))
Task("t", sum, List(TaskRef('x'), TaskRef('y')), 'z')
np.array([...])
(add, 1, 2)
(add, 'x', 2)
(add, (inc, 'x'), 2)
(sum, [1, 2])
(sum, ['x', (inc, 'x')])
(np.dot, np.array([...]), np.array([...]))
[(sum, ['x', 'y']), 'z']

函数应该期望什么

在诸如 Task("t", add, TaskRef('x'), 2) 的情况下,像 add 这样的函数接收的是具体的值而不是键。Dask 调度器在调用 add 函数 之前,会用计算出的值(例如 1)替换任务引用(例如 x)。这些引用可以作为字面键引用使用 TaskRef 提供,或者如果任务的引用可用,则可以通过在任务本身上调用 ref() 来提供。

入口点 - get 函数

get 函数是所有调度器的计算入口点。此函数获取与给定键关联的值。该键可能引用存储的数据,例如 'x' 的情况;或引用任务,例如 'z' 的情况。在后一种情况下,get 应该执行所有必要的计算以检索计算出的值。

>>> from dask.threaded import get

>>> from operator import add

>>> dsk = {'x': (x := DataNode(None, 1)),
...       'y': (y := DataNode(None, 2)),
...       'z': (z := Task("z", add, x.ref(), y.ref())),
...       'w': (w := Task("w", sum, List(x.ref(), y.ref(), z.ref())))}
>>> get(dsk, 'x')
1

>>> get(dsk, 'z')
3

>>> get(dsk, 'w')
6

此外,如果给定一个 list,get 应该同时获取多个键的值

>>> get(dsk, ['x', 'y', 'z'])
[1, 2, 3]

由于我们接受键列表作为键,因此我们支持嵌套列表

>>> get(dsk, [['x', 'y'], ['z', 'w']])
[[1, 2], [3, 6]]

在内部,get 可以任意复杂,可以调用分布式计算、使用缓存等等。

为什么不是元组?

元组客观上比 Task 类更紧凑,那么我们为什么选择引入这种新的表示方式呢?

作为元组,任务不是自描述的,并且高度依赖于上下文。诸如 (func, "x", "y") 之类的元组的含义取决于其嵌入的图。字面值 xy 可以是要传递给函数的实际字面值,也可以是对其他任务的引用。因此,对此任务的 _解释_ 必须递归遍历元组,并将遇到的每个元素与图中的已知键进行比较。特别是对于大型图或深度嵌套的元组参数,这可能成为性能瓶颈。对于允许用户自定义键名的 API,这还可能导致误报,即将预期的字面值替换为预先计算的任务结果。

目前,两种表示方式都受到支持。遗留风格的任务在 Dask 遇到时会自动转换为新风格的任务。鼓励新的项目和算法使用新风格的任务。