优化

在调用调度器之前,通过对 Dask 图进行小的优化,可以在不同场景下显著提高性能。

dask.optimization 模块包含多个函数,可以以各种有用的方式转换图。在大多数情况下,用户无需直接与这些函数交互,因为这些转换的专用子集会在 Dask 集合(dask.arraydask.bagdask.dataframe)中自动完成。然而,处理自定义图或计算的用户可能会发现应用这些方法会带来显著的加速。

通常,图优化的目标有两个:

  1. 简化计算

  2. 提高并行性

可以通过移除不必要的任务(cull)在图级别简化计算,或者通过用更便宜的操作替换昂贵的操作(RewriteRule)在任务级别简化计算。

可以通过减少任务间通信来提高并行性,无论是将多个任务融合为一个(fuse),还是内联廉价操作(inline, inline_functions)。

下面,我们将通过一个示例来展示如何使用其中一些方法来优化任务图。

示例

假设您有一个自定义的 Dask 图用于执行词频统计任务。

>>> def print_and_return(string):
...     print(string)
...     return string

>>> def format_str(count, val, nwords):
...     return (f'word list has {count} occurrences of '
...             f'{val}, out of {nwords} words')

>>> dsk = {'words': 'apple orange apple pear orange pear pear',
...        'nwords': (len, (str.split, 'words')),
...        'val1': 'orange',
...        'val2': 'apple',
...        'val3': 'pear',
...        'count1': (str.count, 'words', 'val1'),
...        'count2': (str.count, 'words', 'val2'),
...        'count3': (str.count, 'words', 'val3'),
...        'format1': (format_str, 'count1', 'val1', 'nwords'),
...        'format2': (format_str, 'count2', 'val2', 'nwords'),
...        'format3': (format_str, 'count3', 'val3', 'nwords'),
...        'print1': (print_and_return, 'format1'),
...        'print2': (print_and_return, 'format2'),
...        'print3': (print_and_return, 'format3')}
The original non-optimized Dask task graph.

这里我们在单词列表中统计单词 'orange''apple''pear' 的出现次数,格式化一个报告结果的输出字符串,打印输出,然后返回输出字符串。

为了执行计算,我们首先使用 cull 函数从图中移除不必要的组件,然后将 Dask 图和所需的输出键传递给调度器 get 函数。

>>> from dask.threaded import get
>>> from dask.optimization import cull

>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

