优化

在调用调度器之前对 Dask 图进行小的优化,可以在不同场景下显著提升性能。

dask.optimization 模块包含多个函数,可以以各种有用的方式转换图。在大多数情况下,用户无需直接与这些函数交互,因为这些转换的特定子集会在 Dask 集合(dask.arraydask.bagdask.dataframe)中自动完成。然而,使用自定义图或计算的用户可能会发现应用这些方法会带来显著的加速。

通常,进行图优化有两个目标

  1. 简化计算

  2. 提高并行度

可以在图层面通过移除不必要的任务(cull)来简化计算,也可以在任务层面通过用更廉价的操作替换昂贵的操作(RewriteRule)来简化计算。

可以通过减少任务间通信来提高并行度,无论是将多个任务合并为一个(fuse),还是内联廉价操作(inlineinline_functions)。

下面,我们将通过一个示例,演示如何使用其中一些方法来优化任务图。

示例

假设您有一个用于执行单词计数任务的自定义 Dask 图

>>> def print_and_return(string):
...     print(string)
...     return string

>>> def format_str(count, val, nwords):
...     return (f'word list has {count} occurrences of '
...             f'{val}, out of {nwords} words')

>>> dsk = {'words': 'apple orange apple pear orange pear pear',
...        'nwords': (len, (str.split, 'words')),
...        'val1': 'orange',
...        'val2': 'apple',
...        'val3': 'pear',
...        'count1': (str.count, 'words', 'val1'),
...        'count2': (str.count, 'words', 'val2'),
...        'count3': (str.count, 'words', 'val3'),
...        'format1': (format_str, 'count1', 'val1', 'nwords'),
...        'format2': (format_str, 'count2', 'val2', 'nwords'),
...        'format3': (format_str, 'count3', 'val3', 'nwords'),
...        'print1': (print_and_return, 'format1'),
...        'print2': (print_and_return, 'format2'),
...        'print3': (print_and_return, 'format3')}
The original non-optimized Dask task graph.

这里,我们正在词列表中计算单词 'orange'apple''pear' 的出现次数,格式化一个报告结果的输出字符串,打印输出,然后返回输出字符串。

为了执行计算,我们首先使用 cull 函数从图中移除不必要的组件,然后将 Dask 图和所需的输出键传递给调度器的 get 函数。

>>> from dask.threaded import get
>>> from dask.optimization import cull

>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

>>> results = get(dsk1, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

如上所示,调度器只计算了请求的输出('print3' 从未被计算)。这是因为我们调用了 dask.optimization.cull 函数,该函数从图中移除了不必要的任务。

Culling 是几乎所有集合的默认优化过程的一部分。通常,您希望尽早调用它,以减少后续步骤的工作量。

>>> from dask.optimization import cull
>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)
The Dask task graph after culling tasks for optimization.

查看上面的任务图,Dask 图中存在对常量(如 'val1''val2')的多次访问。可以使用 inline 函数将这些常量内联到任务中以提高效率。例如

