最佳实践

Dask delayed 入门很容易,但要使用得则需要一些经验。本页提供了一些最佳实践建议,并包含常见问题的解决方案。

对函数调用 delayed,而不是对结果调用

Dask delayed 用于函数,例如 dask.delayed(f)(x, y),而不是用于其结果,例如 dask.delayed(f(x, y))。当你使用后者时,Python 会在 Dask 有机会介入之前先计算 f(x, y)

错误示例

正确示例

# This executes immediately

dask.delayed(f(x, y))
# This makes a delayed function, acting lazily

dask.delayed(f)(x, y)

一次性计算大量内容

为了提高并行性,你需要在每次 compute 调用中包含大量计算。理想情况下,你应该进行多次 dask.delayed 调用来定义计算,然后只在最后调用 dask.compute。在计算过程中调用 dask.compute 也是可以的,但 Dask 会在那里停止并计算出结果,然后才能继续执行你的代码。

错误示例

正确示例

# Avoid calling compute repeatedly

results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y.compute())

results
# Collect many calls for one compute

results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y)

results = dask.compute(*results)

在循环内调用 y.compute() 会等待每次计算的结果,从而抑制并行性。

不要修改输入

你的函数不应该直接改变输入。

错误示例

正确示例

# Mutate inputs in functions

@dask.delayed
def f(x):
    x += 1
    return x
# Return new values or copies

@dask.delayed
def f(x):
    x = x + 1
    return x

如果你需要使用可变操作,则先在函数内部进行复制

@dask.delayed
def f(x):
    x = copy(x)
    x += 1
    return x

避免全局状态

理想情况下,你的操作不应依赖于全局状态。如果你只使用线程,使用全局状态可能有效,但当你转向多进程或分布式计算时,很可能会遇到令人困惑的错误。

错误示例

L = []

# This references global variable L

@dask.delayed
def f(x):
    L.append(x)

不要依赖副作用

Delayed 函数只有在被计算时才会执行。你总是需要将输出传递给最终会调用 compute 的东西。

错误示例

正确示例

# Forget to call compute

dask.delayed(f)(1, 2, 3)

...
# Ensure delayed tasks are computed

x = dask.delayed(f)(1, 2, 3)
...
dask.compute(x, ...)

在这里的第一个例子中,什么都没发生,因为从未调用 compute()

将计算分解成许多部分

从 Dask 的角度来看,每个 dask.delayed 函数调用都是一个单一操作。你通过进行多次 delayed 调用来实现并行性,而不是只使用一个:Dask 不会查看用 @dask.delayed 装饰的函数内部并自动并行化其代码。要实现并行化,它需要你帮助找到合适的分解计算的位置。

错误示例

正确示例

# One giant task


def load(filename):
    ...


def process(data):
    ...


def save(data):
    ...

@dask.delayed
def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)
        results.append(result)

    return results

dask.compute(f(filenames))
# Break up into many tasks

@dask.delayed
def load(filename):
    ...

@dask.delayed
def process(data):
    ...

@dask.delayed
def save(data):
    ...


def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)
        results.append(result)

    return results

dask.compute(f(filenames))

第一个版本只有一个 delayed 任务,因此无法并行化。

避免任务过多

每个 delayed 任务都有几百微秒的开销。通常这没问题,但如果你对 dask.delayed 应用得过于精细,就可能成为问题。在这种情况下,最好将你的许多任务分解成批次,或使用某个 Dask 集合来帮助你。

错误示例

正确示例

# Too many tasks

results = []
for x in range(10000000):
    y = dask.delayed(f)(x)
    results.append(y)
# Use collections

import dask.bag as db
b = db.from_sequence(range(10000000), npartitions=1000)
b = b.map(f)
...

这里我们使用 dask.bag 自动对我们的函数应用进行批处理。我们也可以自己构建批处理,如下所示

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

 batches = []
 for i in range(0, 10000000, 10000):
     result_batch = dask.delayed(batch)(range(i, i + 10000))
     batches.append(result_batch)

这里我们构建的批次中,每个 delayed 函数调用都针对原始输入的许多数据点进行计算。

避免在 delayed 函数中调用 delayed

通常,如果你刚开始使用 Dask delayed,可能会到处放置 dask.delayed 调用,并期望得到最好的结果。虽然这可能确实有效,但通常速度较慢且会导致难以理解的解决方案。

通常你绝不会在 dask.delayed 函数内部调用 dask.delayed

错误示例

正确示例

# Delayed function calls delayed

@dask.delayed
def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result
# Normal function calls delayed


def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result

因为普通的函数只做 delayed 工作,所以速度非常快,没有理由再对其进行 delayed。

不要对其他 Dask 集合调用 dask.delayed

当你将 Dask 数组或 Dask DataFrame 放入 delayed 调用中时,该函数将接收到相应的 NumPy 或 Pandas 等价物。请注意,如果你的数组很大,这可能会导致你的工作节点崩溃。

相反,更常用的是使用 da.map_blocks 等方法

错误示例

正确示例

# Call delayed functions on Dask collections

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

dask.delayed(train)(df)
# Use mapping methods if applicable

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

df.map_partitions(train)

另外,如果过程不适合映射,你总是可以将数组或 dataframes 转换成许多 delayed 对象,例如

partitions = df.to_delayed()
delayed_values = [dask.delayed(train)(part)
                  for part in partitions]

然而,如果你不介意将 Dask 数组/DataFrame 变成单个分块,那么这样做也可以。

dask.delayed(train)(..., y=df.sum())

避免重复将大型输入放入 delayed 调用中

每次你传递具体结果(任何不是 delayed 的东西)时,Dask 默认会对其进行哈希处理以命名。这通常很快(大约 500 MB/s),但如果重复多次执行,就会变慢。因此,最好也将你的数据进行延迟处理。

这在使用分布式集群时尤为重要,可以避免为每个函数调用单独发送数据。

错误示例

正确示例

x = np.array(...)  # some large array

results = [dask.delayed(train)(x, i)
           for i in range(1000)]
x = np.array(...)    # some large array
x = dask.delayed(x)  # delay the data once
results = [dask.delayed(train)(x, i)
           for i in range(1000)]

每次调用 dask.delayed(train)(x, ...) 都必须对 NumPy 数组 x 进行哈希处理,这会降低速度。