任务图

在内部,Dask 将算法编码为任务图,通常表示为字典。这种图格式可以独立于 dask 集合使用。然而,直接使用 dask 图很少见,除非您打算使用 Dask 开发新模块。即使如此,dask.delayed 通常是更好的选择。如果您是核心开发者,那么应该从这里开始。

动机

通常,人类编写程序,然后编译器/解释器解释它们(例如,pythonjavacclang)。有时人类会不同意这些编译器/解释器选择解释和执行其程序的方式。在这些情况下,人类通常将代码的分析、优化和执行引入到代码本身中。

通常,并行执行的需求导致责任从编译器转移到人类开发者。在这些情况下,我们通常将程序结构明确表示为程序内部的数据。

用户空间并行执行的一种常见方法是任务调度。在任务调度中,我们将程序分解成许多中等大小的任务或计算单元,通常是对大量数据进行的函数调用。我们将这些任务表示为图中的节点,如果一个任务依赖于另一个任务产生的数据,则在节点之间添加边。我们调用任务调度器来执行此图,以便遵循这些数据依赖性,并在可能的情况下利用并行性,从而可以同时运行多个独立任务。


_images/map-reduce-task-scheduling.svg

任务调度有多种方法,包括简单并行、MapReduce 和完整任务调度。


存在许多解决方案。这是并行执行框架中的一种常见方法。任务调度逻辑通常隐藏在其他大型框架中(例如 Luigi、Storm、Spark、IPython Parallel 等),因此经常被重新发明。

示例

考虑以下简单的程序

def inc(i):
    return i + 1

def add(a, b):
    return a + b

x = 1
y = inc(x)
z = add(y, 10)

我们将其编码为如下字典

d = {'x': DataNode(None, 1),
     'y': Task('y', inc, TaskRef('x')),
     'z': Task('z', add, TaskRef('y'), 10)}

用以下 Dask 图表示

A simple dask dictionary

尽管不如原始代码直观,但这种表示方式可以由其他 Python 代码进行分析和执行,而不仅仅是 CPython 解释器。我们不建议用户以这种方式编写代码,而应将其作为自动化系统的合适目标。此外,在非玩具示例中,执行时间可能比 incadd 大得多,因此需要额外的复杂性。

调度器

Dask 库目前包含一些调度器来执行这些图。每个调度器的工作方式不同,提供不同的性能保证并在不同的上下文中运行。这些实现并非特殊,其他人可以轻松编写更适合其他应用程序或体系结构的调度器。发出 dask 图的系统(如 Dask Array、Dask Bag 等)可以针对应用程序和硬件利用适当的调度器。

任务预期

当一个任务提交给 Dask 执行时,会对此任务做出一些假设。

不要就地修改数据

通常,不建议使用具有就地更改未来状态的副作用的任务。就地修改存储在 Dask 中的数据可能会产生意想不到的后果。例如,考虑一个涉及 Numpy 数组的工作流

from dask.distributed import Client
import numpy as np

client = Client()
x = client.submit(np.arange, 10)  # [0, 1, 2, 3, ...]

def f(arr):
    arr[arr > 5] = 0  # modifies input directly without making a copy
    arr += 1          # modifies input directly without making a copy
    return arr

y = client.submit(f, x)

在上面的示例中,Dask 将就地更新 Numpy 数组 x 的值。尽管效率很高,但这种行为可能会产生意想不到的后果,特别是如果其他任务需要使用 x,或者如果 Dask 由于工作节点故障而需要多次重新运行此计算。

避免持有 GIL

一些包装外部 C/C++ 代码的 Python 函数可能会持有 GIL,这会阻止其他 Python 代码在后台运行。这很麻烦,因为 Dask 工作节点在运行您的函数时,还需要在后台相互通信。

如果您包装了外部代码,请尝试释放 GIL。如果您使用任何常见的代码包装解决方案(如 Cython、Numba、ctypes 或其他),这通常很容易做到。