>>> from dask.optimization import inline
>>> dsk2 = inline(dsk1, dependencies=dependencies)
>>> results = get(dsk2, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after inlining for optimization.

现在我们有两组几乎线性的任务链。它们之间的唯一联系是单词计数函数。对于这类廉价操作,序列化开销可能大于实际计算,因此多次执行计算可能比将结果传递给所有节点更快。要执行此函数内联,可以使用 inline_functions 函数。

>>> from dask.optimization import inline_functions
>>> dsk3 = inline_functions(dsk2, outputs, [len, str.split],
...                         dependencies=dependencies)
>>> results = get(dsk3, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after inlining functions for optimization.

现在我们有一组纯线性的任务。我们希望调度器在同一个工作节点上运行所有这些任务,以减少工作节点之间的数据序列化。一种选择是使用 fuse 函数将这些线性链合并为一个大任务。

>>> from dask.optimization import fuse
>>> dsk4, dependencies = fuse(dsk3)
>>> results = get(dsk4, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after fusing tasks for optimization.

将所有这些结合起来

>>> def optimize_and_get(dsk, keys):
...     dsk1, deps = cull(dsk, keys)
...     dsk2 = inline(dsk1, dependencies=deps)
...     dsk3 = inline_functions(dsk2, keys, [len, str.split],
...                             dependencies=deps)
...     dsk4, deps = fuse(dsk3)
...     return get(dsk4, keys)

>>> optimize_and_get(dsk, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

总而言之,以上操作完成了以下目标

  1. 使用 cull 移除所需的输出不需要的任务

  2. 使用 inline 内联常量

  3. 使用 inline_functions 内联廉价计算,提高并行度

  4. 使用 fuse 将线性任务合并在一起,以确保它们在同一个工作节点上运行

如前所述,这些优化已在 Dask 集合中自动执行。不使用自定义图或计算的用户很少需要直接与它们交互。

这些只是 dask.optimization 中提供的部分优化功能。更多信息请参阅下面的 API。

重写规则

对于基于上下文的优化,dask.rewrite 提供了模式匹配和术语重写的功能。这对于用等效的、更廉价的计算替换昂贵的计算很有用。例如,Dask Array 使用重写功能将一系列数组切片操作替换为更高效的单一切片。

重写系统的接口包括两个类

  1. RewriteRule(lhs, rhs, vars)

    给定左侧(lhs)、右侧(rhs)和一组变量(vars),重写规则声明性地编码以下操作

    lhs -> rhs if task matches lhs over variables

  2. RuleSet(*rules)

    一组重写规则的集合。RuleSet 类的设计允许高效的“多对一”模式匹配,这意味着使用规则集中的多个规则进行重写时开销极小。

示例

这里我们创建两个重写规则,表示以下数学转换

  1. a + a -> 2*a

  2. a * a -> a**2

其中 'a' 是一个变量

>>> from dask.rewrite import RewriteRule, RuleSet
>>> from operator import add, mul, pow

>>> variables = ('a',)

>>> rule1 = RewriteRule((add, 'a', 'a'), (mul, 'a', 2), variables)

>>> rule2 = RewriteRule((mul, 'a', 'a'), (pow, 'a', 2), variables)

>>> rs = RuleSet(rule1, rule2)

RewriteRule 对象以声明方式描述了所需的转换,而 RuleSet 构建了一个高效的自动机来应用该转换。然后可以使用 rewrite 方法进行重写。

>>> rs.rewrite((add, 5, 5))
(mul, 5, 2)

>>> rs.rewrite((mul, 5, 5))
(pow, 5, 2)

>>> rs.rewrite((mul, (add, 3, 3), (add, 3, 3)))
(pow, (mul, 3, 2), 2)

默认情况下会遍历整个任务。如果您只想将转换应用于任务的顶层,可以按所示传递 strategy='top_level'

# Transforms whole task
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]))
(sum, [(mul, 3, 2), (pow, 3, 2)])

# Only applies to top level, no transform occurs
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]), strategy='top_level')
(sum, [(add, 3, 3), (mul, 3, 3)])

重写系统为任务层面的计算转换提供了强大的抽象。同样,对于许多用户而言,直接与这些转换交互是不必要的。

关键字参数

某些优化接受可选的关键字参数。要将计算调用中的关键字传递给正确的优化,请在关键字前面加上优化的名称。例如,要从计算调用向 fuse 优化发送一个 keys= 关键字参数,请使用 fuse_keys= 关键字。

def fuse(dsk, keys=None):
    ...

x.compute(fuse_keys=['x', 'y', 'z'])

自定义优化

Dask 为每种集合类型(Array、Bag、DataFrame、Delayed)定义了默认的优化策略。然而,不同的应用程序可能有不同的需求。为了满足这种需求的多样性,您可以构建自己的自定义优化函数并使用它来代替默认函数。优化函数接受任务图和所需键列表作为输入,并返回一个新的任务图。

def my_optimize_function(dsk, keys):
    new_dsk = {...}
    return new_dsk