>>> results = get(dsk1, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

如上所示,调度器只计算了请求的输出('print3' 从未被计算)。这是因为我们调用了 dask.optimization.cull 函数,它从图中移除了不必要的任务。

剪枝(Culling)是几乎所有集合默认优化过程的一部分。通常你希望早点调用它,以减少后续步骤的工作量。

>>> from dask.optimization import cull
>>> outputs = ['print1', 'print2']
>>> dsk1, dependencies = cull(dsk, outputs)
The Dask task graph after culling tasks for optimization.

查看上面的任务图,Dask 图中存在对 'val1''val2' 等常量的多次访问。可以使用 inline 函数将这些常量内联到任务中以提高效率。例如:

>>> from dask.optimization import inline
>>> dsk2 = inline(dsk1, dependencies=dependencies)
>>> results = get(dsk2, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after inlining for optimization.

现在我们有了两组几乎线性的任务链。它们之间唯一的连接是词频统计函数。对于这类廉价操作,序列化成本可能大于实际计算成本,因此多次执行计算可能比将结果传递给所有节点更快。要执行这种函数内联,可以使用 inline_functions 函数:

>>> from dask.optimization import inline_functions
>>> dsk3 = inline_functions(dsk2, outputs, [len, str.split],
...                         dependencies=dependencies)
>>> results = get(dsk3, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after inlining functions for optimization.

现在我们有一组纯线性的任务。我们希望调度器将所有这些任务在同一个 worker 上运行,以减少 worker 之间的数据序列化。一个选项是使用 fuse 函数将这些线性链合并为一个大的任务:

>>> from dask.optimization import fuse
>>> dsk4, dependencies = fuse(dsk3)
>>> results = get(dsk4, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
The Dask task graph after fusing tasks for optimization.

将所有操作放在一起:

>>> def optimize_and_get(dsk, keys):
...     dsk1, deps = cull(dsk, keys)
...     dsk2 = inline(dsk1, dependencies=deps)
...     dsk3 = inline_functions(dsk2, keys, [len, str.split],
...                             dependencies=deps)
...     dsk4, deps = fuse(dsk3)
...     return get(dsk4, keys)

>>> optimize_and_get(dsk, outputs)
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

总而言之,上述操作完成了以下几点:

  1. 使用 cull 移除了所需输出不需要的任务

  2. 使用 inline 内联了常量

  3. 使用 inline_functions 内联了廉价计算,提高了并行性

  4. 使用 fuse 融合了线性任务,以确保它们在同一个 worker 上运行

如前所述,这些优化已在 Dask 集合中自动执行。对于不处理自定义图或计算的用户来说,通常无需直接与它们交互。

这些只是 dask.optimization 中提供的部分优化功能。有关更多信息,请参阅下面的 API。

重写规则 (Rewrite Rules)

对于基于上下文的优化,dask.rewrite 提供了模式匹配和项重写功能。这对于用等效的、更便宜的计算替换昂贵计算非常有用。例如,Dask Array 使用重写功能将一系列数组切片操作替换为更高效的单一切片操作。

重写系统的接口由两个类组成:

  1. RewriteRule(lhs, rhs, vars)

    给定一个左侧(lhs)、一个右侧(rhs)和一组变量(vars),重写规则以声明方式编码以下操作:

    lhs -> rhs 如果 任务 匹配 变量的 lhs

  2. RuleSet(*rules)

    重写规则的集合。RuleSet 类的设计允许高效的“多对一”模式匹配,这意味着使用规则集中的多个规则进行重写的开销极小。

示例

这里我们创建两个重写规则,表示以下数学转换:

  1. a + a -> 2*a

  2. a * a -> a**2

其中 'a' 是一个变量。

>>> from dask.rewrite import RewriteRule, RuleSet
>>> from operator import add, mul, pow

>>> variables = ('a',)

>>> rule1 = RewriteRule((add, 'a', 'a'), (mul, 'a', 2), variables)

>>> rule2 = RewriteRule((mul, 'a', 'a'), (pow, 'a', 2), variables)

>>> rs = RuleSet(rule1, rule2)

RewriteRule 对象以声明方式描述了所需的转换,而 RuleSet 构建了一个高效的自动机来应用该转换。然后可以使用 rewrite 方法执行重写:

>>> rs.rewrite((add, 5, 5))
(mul, 5, 2)

>>> rs.rewrite((mul, 5, 5))
(pow, 5, 2)

>>> rs.rewrite((mul, (add, 3, 3), (add, 3, 3)))
(pow, (mul, 3, 2), 2)

默认情况下会遍历整个任务。如果您只想将转换应用于任务的顶层,可以传入 strategy='top_level',如下所示:

# Transforms whole task
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]))
(sum, [(mul, 3, 2), (pow, 3, 2)])

# Only applies to top level, no transform occurs
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]), strategy='top_level')
(sum, [(add, 3, 3), (mul, 3, 3)])

重写系统提供了一个强大的抽象,用于在任务级别转换计算。同样,对于许多用户来说,直接与这些转换交互是不必要的。

关键字参数 (Keyword Arguments)

一些优化接受可选的关键字参数。要从计算调用向下传递关键字到正确的优化,请在关键字前加上优化的名称。例如,要从计算调用发送 keys= 关键字参数到 fuse 优化,请使用 fuse_keys= 关键字。

def fuse(dsk, keys=None):
    ...

x.compute(fuse_keys=['x', 'y', 'z'])

自定义优化 (Customizing Optimization)

Dask 为每种集合类型(Array、Bag、DataFrame、Delayed)定义了一个默认的优化策略。然而,不同的应用程序可能有不同的需求。为了解决需求的这种可变性,您可以构造自己的自定义优化函数并使用它代替默认函数。优化函数接受一个任务图和所需的键列表,并返回一个新的任务图。

def my_optimize_function(dsk, keys):
    new_dsk = {...}
    return new_dsk

然后您可以将此优化类注册到您喜欢的任何集合类型,它将代替默认方案使用。

with dask.config.set(array_optimize=my_optimize_function):
    x, y = dask.compute(x, y)

