目录

高级图操作

目录

高级图操作

在某些情况下,使用 Dask 集合进行计算会导致内存使用效率低下(例如,整个 Dask DataFrame 被加载到内存中)。这可能发生在 Dask 的调度器没有自动延迟任务图中节点的计算,以避免它们的输出长时间占用内存,或者在重新计算节点的成本远低于将其输出保存在内存中的场景中。

本页重点介绍了一组可用于避免这些情况的图操作工具。特别是,下面描述的工具会重写 Dask 集合的底层 Dask 图,生成具有不同键集的等效集合。

考虑以下示例

>>> import dask.array as da
>>> x = da.random.default_rng().normal(size=500_000_000, chunks=100_000)
>>> x_mean = x.mean()
>>> y = (x - x_mean).max().compute()

上面的示例计算了移除偏差后的分布的最大值。这涉及到将 x 的数据块加载到内存中以计算 x_mean。然而,由于在后续计算中需要 x 数组来计算 y,整个 x 数组被保留在内存中。对于大型 Dask 数组来说,这可能会带来很大问题。

为了缓解整个 x 数组需要保存在内存中的问题,可以将最后一行重写如下

>>> from dask.graph_manipulation import bind
>>> xb = bind(x, x_mean)
>>> y = (xb - x_mean).max().compute()

这里我们使用 bind() 创建了一个新的 Dask 数组 xb,它产生的输出与 x 完全相同,但其底层 Dask 图具有与 x 不同的键,并且只会在 x_mean 计算完成后才会被计算。

这使得 x 的数据块被计算后立即由 mean 进行单独归约;然后重新计算并再次立即通过流水线进行减法操作,接着由 max 进行归约。这导致峰值内存使用量大大减少,因为完整的 x 数组不再被加载到内存中。然而,代价是计算时间增加,因为 x 被计算了两次。

API

checkpoint(*collections[, split_every])

构建一个 Dask Delayed 对象,该对象会等待输入集合的所有数据块计算完成后再返回 None。

wait_on(*collections[, split_every])

确保所有输入集合的所有数据块都已计算完成后,再计算任何数据块的依赖项。

bind(children, parents, *[, omit, seed, ...])

使 children 集合(可选地省略子集合)依赖于 parents 集合。

clone(*collections[, omit, seed, assume_layers])

克隆 dask 集合,返回通过独立计算生成的等效集合。

定义

dask.graph_manipulation.checkpoint(*collections, split_every: Optional[Union[float, Literal[False]]] = None) dask.delayed.Delayed[source]

构建一个 Dask Delayed 对象,该对象会等待输入集合的所有数据块计算完成后再返回 None。

参数
collections

零个或多个 Dask 集合或包含零个或多个集合的嵌套数据结构

split_every: int >= 2 或 False,可选

确定递归聚合的深度。如果大于输入键的数量,聚合将分多步执行;聚合图的深度将是 \(log_{split_every}(input keys)\)。设置为较小的值可以减少缓存大小和网络传输,但代价是占用更多 CPU 并生成更大的 dask 图。

设置为 False 禁用。默认为 8。

返回
产生 None 的 Dask Delayed 对象
dask.graph_manipulation.wait_on(*collections, split_every: Optional[Union[float, Literal[False]]] = None)[source]

确保所有输入集合的所有数据块都已计算完成后,再计算任何数据块的依赖项。

以下示例创建了一个 dask 数组 u,该数组在计算中使用时,只会在数组 x 的所有数据块都计算完成后才继续执行,否则其行为与 x 相同。

>>> import dask.array as da
>>> x = da.ones(10, chunks=5)
>>> u = wait_on(x)

以下示例将创建两个数组 uv,它们在计算中使用时,只会在数组 xy 的所有数据块都计算完成后才继续执行,否则其行为与 xy 相同。

>>> x = da.ones(10, chunks=5)
>>> y = da.zeros(10, chunks=5)
>>> u, v = wait_on(x, y)
参数
collections

零个或多个 Dask 集合或 Dask 集合的嵌套结构

split_every

参见 checkpoint()

返回
collections 相同

与输入类型相同的 Dask 集合,计算结果相同,或与输入等效的嵌套结构,其中原始集合已被替换。新集合中重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。

dask.graph_manipulation.bind(children: dask.graph_manipulation.T, parents, *, omit=None, seed: collections.abc.Hashable | None = None, assume_layers: bool = True, split_every: Optional[Union[float, Literal[False]]] = None) dask.graph_manipulation.T[source]

