高级图

由 Arrays、Bags 和 DataFrames 等集合生成的 Dask 图具有高级结构,对于可视化和高级优化非常有用。这些集合生成的任务图将此结构明确编码为 HighLevelGraph 对象。本文档详细描述了如何使用这些对象。

动机与示例

通常来说,Dask 调度器期望任意任务图,其中每个节点是一个 Python 函数调用,每条边是两个函数调用之间的依赖关系。这些通常存储在扁平的字典中。以下是一些简单的 Dask DataFrame 代码及其可能生成的任务图

import dask.dataframe as dd

df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

任务图是一个字典,存储了计算最终结果所需的所有 Pandas 级别的函数调用。如果我们分离出与每个高级 Dask DataFrame 操作相关的任务,我们可以看到这个字典具有一定的结构。

{
 # From the dask.dataframe.read_csv call
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

 # From the df + 100 call
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),

 # From the df[df.name == 'Alice'] call
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

通过理解这种高级结构,我们可以更容易地理解我们的任务图(对于大型数据集,当每层有数千个任务时,这一点更重要)以及如何执行高级优化。例如,在上述情况下,我们可能希望在添加 100 之前自动重写代码以过滤数据集。

# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']

# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100

Dask 的高级图通过将任务图存储在具有层间依赖关系的层中,帮助我们显式编码这种结构。

>>> import dask.dataframe as dd

>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']

>>> graph = df.__dask_graph__()
>>> graph.layers
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}

 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

>>> graph.dependencies
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}

而 DataFrame 直接指向其依赖的输出层

>>> df.__dask_layers__()
{'filter'}

HighLevelGraphs

HighLevelGraph 对象是一个由其他子 Mapping 组成的 Mapping 对象,以及它们之间的高级依赖映射。

class HighLevelGraph(Mapping):
    layers: Dict[str, Mapping]
    dependencies: Dict[str, Set[str]]

您可以通过将两者都提供给构造函数来显式构建 HighLevelGraph

layers = {
   'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

   'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
           ('add', 1): (operator.add, ('read-csv', 1), 100),
           ('add', 2): (operator.add, ('read-csv', 2), 100),
           ('add', 3): (operator.add, ('read-csv', 3), 100)},

   'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
              ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
              ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
              ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

dependencies = {'read-csv': set(),
                'add': {'read-csv'},
                'filter': {'add'}}

graph = HighLevelGraph(layers, dependencies)

此对象满足 Mapping 接口,因此作为普通的 Python 字典运行,它是底层层的语义合并。

>>> len(graph)
12
>>> graph[('read-csv', 0)]
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),

API

class dask.highlevelgraph.HighLevelGraph(layers: collections.abc.Mapping[str, collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any]], dependencies: collections.abc.Mapping[str, set[str]], key_dependencies: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] | None = None)[source]

由依赖子图层组成的任务图

此对象编码了一个由依赖子图层组成的 Dask 任务图,这在使用 Dask 数组、bag 或 dataframe 等高级集合构建任务图时很常见。

通常,每个高级数组、bag 或 dataframe 操作都接受输入集合的任务图,将它们合并,然后为新操作添加一个或多个新的任务层。这些层通常至少包含与集合中的分区或块数量相同的任务。HighLevelGraph 对象将每个操作的子图分别存储在子图中,并存储它们之间的依赖结构。

参数
layers映射[str, 映射]

子图层,以唯一名称为键

dependencies映射[str, set[str]]

每个层依赖的层集合

key_dependencies字典[键, set], 可选

将高级图中的(部分)键映射到其依赖项。如果某个键缺失,其依赖项将即时计算。

另请参阅

HighLevelGraph.from_collections

开发者通常用于创建新的 HighLevelGraphs

示例

这是一个理想化的示例,展示了 HighLevelGraph 的内部状态

>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')  
>>> df = df + 100  
>>> df = df[df.name == 'Alice']  
>>> graph = df.__dask_graph__()  
>>> graph.layers  
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}
 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies  
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}
cull(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) dask.highlevelgraph.HighLevelGraph[source]

返回仅包含计算键所需任务的新 HighLevelGraph。

换句话说,从 dask 中移除不必要的任务。

参数

键的可迭代对象或嵌套列表,例如 __dask_keys__() 的输出

返回
hlg: HighLevelGraph

精简的高级图

cull_layers(layers: collections.abc.Iterable[str]) dask.highlevelgraph.HighLevelGraph[source]

返回一个仅包含给定层及其依赖项的新 HighLevelGraph。在内部,层不会被修改。

这是 HighLevelGraph.cull() 的一个变体,速度更快,并且在稍后合并两个精简图时,不会有同名但内容不同的层之间发生冲突的风险。

返回
hlg: HighLevelGraph

精简的高级图

classmethod from_collections(name: str, layer: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], dependencies: collections.abc.Sequence[dask.typing.DaskCollection] = ()) dask.highlevelgraph.HighLevelGraph[source]

从新层和一组集合构建 HighLevelGraph

这在常见情况下构建 HighLevelGraph,即我们有一个新的层和一组我们想要依赖的旧集合。

如果集合存在 __dask_layers__() 方法,此方法会将其提取出来,并将其添加到此新层的依赖项中。它还会将所有依赖集合的所有层合并到此图的新层中。

参数
名称str

新层的名称

Mapping

图层本身

dependenciesDask 集合列表

具有图的其他 dask 集合列表(如数组或 dataframes)

示例

在典型用法中,我们创建一个新的任务层,然后将该层以及所有依赖集合传递给此方法。

>>> def add(self, other):
...     name = 'add-' + tokenize(self, other)
...     layer = {(name, i): (add, input_key, other)
...              for i, input_key in enumerate(self.__dask_keys__())}
...     graph = HighLevelGraph.from_collections(name, layer, dependencies=[self])
...     return new_collection(name, graph)
get(k[, d]) D[k] if k in D, else d.  d defaults to None.
get_all_dependencies() dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]][source]

获取所有键的依赖项

这在大多数情况下会实例化所有层,使其成为一个昂贵的操作。

返回
map: 映射

一个将每个键映射到其依赖项的映射

get_all_external_keys() set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]][source]

获取所有层的所有输出键

这在大多数情况下_不会_实例化任何层,使其成为一个相对便宜的操作。

返回
键: set

所有外部键的集合

items() a set-like object providing a view on D's items[source]
keys() collections.abc.KeysView[source]

获取所有层的所有键。

这在许多情况下会实例化层,使其成为一个相对昂贵的操作。有关更快的替代方法,请参阅 get_all_external_keys()

to_dict() dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any][source]

高效转换为普通字典。此方法比 dict(self) 更快。

values() an object providing a view on D's values[source]