您可以为不同的集合注册单独的优化函数,或者如果您不希望对特定类型的集合进行优化,则可以注册 None

with dask.config.set(array_optimize=my_optimize_function,
                     dataframe_optimize=None,
                     delayed_optimize=my_other_optimize_function):
    ...

您无需指定所有集合。未指定的集合将使用其标准优化方案(通常是一个不错的选择)。

API

顶级优化

cull(dsk, keys)

返回只包含计算所需键的任务的新 dask。

fuse(dsk[, keys, dependencies, ave_width, ...])

融合形成约简的任务;比 fuse_linear 更高级

inline(dsk[, keys, inline_constants, ...])

返回一个新 dask,其中给定的键及其值已内联。

inline_functions(dsk, output[, ...])

将廉价函数内联到较大的操作中

实用函数

functions_of(task)

嵌套任务中包含的函数集合

重写规则 (Rewrite Rules)

RewriteRule(lhs, rhs[, vars])

一个重写规则。

RuleSet(*rules)

一组重写规则。

定义

dask.optimization.cull(dsk, keys)[source]

返回只包含计算所需键的任务的新 dask。

换句话说,从 dask 中移除不必要的任务。keys 可以是单个键或键列表。

返回值
dsk: 剪枝后的 dask 图
dependencies: 字典,映射 {key: [deps]}。有用的副作用,可以加速

其他优化,尤其是 fuse。

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
>>> dsk, dependencies = cull(d, 'out')
>>> dsk                                                     
{'out': (<function add at ...>, 'x', 10), 'x': 1}
>>> dependencies                                            
{'out': ['x'], 'x': []}
dask.optimization.fuse(dsk, keys=None, dependencies=None, ave_width=Default.token, max_width=Default.token, max_height=Default.token, max_depth_new_edges=Default.token, rename_keys=Default.token)[source]

融合形成约简的任务;比 fuse_linear 更高级

通过使任务粒度更粗,以牺牲并行机会为代价换取更快的调度。它可以在优化过程中取代 fuse_linear

此优化适用于所有约简——即最多有一个依赖项的任务——因此可以视为将“多输入,单输出”的任务组融合成一个任务。有许多参数可以微调行为,下文将进行描述。ave_width 是比较并行性与粒度的自然参数,因此应始终指定。其他参数的合理值将在必要时使用 ave_width 确定。

参数
dsk: dict

dask 图

keys: list 或 set, 可选

必须保留在返回的 dask 图中的键

dependencies: dict, 可选

{key: [键列表]}。必须是列表以提供每个键的计数。此可选输入通常来自 cull

ave_width: float (默认 1)

width = num_nodes / height 的上限,这是并行性的一个良好衡量标准。dask.config 键:optimization.fuse.ave-width

max_width: int (默认无限)

如果总宽度大于此值,则不进行融合。dask.config 键:optimization.fuse.max-width

max_height: int 或 None (默认 None)

融合级别不超过此数量。设置为 None 以根据 1.5 + ave_width * log(ave_width + 1) 动态调整。dask.config 键:optimization.fuse.max-height

max_depth_new_edges: int 或 None (默认 None)

如果在达到此级别后添加了新的依赖项,则不进行融合。设置为 None 以根据 ave_width * 1.5 动态调整。dask.config 键:optimization.fuse.max-depth-new-edges

rename_keys: bool 或 func, 可选 (默认 True)

是否使用 default_fused_keys_renamer 重命名融合后的键。重命名融合后的键可以使图更易理解和全面,但这会带来额外的处理成本。如果为 False,则将使用最顶层的键。对于高级用法,也接受用于创建新名称的函数。dask.config 键:optimization.fuse.rename-keys

返回值
dsk

键已融合的输出图

dependencies

融合后的依赖关系字典。有用的副作用,可以加速下游其他优化。

dask.optimization.inline(dsk, keys=None, inline_constants=True, dependencies=None)[source]

返回一个新 dask,其中给定的键及其值已内联。

如果 inline_constants 关键字为 True,则内联所有常量。请注意,常量键将保留在图中,要移除它们,请在 inline 后使用 cull

示例

