高级图
目录
高级图¶
由 Arrays、Bags 和 DataFrames 等集合生成的 Dask 图具有高级结构,对于可视化和高级优化非常有用。这些集合生成的任务图将此结构明确编码为 HighLevelGraph
对象。本文档详细描述了如何使用这些对象。
动机与示例¶
通常来说,Dask 调度器期望任意任务图,其中每个节点是一个 Python 函数调用,每条边是两个函数调用之间的依赖关系。这些通常存储在扁平的字典中。以下是一些简单的 Dask DataFrame 代码及其可能生成的任务图
import dask.dataframe as dd
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100),
('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}
任务图是一个字典,存储了计算最终结果所需的所有 Pandas 级别的函数调用。如果我们分离出与每个高级 Dask DataFrame 操作相关的任务,我们可以看到这个字典具有一定的结构。
{
# From the dask.dataframe.read_csv call
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
# From the df + 100 call
('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100),
# From the df[df.name == 'Alice'] call
('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}
通过理解这种高级结构,我们可以更容易地理解我们的任务图(对于大型数据集,当每层有数千个任务时,这一点更重要)以及如何执行高级优化。例如,在上述情况下,我们可能希望在添加 100 之前自动重写代码以过滤数据集。
# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100
Dask 的高级图通过将任务图存储在具有层间依赖关系的层中,帮助我们显式编码这种结构。
>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']
>>> graph = df.__dask_graph__()
>>> graph.layers
{
'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100)}
'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies
{
'read-csv': set(),
'add': {'read-csv'},
'filter': {'add'}
}
而 DataFrame 直接指向其依赖的输出层
>>> df.__dask_layers__()
{'filter'}
HighLevelGraphs¶
HighLevelGraph
对象是一个由其他子 Mapping
组成的 Mapping
对象,以及它们之间的高级依赖映射。
class HighLevelGraph(Mapping):
layers: Dict[str, Mapping]
dependencies: Dict[str, Set[str]]
您可以通过将两者都提供给构造函数来显式构建 HighLevelGraph
layers = {
'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
('add', 1): (operator.add, ('read-csv', 1), 100),
('add', 2): (operator.add, ('read-csv', 2), 100),
('add', 3): (operator.add, ('read-csv', 3), 100)},
'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
dependencies = {'read-csv': set(),
'add': {'read-csv'},
'filter': {'add'}}
graph = HighLevelGraph(layers, dependencies)
此对象满足 Mapping
接口,因此作为普通的 Python 字典运行,它是底层层的语义合并。
>>> len(graph)
12
>>> graph[('read-csv', 0)]
('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
API¶
- class dask.highlevelgraph.HighLevelGraph(layers: collections.abc.Mapping[str, collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any]], dependencies: collections.abc.Mapping[str, set[str]], key_dependencies: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] | None = None)[source]¶
由依赖子图层组成的任务图
此对象编码了一个由依赖子图层组成的 Dask 任务图,这在使用 Dask 数组、bag 或 dataframe 等高级集合构建任务图时很常见。
通常,每个高级数组、bag 或 dataframe 操作都接受输入集合的任务图,将它们合并,然后为新操作添加一个或多个新的任务层。这些层通常至少包含与集合中的分区或块数量相同的任务。HighLevelGraph 对象将每个操作的子图分别存储在子图中,并存储它们之间的依赖结构。
- 参数
- layers映射[str, 映射]
子图层,以唯一名称为键
- dependencies映射[str, set[str]]
每个层依赖的层集合
- key_dependencies字典[键, set], 可选
将高级图中的(部分)键映射到其依赖项。如果某个键缺失,其依赖项将即时计算。
另请参阅
HighLevelGraph.from_collections
开发者通常用于创建新的 HighLevelGraphs
示例
这是一个理想化的示例,展示了 HighLevelGraph 的内部状态
>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv') >>> df = df + 100 >>> df = df[df.name == 'Alice']
>>> graph = df.__dask_graph__() >>> graph.layers { 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'), ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'), ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'), ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')}, 'add': {('add', 0): (operator.add, ('read-csv', 0), 100), ('add', 1): (operator.add, ('read-csv', 1), 100), ('add', 2): (operator.add, ('read-csv', 2), 100), ('add', 3): (operator.add, ('read-csv', 3), 100)} 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)), ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)), ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)), ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))} }
>>> graph.dependencies { 'read-csv': set(), 'add': {'read-csv'}, 'filter': {'add'} }
- cull(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) dask.highlevelgraph.HighLevelGraph [source]¶
返回仅包含计算键所需任务的新 HighLevelGraph。
换句话说,从 dask 中移除不必要的任务。
- 参数
- 键
键的可迭代对象或嵌套列表,例如
__dask_keys__()
的输出
- 返回
- hlg: HighLevelGraph
精简的高级图
- cull_layers(layers: collections.abc.Iterable[str]) dask.highlevelgraph.HighLevelGraph [source]¶
返回一个仅包含给定层及其依赖项的新 HighLevelGraph。在内部,层不会被修改。
这是
HighLevelGraph.cull()
的一个变体,速度更快,并且在稍后合并两个精简图时,不会有同名但内容不同的层之间发生冲突的风险。- 返回
- hlg: HighLevelGraph
精简的高级图
- classmethod from_collections(name: str, layer: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], dependencies: collections.abc.Sequence[dask.typing.DaskCollection] = ()) dask.highlevelgraph.HighLevelGraph [source]¶
从新层和一组集合构建 HighLevelGraph
这在常见情况下构建 HighLevelGraph,即我们有一个新的层和一组我们想要依赖的旧集合。
如果集合存在
__dask_layers__()
方法,此方法会将其提取出来,并将其添加到此新层的依赖项中。它还会将所有依赖集合的所有层合并到此图的新层中。- 参数
- 名称str
新层的名称
- 层Mapping
图层本身
- dependenciesDask 集合列表
具有图的其他 dask 集合列表(如数组或 dataframes)
示例
在典型用法中,我们创建一个新的任务层,然后将该层以及所有依赖集合传递给此方法。
>>> def add(self, other): ... name = 'add-' + tokenize(self, other) ... layer = {(name, i): (add, input_key, other) ... for i, input_key in enumerate(self.__dask_keys__())} ... graph = HighLevelGraph.from_collections(name, layer, dependencies=[self]) ... return new_collection(name, graph)
- get(k[, d]) D[k] if k in D, else d. d defaults to None. ¶
- get_all_dependencies() dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]] [source]¶
获取所有键的依赖项
这在大多数情况下会实例化所有层,使其成为一个昂贵的操作。
- 返回
- map: 映射
一个将每个键映射到其依赖项的映射
- get_all_external_keys() set[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] [source]¶
获取所有层的所有输出键
这在大多数情况下_不会_实例化任何层,使其成为一个相对便宜的操作。
- 返回
- 键: set
所有外部键的集合
- keys() collections.abc.KeysView [source]¶
获取所有层的所有键。
这在许多情况下会实例化层,使其成为一个相对昂贵的操作。有关更快的替代方法,请参阅
get_all_external_keys()
。