最佳实践
目录
最佳实践¶
Dask delayed 入门很容易,但要用好则需要一些经验。本页面包含最佳实践建议,并提供了常见问题的解决方案。
对函数调用 delayed,而不是对结果¶
Dask delayed 操作的是函数,例如 dask.delayed(f)(x, y)
,而不是函数的结果,例如 dask.delayed(f(x, y))
。如果采用后一种写法,Python 会在 Dask 有机会介入之前先计算 f(x, y)
。
错误做法 |
正确做法 |
# This executes immediately
dask.delayed(f(x, y))
|
# This makes a delayed function, acting lazily
dask.delayed(f)(x, y)
|
一次计算大量任务¶
为了提高并行性,您希望在每次计算调用中包含大量计算。理想情况下,您应该进行多次 dask.delayed
调用来定义计算,然后在最后才调用 dask.compute
。当然,在计算过程中调用 dask.compute
也是可以的,但代码将在那里停止,因为 Dask 会先计算这些结果,然后您的代码才会继续执行。
错误做法 |
正确做法 |
# Avoid calling compute repeatedly
results = []
for x in L:
y = dask.delayed(f)(x)
results.append(y.compute())
results
|
# Collect many calls for one compute
results = []
for x in L:
y = dask.delayed(f)(x)
results.append(y)
results = dask.compute(*results)
|
在循环内调用 y.compute() 会在每次迭代时等待计算结果,从而阻碍并行性。
不要修改输入¶
您的函数不应该直接改变输入。
错误做法 |
正确做法 |
# Mutate inputs in functions
@dask.delayed
def f(x):
x += 1
return x
|
# Return new values or copies
@dask.delayed
def f(x):
x = x + 1
return x
|
如果您需要使用可变操作,请先在函数内部创建一个副本
@dask.delayed
def f(x):
x = copy(x)
x += 1
return x
避免使用全局状态¶
理想情况下,您的操作不应该依赖于全局状态。如果您只使用线程,使用全局状态 可能 会起作用,但当您转向多进程或分布式计算时,可能会遇到令人困惑的错误。
错误做法 |
L = []
# This references global variable L
@dask.delayed
def f(x):
L.append(x)
|
不要依赖副作用¶
Delayed 函数只有在被计算时才会执行操作。您始终需要将输出传递给最终会调用 compute 的对象。
错误做法 |
正确做法 |
# Forget to call compute
dask.delayed(f)(1, 2, 3)
...
|
# Ensure delayed tasks are computed
x = dask.delayed(f)(1, 2, 3)
...
dask.compute(x, ...)
|
在第一个例子中,什么都不会发生,因为从未调用 compute()
。
将计算分解成许多小块¶
从 Dask 的角度来看,每一个 dask.delayed
函数调用都是一个单一操作。您通过进行多次 delayed 调用来实现并行性,而不是只使用一次:Dask 不会查看用 @dask.delayed
装饰的函数内部并对其代码进行并行化。要实现这一点,Dask 需要您的帮助来找到分解计算的好位置。
错误做法 |
正确做法 |
# One giant task
def load(filename):
...
def process(data):
...
def save(data):
...
@dask.delayed
def f(filenames):
results = []
for filename in filenames:
data = load(filename)
data = process(data)
result = save(data)
results.append(result)
return results
dask.compute(f(filenames))
|
# Break up into many tasks
@dask.delayed
def load(filename):
...
@dask.delayed
def process(data):
...
@dask.delayed
def save(data):
...
def f(filenames):
results = []
for filename in filenames:
data = load(filename)
data = process(data)
result = save(data)
results.append(result)
return results
dask.compute(f(filenames))
|
第一个版本只有一个 delayed 任务,因此无法实现并行化。
避免任务过多¶
每个 delayed 任务都有几百微秒的开销。通常这没问题,但如果您将 dask.delayed
应用得过于精细,就可能成为问题。在这种情况下,通常最好将您的许多任务分解成批次,或者使用 Dask 的集合之一来帮助您。
错误做法 |
正确做法 |
# Too many tasks
results = []
for x in range(10000000):
y = dask.delayed(f)(x)
results.append(y)
|
# Use collections
import dask.bag as db
b = db.from_sequence(range(10000000), npartitions=1000)
b = b.map(f)
...
|
这里我们使用 dask.bag
自动批量应用我们的函数。我们也可以按如下方式构建自己的批处理:
def batch(seq):
sub_results = []
for x in seq:
sub_results.append(f(x))
return sub_results
batches = []
for i in range(0, 10000000, 10000):
result_batch = dask.delayed(batch)(range(i, i + 10000))
batches.append(result_batch)
在这里,我们构建批次,其中每个 delayed 函数调用计算来自原始输入的许多数据点。
避免在 delayed 函数内部调用 delayed¶
通常,如果您刚开始使用 Dask delayed,您可能会到处放置 dask.delayed
调用,并期望得到最好的结果。虽然这可能实际可行,但通常速度较慢且导致难以理解的解决方案。
通常,您绝不会在 dask.delayed
函数内部调用 dask.delayed
。
错误做法 |
正确做法 |
# Delayed function calls delayed
@dask.delayed
def process_all(L):
result = []
for x in L:
y = dask.delayed(f)(x)
result.append(y)
return result
|
# Normal function calls delayed
def process_all(L):
result = []
for x in L:
y = dask.delayed(f)(x)
result.append(y)
return result
|
因为普通函数本身只执行延迟工作,执行速度很快,所以没有理由再对其进行延迟处理。
不要对其他 Dask 集合调用 dask.delayed¶
当您将 Dask 数组或 Dask DataFrame 放入 delayed 调用时,该函数将接收到对应的 NumPy 或 Pandas 对象。请注意,如果您的数组很大,这可能会导致 worker 崩溃。
相反,更常见的是使用诸如 da.map_blocks
之类的方法
错误做法 |
正确做法 |
# Call delayed functions on Dask collections
import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')
dask.delayed(train)(df)
|
# Use mapping methods if applicable
import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')
df.map_partitions(train)
|
或者,如果过程不适合映射操作,您总是可以将数组或 dataframes 转换为 许多 delayed 对象,例如:
partitions = df.to_delayed()
delayed_values = [dask.delayed(train)(part)
for part in partitions]
但是,如果您不介意将 Dask 数组/DataFrame 变成单个块,那么这样做是可以的。
dask.delayed(train)(..., y=df.sum())
避免重复将大型输入放入 delayed 调用¶
每次您传递一个具体结果(任何不是 delayed 的对象),Dask 默认会对其进行哈希处理以给它一个名称。这相当快(大约 500 MB/s),但如果您反复这样做,速度可能会很慢。相反,最好也对您的数据进行 delayed 处理。
在使用分布式集群时,这一点尤为重要,可以避免在每次函数调用时单独发送数据。
错误做法 |
正确做法 |
x = np.array(...) # some large array
results = [dask.delayed(train)(x, i)
for i in range(1000)]
|
x = np.array(...) # some large array
x = dask.delayed(x) # delay the data once
results = [dask.delayed(train)(x, i)
for i in range(1000)]
|
每次调用 dask.delayed(train)(x, ...)
都必须对 NumPy 数组 x
进行哈希处理,这会降低速度。