>>> def inc(x):
...     return x + 1
>>> def add(x, y):
...     return x + y
>>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
>>> inline(d)       
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, 'y')}
>>> inline(d, keys='y') 
{'x': 1, 'y': (<function inc at ...>, 1), 'z': (<function add at ...>, 1, (<function inc at ...>, 1))}
>>> inline(d, keys='y', inline_constants=False) 
{'x': 1, 'y': (<function inc at ...>, 'x'), 'z': (<function add at ...>, 'x', (<function inc at ...>, 'x'))}
dask.optimization.inline_functions(dsk, output, fast_functions=None, inline_constants=False, dependencies=None)[source]

将廉价函数内联到较大的操作中

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> double = lambda x: x * 2
>>> dsk = {'out': (add, 'i', 'd'),  
...        'i': (inc, 'x'),
...        'd': (double, 'y'),
...        'x': 1, 'y': 1}
>>> inline_functions(dsk, [], [inc])  
{'out': (add, (inc, 'x'), 'd'),
 'd': (double, 'y'),
 'x': 1, 'y': 1}

保护输出键。在下面的例子中,i 没有被内联,因为它被标记为输出键。

>>> inline_functions(dsk, ['i', 'out'], [inc, double])  
{'out': (add, 'i', (double, 'y')),
 'i': (inc, 'x'),
 'x': 1, 'y': 1}
dask.optimization.functions_of(task)[source]

嵌套任务中包含的函数集合

示例

>>> inc = lambda x: x + 1
>>> add = lambda x, y: x + y
>>> mul = lambda x, y: x * y
>>> task = (add, (mul, 1, 2), (inc, 3))  
>>> functions_of(task)  
set([add, mul, inc])
dask.rewrite.RewriteRule(lhs, rhs, vars=())[source]

一个重写规则。

表示 lhs -> rhs,用于变量 vars

参数
lhstask

重写规则的左侧。

rhstask 或 function

重写规则的右侧。如果是一个任务,rhs 中的变量将被主题中与 lhs 中的变量匹配的项替换。如果是一个函数,将使用此类匹配的字典调用该函数。

vars: tuple, 可选

位于 lhs 中的变量的元组。变量可以表示为任何可哈希对象;一个好的约定是使用字符串。如果没有变量,可以省略。

示例

这是一个 RewriteRule,用于替换所有嵌套的 list 调用,以便将 (list, (list, ‘x’)) 替换为 (list, ‘x’),其中 ‘x’ 是一个变量。

>>> import dask.rewrite as dr
>>> lhs = (list, (list, 'x'))
>>> rhs = (list, 'x')
>>> variables = ('x',)
>>> rule = dr.RewriteRule(lhs, rhs, variables)

这是一个更复杂的规则,使用可调用右侧。可调用 rhs 接受一个字典,将变量映射到其匹配值。如果 ‘x’ 本身是一个列表,则此规则将 (list, ‘x’) 的所有出现替换为 ‘x’

>>> lhs = (list, 'x')
>>> def repl_list(sd):
...     x = sd['x']
...     if isinstance(x, list):
...         return x
...     else:
...         return (list, x)
>>> rule = dr.RewriteRule(lhs, repl_list, variables)
dask.rewrite.RuleSet(*rules)[source]

一组重写规则。

为一组重写规则构建一个快速重写结构。这允许同时对许多模式进行项的句法匹配。

示例

>>> import dask.rewrite as dr
>>> def f(*args): pass
>>> def g(*args): pass
>>> def h(*args): pass
>>> from operator import add
>>> rs = dr.RuleSet(
...         dr.RewriteRule((add, 'x', 0), 'x', ('x',)),
...         dr.RewriteRule((f, (g, 'x'), 'y'),
...                        (h, 'x', 'y'),
...                        ('x', 'y')))
>>> rs.rewrite((add, 2, 0))
2
>>> rs.rewrite((f, (g, 'a', 3)))    
(<function h at ...>, 'a', 3)
>>> dsk = {'a': (add, 2, 0),
...        'b': (f, (g, 'a', 3))}
>>> from toolz import valmap
>>> valmap(rs.rewrite, dsk)         
{'a': 2, 'b': (<function h at ...>, 'a', 3)}
属性
ruleslist

包含在 RuleSet 中的 RewriteRule 列表。