高级图操作
目录
高级图操作¶
在某些情况下,使用 Dask 集合进行计算会导致内存使用效率低下(例如,整个 Dask DataFrame 被加载到内存中)。这可能发生在 Dask 的调度器没有自动延迟任务图中节点的计算,以避免它们的输出长时间占用内存,或者在重新计算节点的成本远低于将其输出保存在内存中的场景中。
本页重点介绍了一组可用于避免这些情况的图操作工具。特别是,下面描述的工具会重写 Dask 集合的底层 Dask 图,生成具有不同键集的等效集合。
考虑以下示例
>>> import dask.array as da
>>> x = da.random.default_rng().normal(size=500_000_000, chunks=100_000)
>>> x_mean = x.mean()
>>> y = (x - x_mean).max().compute()
上面的示例计算了移除偏差后的分布的最大值。这涉及到将 x
的数据块加载到内存中以计算 x_mean
。然而,由于在后续计算中需要 x
数组来计算 y
,整个 x
数组被保留在内存中。对于大型 Dask 数组来说,这可能会带来很大问题。
为了缓解整个 x
数组需要保存在内存中的问题,可以将最后一行重写如下
>>> from dask.graph_manipulation import bind
>>> xb = bind(x, x_mean)
>>> y = (xb - x_mean).max().compute()
这里我们使用 bind()
创建了一个新的 Dask 数组 xb
,它产生的输出与 x
完全相同,但其底层 Dask 图具有与 x
不同的键,并且只会在 x_mean
计算完成后才会被计算。
这使得 x
的数据块被计算后立即由 mean
进行单独归约;然后重新计算并再次立即通过流水线进行减法操作,接着由 max
进行归约。这导致峰值内存使用量大大减少,因为完整的 x
数组不再被加载到内存中。然而,代价是计算时间增加,因为 x
被计算了两次。
API¶
|
构建一个 Dask Delayed 对象,该对象会等待输入集合的所有数据块计算完成后再返回 None。 |
|
确保所有输入集合的所有数据块都已计算完成后,再计算任何数据块的依赖项。 |
|
使 |
|
克隆 dask 集合,返回通过独立计算生成的等效集合。 |
定义¶
- dask.graph_manipulation.checkpoint(*collections, split_every: Optional[Union[float, Literal[False]]] = None) dask.delayed.Delayed [source]¶
构建一个 Dask Delayed 对象,该对象会等待输入集合的所有数据块计算完成后再返回 None。
- 参数
- collections
零个或多个 Dask 集合或包含零个或多个集合的嵌套数据结构
- split_every: int >= 2 或 False,可选
确定递归聚合的深度。如果大于输入键的数量,聚合将分多步执行;聚合图的深度将是 \(log_{split_every}(input keys)\)。设置为较小的值可以减少缓存大小和网络传输,但代价是占用更多 CPU 并生成更大的 dask 图。
设置为 False 禁用。默认为 8。
- 返回
- 产生 None 的 Dask Delayed 对象
- dask.graph_manipulation.wait_on(*collections, split_every: Optional[Union[float, Literal[False]]] = None)[source]¶
确保所有输入集合的所有数据块都已计算完成后,再计算任何数据块的依赖项。
以下示例创建了一个 dask 数组
u
,该数组在计算中使用时,只会在数组x
的所有数据块都计算完成后才继续执行,否则其行为与x
相同。>>> import dask.array as da >>> x = da.ones(10, chunks=5) >>> u = wait_on(x)
以下示例将创建两个数组
u
和v
,它们在计算中使用时,只会在数组x
和y
的所有数据块都计算完成后才继续执行,否则其行为与x
和y
相同。>>> x = da.ones(10, chunks=5) >>> y = da.zeros(10, chunks=5) >>> u, v = wait_on(x, y)
- 参数
- collections
零个或多个 Dask 集合或 Dask 集合的嵌套结构
- split_every
参见
checkpoint()
- 返回
- 与
collections
相同 与输入类型相同的 Dask 集合,计算结果相同,或与输入等效的嵌套结构,其中原始集合已被替换。新集合中重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。
- 与
- dask.graph_manipulation.bind(children: dask.graph_manipulation.T, parents, *, omit=None, seed: collections.abc.Hashable | None = None, assume_layers: bool = True, split_every: Optional[Union[float, Literal[False]]] = None) dask.graph_manipulation.T [source]¶
使
children
集合(可选地省略子集合)依赖于parents
集合。以下是两个示例。第一个示例创建了一个数组
b2
,其计算过程是先完整计算数组a
,然后完整计算b
,并在计算b
的过程中重新计算a
。>>> import dask >>> import dask.array as da >>> a = da.ones(4, chunks=2) >>> b = a + 1 >>> b2 = bind(b, a) >>> len(b2.dask) 9 >>> b2.compute() array([2., 2., 2., 2.])
第二个示例创建了数组
b3
和c3
,它们的计算过程是先计算数组a
,然后计算加法,这次在过程中不再重新计算a
。>>> c = a + 2 >>> b3, c3 = bind((b, c), a, omit=a) >>> len(b3.dask), len(c3.dask) (7, 7) >>> dask.compute(b3, c3) (array([2., 2., 2., 2.]), array([3., 3., 3., 3.]))
- 参数
- children
Dask 集合或 Dask 集合的嵌套结构
- parents
Dask 集合或 Dask 集合的嵌套结构
- omit
Dask 集合或 Dask 集合的嵌套结构
- seed
用于种子化键重新生成的 Hashable 对象。省略时默认为一个随机数,每次调用都会生成不同的键。
- assume_layers
- True
使用在层级工作的快速算法,这假设
children
和omit
中的所有集合都使用
HighLevelGraph
,定义了
__dask_layers__()
方法,并且在
omit
集合和children
集合创建之间从未被压扁和重建其图;换句话说,如果在children
集合的键中可以找到omit
集合的键,那么对于层来说也必须如此。
- False
使用在键级工作的较慢算法,这不作上述任何假设。
- split_every
参见
checkpoint()
- 返回
- 与
children
相同 与
children
等效的 Dask 集合或 Dask 集合结构,计算结果相同。children
的所有节点都将被重新生成,直到(但不包括)omit
的节点。紧邻omit
上方的节点,或者如果未找到omit
中的集合则为叶节点,在parents
中的所有集合完全计算完成之前,将无法计算。重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。
- 与
- dask.graph_manipulation.clone(*collections, omit=None, seed: collections.abc.Hashable = None, assume_layers: bool = True)[source]¶
克隆 dask 集合,返回通过独立计算生成的等效集合。
- 参数
- 返回
- 与
collections
相同 与输入类型相同的 Dask 集合,计算结果相同,或与输入等效的嵌套结构,其中原始集合已被替换。新集合中重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。
- 与
示例
(为简洁起见,标记已简化)
>>> import dask.array as da >>> x_i = da.asarray([1, 1, 1, 1], chunks=2) >>> y_i = x_i + 1 >>> z_i = y_i + 2 >>> dict(z_i.dask) {('array-1', 0): array([1, 1]), ('array-1', 1): array([1, 1]), ('add-2', 0): (<function operator.add>, ('array-1', 0), 1), ('add-2', 1): (<function operator.add>, ('array-1', 1), 1), ('add-3', 0): (<function operator.add>, ('add-2', 0), 1), ('add-3', 1): (<function operator.add>, ('add-2', 1), 1)} >>> w_i = clone(z_i, omit=x_i) >>> w_i.compute() array([4, 4, 4, 4]) >>> dict(w_i.dask) {('array-1', 0): array([1, 1]), ('array-1', 1): array([1, 1]), ('add-4', 0): (<function operator.add>, ('array-1', 0), 1), ('add-4', 1): (<function operator.add>, ('array-1', 1), 1), ('add-5', 0): (<function operator.add>, ('add-4', 0), 1), ('add-5', 1): (<function operator.add>, ('add-4', 1), 1)}
clone()
的典型使用模式如下>>> x = cheap_computation_with_large_output() >>> y = expensive_and_long_computation(x) >>> z = wrap_up(clone(x), y)
在上面的代码中,一旦 x 的数据块被 y 的数据块消费,它们就会被遗忘,然后在计算的最后重新生成。如果没有
clone()
,x 只会被计算一次,然后保留在内存中,贯穿整个 y 的计算过程,不必要地消耗内存。