使 children 集合(可选地省略子集合)依赖于 parents 集合。以下是两个示例。

第一个示例创建了一个数组 b2,其计算过程是先完整计算数组 a,然后完整计算 b,并在计算 b 的过程中重新计算 a

>>> import dask
>>> import dask.array as da
>>> a = da.ones(4, chunks=2)
>>> b = a + 1
>>> b2 = bind(b, a)
>>> len(b2.dask)
9
>>> b2.compute()
array([2., 2., 2., 2.])

第二个示例创建了数组 b3c3,它们的计算过程是先计算数组 a,然后计算加法,这次在过程中不再重新计算 a

>>> c = a + 2
>>> b3, c3 = bind((b, c), a, omit=a)
>>> len(b3.dask), len(c3.dask)
(7, 7)
>>> dask.compute(b3, c3)
(array([2., 2., 2., 2.]), array([3., 3., 3., 3.]))
参数
children

Dask 集合或 Dask 集合的嵌套结构

parents

Dask 集合或 Dask 集合的嵌套结构

omit

Dask 集合或 Dask 集合的嵌套结构

seed

用于种子化键重新生成的 Hashable 对象。省略时默认为一个随机数,每次调用都会生成不同的键。

assume_layers
True

使用在层级工作的快速算法,这假设 childrenomit 中的所有集合都

  1. 使用 HighLevelGraph

  2. 定义了 __dask_layers__() 方法,并且

  3. omit 集合和 children 集合创建之间从未被压扁和重建其图;换句话说,如果在 children 集合的键中可以找到 omit 集合的键,那么对于层来说也必须如此。

False

使用在键级工作的较慢算法,这不作上述任何假设。

split_every

参见 checkpoint()

返回
children 相同

children 等效的 Dask 集合或 Dask 集合结构,计算结果相同。 children 的所有节点都将被重新生成,直到(但不包括)omit 的节点。紧邻 omit 上方的节点,或者如果未找到 omit 中的集合则为叶节点,在 parents 中的所有集合完全计算完成之前,将无法计算。重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。

dask.graph_manipulation.clone(*collections, omit=None, seed: collections.abc.Hashable = None, assume_layers: bool = True)[source]

克隆 dask 集合,返回通过独立计算生成的等效集合。

参数
collections

零个或多个 Dask 集合或 Dask 集合的嵌套结构

omit

不会被克隆的 Dask 集合或 Dask 集合的嵌套结构

seed

参见 bind()

assume_layers

参见 bind()

返回
collections 相同

与输入类型相同的 Dask 集合,计算结果相同,或与输入等效的嵌套结构,其中原始集合已被替换。新集合中重新生成的节点的键将与原始节点的键不同,以便它们可以在同一个图中使用。

示例

(为简洁起见,标记已简化)

>>> import dask.array as da
>>> x_i = da.asarray([1, 1, 1, 1], chunks=2)
>>> y_i = x_i + 1
>>> z_i = y_i + 2
>>> dict(z_i.dask)  
{('array-1', 0): array([1, 1]),
 ('array-1', 1): array([1, 1]),
 ('add-2', 0): (<function operator.add>, ('array-1', 0), 1),
 ('add-2', 1): (<function operator.add>, ('array-1', 1), 1),
 ('add-3', 0): (<function operator.add>, ('add-2', 0), 1),
 ('add-3', 1): (<function operator.add>, ('add-2', 1), 1)}
>>> w_i = clone(z_i, omit=x_i)
>>> w_i.compute()
array([4, 4, 4, 4])
>>> dict(w_i.dask)  
{('array-1', 0): array([1, 1]),
 ('array-1', 1): array([1, 1]),
 ('add-4', 0): (<function operator.add>, ('array-1', 0), 1),
 ('add-4', 1): (<function operator.add>, ('array-1', 1), 1),
 ('add-5', 0): (<function operator.add>, ('add-4', 0), 1),
 ('add-5', 1): (<function operator.add>, ('add-4', 1), 1)}

clone() 的典型使用模式如下

>>> x = cheap_computation_with_large_output()  
>>> y = expensive_and_long_computation(x)  
>>> z = wrap_up(clone(x), y)  

在上面的代码中,一旦 x 的数据块被 y 的数据块消费,它们就会被遗忘,然后在计算的最后重新生成。如果没有 clone(),x 只会被计算一次,然后保留在内存中,贯穿整个 y 的计算过程,不必要地消耗内存。