自定义图
目录
自定义图¶
有时您可能想进行并行计算,但您的应用程序无法完全适应 Dask Array 或 Dask Bag 等。在这些情况下,您可以直接与 Dask 调度器交互。这些调度器作为独立模块运行良好。
这种分离为复杂情况提供了解决方案,并允许高级项目获得额外的并行执行机会,即使这些项目对其计算有内部表示。随着 Dask 调度器的改进或扩展到分布式内存,使用 Dask 调度器编写的代码也将随之进步。
示例¶
正如在动机和规范部分讨论的那样,调度器接收一个任务图(这是一个由函数元组组成的字典)以及该图中期望的键列表。
这里是一个构建传统清理和分析流水线图的模拟示例
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.threaded import get
get(dsk, 'store') # executes in parallel
自定义 Dask 图中的关键字参数¶
有时,您可能希望将关键字参数传递给自定义 Dask 图中的函数。您可以使用 dask.utils.apply()
函数来做到这一点,如下所示
from dask.utils import apply
task = (apply, func, args, kwargs) # equivalent to func(*args, **kwargs)
dsk = {'task-name': task,
...
}
在上面的示例中
args
应该是一个元组(例如:(arg_1, arg_2, arg_3)
),并且kwargs
应该是一个字典(例如:{"kwarg_1": value, "kwarg_2": value}