高级图

Dask 集合(如 Arrays、Bags 和 DataFrames)生成的图具有高级结构,这对于可视化和高级优化很有用。这些集合生成的任务图将这种结构显式编码为 HighLevelGraph 对象。本文档将更详细地介绍如何使用它们。

动机与示例

一般来说,Dask 调度器期望任意任务图,其中每个节点是一个 Python 函数调用,每条边是两个函数调用之间的依赖关系。这些通常存储在扁平的字典中。这里是一些简单的 Dask DataFrame 代码及其可能生成的任务图:

import dask.dataframe as dd

df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

任务图是一个字典,存储计算最终结果所需的每个 Pandas 级别的函数调用。如果我们分离出与每个高级 Dask DataFrame 操作相关的任务,就可以看到这个字典有一定的结构:

{
 # From the dask.dataframe.read_csv call
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

 # From the df + 100 call
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),

 # From the df[df.name == 'Alice'] call
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

通过理解这种高级结构,我们可以更容易地理解我们的任务图(对于每层有数千个任务的大型数据集来说,这一点更为重要),以及如何执行高级优化。例如,在上面的例子中,我们可能希望在加上 100 之前自动重写代码来过滤数据集。

# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']

# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100

Dask 的高级图帮助我们通过将任务图存储在具有层间依赖关系的层中来显式编码这种结构:

>>> import dask.dataframe as dd

>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']

>>> graph = df.__dask_graph__()
>>> graph.layers
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}

 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

>>> graph.dependencies
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}

而 DataFrame 则直接指向它所依赖的输出层:

>>> df.__dask_layers__()
{'filter'}

HighLevelGraphs

The HighLevelGraph object is a Mapping object composed of other sub-Mappings, along with a high-level dependency mapping between them

class HighLevelGraph(Mapping):
    layers: Dict[str, Mapping]
    dependencies: Dict[str, Set[str]]

You can construct a HighLevelGraph explicitly by providing both to the constructor

layers = {
   'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

   'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
           ('add', 1): (operator.add, ('read-csv', 1), 100),
           ('add', 2): (operator.add, ('read-csv', 2), 100),
           ('add', 3): (operator.add, ('read-csv', 3), 100)},

   'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
              ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
              ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
              ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

dependencies = {'read-csv': set(),
                'add': {'read-csv'},
                'filter': {'add'}}

graph = HighLevelGraph(layers, dependencies)

该对象满足 Mapping 接口,因此可以像普通的 Python 字典一样操作,它是底层层的语义合并。

>>> len(graph)
12
>>> graph[('read-csv', 0)]
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),

API

class dask.highlevelgraph.HighLevelGraph(layers: collections.abc.Mapping[str, collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any]], dependencies: collections.abc.Mapping[str, set[str]], key_dependencies: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] | None = None)[source]

由依赖子图层组成的任务图

该对象编码了一个由依赖子图层组成的 Dask 任务图,这种情况在使用 Dask array、bag 或 dataframe 等高级集合构建任务图时很常见。

通常,每个高级 array、bag 或 dataframe 操作都会获取输入集合的任务图,将它们合并,然后为新操作添加一个或多个新的任务层。这些层的任务数量通常至少与集合中的分区或块数量相同。HighLevelGraph 对象将每个操作的子图分别存储在子图中,并且还存储它们之间的依赖结构。

参数
layersMapping[str, Mapping]

子图层,以唯一名称作为键

dependenciesMapping[str, set[str]]

每个层所依赖的层集合

key_dependenciesdict[Key, set], optional

高级图中的(某些)键到其依赖关系的映射。如果某个键丢失,其依赖关系将动态计算。

另请参阅

HighLevelGraph.from_collections

通常由开发人员用于创建新的 HighLevelGraphs

示例

这是一个理想化的示例,展示了 HighLevelGraph 的内部状态:

>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')  
>>> df = df + 100  
>>> df = df[df.name == 'Alice']  
>>> graph = df.__dask_graph__()  
>>> graph.layers  
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)}
 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies  
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}
cull(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) dask.highlevelgraph.HighLevelGraph[source]

返回只包含计算给定键所需任务的新 HighLevelGraph。

换句话说,从 dask 中移除不必要的任务。

参数
keys

keys 的可迭代对象或嵌套列表,例如 __dask_keys__() 的输出

返回值
hlg: HighLevelGraph

剪枝后的高级图

cull_layers(layers: collections.abc.Iterable[str]]) dask.highlevelgraph.HighLevelGraph[source]

返回一个只包含给定层及其依赖关系的新 HighLevelGraph。内部,层不会被修改。

这是 HighLevelGraph.cull() 的一个变体,速度快得多,并且在稍后合并两个剪枝后的图时,不会冒着创建两个同名但内容不同的层之间冲突的风险。

返回值
hlg: HighLevelGraph

剪枝后的高级图

classmethod from_collections(name: str, layer: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], dependencies: collections.abc.Sequence[dask.typing.DaskCollection] = ()) dask.highlevelgraph.HighLevelGraph[source]

从新层和集合集构造 HighLevelGraph

这在常见情况下构造 HighLevelGraph,即我们有一个新的单层和一组我们想要依赖的旧集合。

如果集合存在 __dask_layers__() 方法,则会将其提取出来,并将其添加到新层的依赖项中。它还会将所有依赖集合中的所有层合并到此图的新层中。

参数
namestr

新层的名称

layerMapping

图层本身

dependenciesList of Dask collections

其他具有自身图的 dask 集合(如数组或 dataframe)的列表

示例

在典型用法中,我们创建一个新的任务层,然后将该层以及所有依赖集合传递给此方法。

>>> def add(self, other):
...     name = 'add-' + tokenize(self, other)
...     layer = {(name, i): (add, input_key, other)
...              for i, input_key in enumerate(self.__dask_keys__())}
...     graph = HighLevelGraph.from_collections(name, layer, dependencies=[self])
...     return new_collection(name, graph)
get(k[, d]) D[k] if k in D, else d.  d defaults to None.
get_all_dependencies() dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]][source]

获取所有键的依赖关系

在大多数情况下,这将实例化所有层,使其成为一个昂贵的操作。

返回值
map: Mapping

将每个键映射到其依赖关系的映射

get_all_external_keys() set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]][source]

获取所有层的所有输出键

在大多数情况下,这不会实例化任何层,使其成为一个相对便宜的操作。

返回值
keys: set

所有外部键的集合

items() a set-like object providing a view on D's items[source]
keys() collections.abc.KeysView[source]

获取所有层的所有键。

在许多情况下,这将实例化层,使其成为一个相对昂贵的操作。请参阅 get_all_external_keys() 以获得更快的替代方法。

to_dict() dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any][source]

有效地转换为普通字典。此方法比 dict(self) 更快。

values() an object providing a view on D's values[source]