规范

Dask 是一种规范,用于编码图——特别是具有数据依赖关系的任务的 有向无环图——使用普通的 Python 数据结构,即字典、元组、函数和任意 Python 值。

定义

一个 Dask 图 是一个将 映射到 计算 的字典

{'x': (x := DataNode(None, 1)),
'y': (y := DataNode(None, 2)),
'z': (z := Task("z", add, x.ref(), y.ref())),
'w': (w := Task("w", sum, List(x.ref(), y.ref(), z.ref()))),
'v': List(Task(None, sum, List(w.ref(), z.ref())), 2)}
{'x': 1,
'y': 2,
'z': (add, 'y', 'x'),
'w': (sum, ['x', 'y', 'z']),
'v': [(sum, ['w', 'z']), 2]}

一个 是 str、int、float 或其元组

'x'
('x', 2, 3)

一个 任务 是 Dask 可以执行的计算,使用 dask.Task 类表示。任务是原子工作单元,旨在由单个工作进程运行。示例

def add(x, y):
    return x + y

t = Task("t", add, 1, 2)
assert t() == 3

t2 = Task("t2", add, t.ref(), 2)
assert t2({"t": 3}) == 5

注意

任务的旧表示形式是一个元组,其第一个元素是一个可调用函数。其余元素是该函数的参数。旧表示形式已被弃用,并将在 Dask 的未来版本中移除。请改用 dask.Task 类。

一个 计算 可以是以下之一

  1. 一个 Alias 指向 Dask 图中的另一个键,例如 Alias('new', 'x')

  2. 一个字面值,作为 DataNode,例如 DataNode(None, 1)

  3. 一个 Task,例如 Task("t", add, 1, 2)

  4. 一个 计算List,例如 List(1, TaskRef('x'), Task(None, inc, TaskRef("x"))]

因此,以下所有内容都是有效的 计算

DataNode(None, np.array([...]))
Task("t", add, 1, 2)
Task("t", add, TaskRef('x'), 2)
Task("t", add, Task(None, inc, TaskRef('x')), 2)
Task("t", sum, [1, 2])
Task("t", sum, [TaskRef('x'), Task(None, inc, TaskRef('x'))])
Task("t", np.dot, np.array([...]), np.array([...]))
Task("t", sum, List(TaskRef('x'), TaskRef('y')), 'z')
np.array([...])
(add, 1, 2)
(add, 'x', 2)
(add, (inc, 'x'), 2)
(sum, [1, 2])
(sum, ['x', (inc, 'x')])
(np.dot, np.array([...]), np.array([...]))
[(sum, ['x', 'y']), 'z']

函数应期望什么

Task("t", add, TaskRef('x'), 2) 这样的情况下,像 add 这样的函数接收具体的值而不是键。Dask 调度程序在调用 add 函数 *之前*,会用计算出的值(例如 1)替换任务引用(例如 x)。这些引用可以通过使用 TaskRef 提供字面键引用,或者如果任务的引用可用,则通过在该任务本身上调用 ref() 来提供。

入口点 - get 函数

get 函数作为所有 调度程序 的计算入口点。该函数获取与给定键关联的值。该键可以指代存储的数据,例如 'x' 的情况,或者指代一个任务,例如 'z' 的情况。在后一种情况下,get 应该执行所有必要的计算来检索计算出的值。

>>> from dask.threaded import get

>>> from operator import add

>>> dsk = {'x': (x := DataNode(None, 1)),
...       'y': (y := DataNode(None, 2)),
...       'z': (z := Task("z", add, x.ref(), y.ref())),
...       'w': (w := Task("w", sum, List(x.ref(), y.ref(), z.ref())))}
>>> get(dsk, 'x')
1

>>> get(dsk, 'z')
3

>>> get(dsk, 'w')
6

此外,如果给定一个 list,get 应该同时获取多个键的值

>>> get(dsk, ['x', 'y', 'z'])
[1, 2, 3]

因为我们接受键的列表作为键,所以我们支持嵌套列表

>>> get(dsk, [['x', 'y'], ['z', 'w']])
[[1, 2], [3, 6]]

在内部,get 可以任意复杂,可以调用分布式计算,使用缓存等等。

为什么不是元组?

元组客观上比 Task 类更紧凑,那为什么我们要引入这种新的表示形式呢?

作为元组,任务不是自描述的,并且严重依赖上下文。(func, "x", "y") 这样的元组的含义取决于它嵌入的图。字面值 xy 可以是要传递给函数的实际字面值,也可以是引用其他任务的引用。因此,对此任务的 _解释_ 必须递归遍历元组,并将遇到的每个元素与图中的已知键进行比较。特别是对于大型图或深度嵌套的元组参数,这可能会成为性能瓶颈。对于允许用户自定义键名的 API,这还可能导致误报,即预期的字面值被预先计算的任务结果替换。

目前,两种表示形式都受支持。当 Dask 遇到旧风格任务时,它们将自动转换为新风格任务。鼓励新项目和算法使用新风格任务。