然后,您可以针对您喜欢的任何集合类型注册此优化类,它将取代默认方案被使用。

with dask.config.set(array_optimize=my_optimize_function):
    x, y = dask.compute(x, y)

您可以为不同的集合注册单独的优化函数,如果您不希望对特定类型的集合进行优化,也可以注册 None

with dask.config.set(array_optimize=my_optimize_function,
                     dataframe_optimize=None,
                     delayed_optimize=my_other_optimize_function):
    ...

您无需指定所有集合。集合将默认使用其标准优化方案(这通常是一个不错的选择)。

API

顶级优化

cull(dsk, keys)

返回只包含计算键所需任务的新 dask 图。

fuse(dsk[, keys, dependencies, ave_width, ...])

合并形成规约的任务;比 fuse_linear 更高级

inline(dsk[, keys, inline_constants, ...])

返回将给定键与其值内联到其中的新 dask 图。

inline_functions(dsk, output[, ...])

将廉价函数内联到更大的操作中

实用函数

functions_of(task)

嵌套任务中包含的函数集合

重写规则

RewriteRule(lhs, rhs[, vars])

一个重写规则。

RuleSet(*rules)

一组重写规则。

定义

dask.optimization.cull(dsk, keys)[source]

返回只包含计算键所需任务的新 dask 图。

换句话说,从 dask 图中移除不必要的任务。keys 可以是单个键或键列表。

返回
dsk: 剔除后的 dask 图
dependencies: {键: [依赖项]} 映射的字典。有用的副作用,可加速

其他优化,特别是 fuse。

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
>>> dsk, dependencies = cull(d, 'out')
>>> dsk                                                     
{'out': (<function add at ...>, 'x', 10), 'x': 1}
>>> dependencies                                            
{'out': ['x'], 'x': []}
dask.optimization.fuse(dsk, keys=None, dependencies=None, ave_width=Default.token, max_width=Default.token, max_height=Default.token, max_depth_new_edges=Default.token, rename_keys=Default.token)[source]

合并形成规约的任务;比 fuse_linear 更高级

这通过降低任务粒度来牺牲并行机会,换取更快的调度。它可以替换优化过程中的 fuse_linear

此优化适用于所有规约——即至多有一个依赖项的任务——因此可以视为将“多输入、单输出”的任务组合并为一个任务。有许多参数可以微调其行为,如下所述。ave_width 是比较并行性与粒度的自然参数,因此应始终指定。如有必要,其他参数的合理值将根据 ave_width 确定。

参数
dsk: dict

dask 图

keys: list or set, optional

返回的 dask 图中必须保留的键

dependencies: dict, optional

{键: [键列表]}。必须是列表以提供每个键的计数。此可选输入通常来自 cull

ave_width: float (default 1)

width = num_nodes / height 的上限,这是并行能力的一个良好衡量标准。dask.config 键:optimization.fuse.ave-width

max_width: int (default infinite)

如果总宽度大于此值,则不合并。dask.config 键:optimization.fuse.max-width

max_height: int or None (default None)

合并层数不超过此值。设置为 None 以动态调整为 1.5 + ave_width * log(ave_width + 1)。dask.config 键:optimization.fuse.max-height

max_depth_new_edges: int or None (default None)

如果在此层数后添加了新的依赖项,则不合并。设置为 None 以动态调整为 ave_width * 1.5。dask.config 键:optimization.fuse.max-depth-new-edges

rename_keys: bool or func, optional (default True)

是否使用 default_fused_keys_renamer 重命名合并后的键。重命名合并后的键可以使图更容易理解和全面,但这会带来额外的处理开销。如果为 False,则使用最顶部的键。对于高级用法,也接受一个函数来创建新名称。dask.config 键:optimization.fuse.rename-keys

返回
dsk

合并键后的输出图

dependencies

合并后依赖项映射的字典。有用的副作用,可加速其他下游优化。

dask.optimization.inline(dsk, keys=None, inline_constants=True, dependencies=None)[source]

返回将给定键与其值内联到其中的新 dask 图。

如果 inline_constants 关键字为 True,则内联所有常量。请注意,常量键将保留在图中;要移除它们,请在 inline 后跟 cull

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
>>> inline(d)       
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}
>>> inline(d, keys='y') 
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}
>>> inline(d, keys='y', inline_constants=False) 
{'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
dask.optimization.inline_functions(dsk, output, fast_functions=None, inline_constants=False, dependencies=None)[source]

将廉价函数内联到更大的操作中

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> double = lambda x: x * 2
>>> dsk = {'out': (add, 'i', 'd'),  
...        'i': (inc, 'x'),
...        'd': (double, 'y'),
...        'x': 1, 'y': 1}
>>> inline_functions(dsk, [], [inc])  
{'out': (add, (inc, 'x'), 'd'),
 'd': (double, 'y'),
 'x': 1, 'y': 1}

保护输出键。在下面的示例中,i 未被内联,因为它被标记为输出键。

>>> inline_functions(dsk, ['i', 'out'], [inc, double])  
{'out': (add, 'i', (double, 'y')),
 'i': (inc, 'x'),
 'x': 1, 'y': 1}
dask.optimization.functions_of(task)[source]

嵌套任务中包含的函数集合

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> mul = lambda x, y: x * y
>>> task = (add, (mul, 1, 2), (inc, 3))  
>>> functions_of(task)  
set([add, mul, inc])
dask.rewrite.RewriteRule(lhs, rhs, vars=())[source]

一个重写规则。

表示 lhs -> rhs,针对变量 vars

参数
lhstask

重写规则的左侧。

rhstask 或 function

重写规则的右侧。如果它是任务,则 rhs 中的变量将被主题中与 lhs 中的变量匹配的项替换。如果它是函数,则将使用包含此类匹配项的字典调用该函数。

vars: tuple, optional

lhs 中找到的变量元组。变量可以表示为任何可哈希对象;一个好的约定是使用字符串。如果没有变量,可以省略此项。

示例

这是一个 RewriteRule,用于替换所有嵌套的 list 调用,这样 (list, (list, ‘x’)) 将被替换为 (list, ‘x’),其中 ‘x’ 是一个变量。

>>> import dask.rewrite as dr
>>> lhs = (list, (list, 'x'))
>>> rhs = (list, 'x')
>>> variables = ('x',)
>>> rule = dr.RewriteRule(lhs, rhs, variables)

这是一个使用可调用右侧的更复杂的规则。可调用的 rhs 接收一个将变量映射到其匹配值的字典。如果 ‘x’ 本身是一个列表,则此规则会将所有出现的 (list, ‘x’) 替换为 ‘x’

>>> lhs = (list, 'x')
>>> def repl_list(sd):
...     x = sd['x']
...     if isinstance(x, list):
...         return x
...     else:
...         return (list, x)
>>> rule = dr.RewriteRule(lhs, repl_list, variables)
dask.rewrite.RuleSet(*rules)[source]

一组重写规则。

为一组重写规则提供了快速重写的结构。这允许同时对许多模式进行术语的语法匹配。

示例

>>> import dask.rewrite as dr
>>> def f(*args): pass
>>> def g(*args): pass
>>> def h(*args): pass
>>> from operator import add
>>> rs = dr.RuleSet(
...         dr.RewriteRule((add, 'x', 0), 'x', ('x',)),
...         dr.RewriteRule((f, (g, 'x'), 'y'),
...                        (h, 'x', 'y'),
...                        ('x', 'y')))
>>> rs.rewrite((add, 2, 0))
2
>>> rs.rewrite((f, (g, 'a', 3)))    
(<function h at ...>, 'a', 3)
>>> dsk = {'a': (add, 2, 0),
...        'b': (f, (g, 'a', 3))}
>>> from toolz import valmap
>>> valmap(rs.rewrite, dsk)         
{'a': 2, 'b': (<function h at ...>, 'a', 3)}
属性
ruleslist

RuleSet 中包含的 RewriteRule 列表。