重叠计算

有些数组操作需要相邻分块之间进行边界通信。示例操作包括以下内容:

  • 在图像上应用滤波器(卷积)

  • 滑动求和/均值/最大值等

  • 搜索可能跨越分块边界的图像特征,例如高斯斑点

  • 计算偏导数

  • 生命游戏

Dask Array 通过创建一个新数组来支持这些操作,新数组中的每个分块都会因相邻分块的边界而略微扩展。这会产生额外的复制成本和许多小块的通信,但允许局部函数以易于并行的方式进行计算。

这些计算的主要 API 是下面定义的 map_overlap 方法

map_overlap(func, *args[, depth, boundary, ...])

在带有重叠部分的数组分块上映射函数

dask.array.map_overlap(func, *args, depth=None, boundary=None, trim=True, align_arrays=True, allow_rechunk=True, **kwargs)[source]

在带有重叠部分的数组分块上映射函数

我们在数组分块之间共享相邻区域,然后应用一个函数,并修剪掉相邻条带。如果沿特定轴的深度大于任何分块,则数组会被重新分块。

请注意,此函数在计算之前会尝试自动确定输出数组类型,如果您预期该函数在处理 0 维数组时不会成功,请参考 map_blocks 中的 meta 关键字参数。

参数
func: 函数

应用于每个扩展分块的函数。如果提供了多个数组,则函数应按相同顺序接收每个数组的分块。

argsdask 数组
depth: int, tuple, dict 或 list,仅限关键字参数

每个分块应与其邻居共享的元素数量。如果是 tuple 或 dict,则可以为每个轴设置不同的值。如果是 list,则该列表的每个元素必须是 int、tuple 或 dict,用于定义 args 中相应数组的深度。不对称深度可以使用 (-/+) tuple 的 dict 值指定。请注意,当前仅当 boundary 为 'none' 时才支持不对称深度。默认值为 0。

boundary: str, tuple, dict 或 list,仅限关键字参数

如何处理边界。值包括 'reflect'(反射)、'periodic'(周期性)、'nearest'(最近)、'none'(无),或任何常量值,如 0 或 np.nan。如果是 list,则每个元素必须是 str、tuple 或 dict,用于定义 args 中相应数组的边界。默认值为 'reflect'。

trim: bool,仅限关键字参数

在调用映射函数后是否从每个分块修剪掉 depth 元素。如果您的映射函数已为您完成此操作,请将其设置为 False。

align_arrays: bool,仅限关键字参数

当提供多个数组时,是否沿大小相等的维度对齐分块。这允许将某些数组中较大的分块分解成与其它数组分块大小匹配的较小分块,以便它们兼容分块函数映射。如果此参数为 false,则如果数组在每个维度上没有相同数量的分块,将抛出错误。

allow_rechunk: bool,仅限关键字参数

允许重新分块,否则分块大小需要匹配,并且核心维度只能包含一个分块。

**kwargs

map_blocks 中有效的其它关键字参数

示例

>>> import numpy as np
>>> import dask.array as da
>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = da.from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)
>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])
>>> x = np.arange(16).reshape((4, 4))
>>> d = da.from_array(x, chunks=(2, 2))
>>> d.map_overlap(lambda x: x + x.size, depth=1, boundary='reflect').compute()
array([[16, 17, 18, 19],
       [20, 21, 22, 23],
       [24, 25, 26, 27],
       [28, 29, 30, 31]])
>>> func = lambda x: x + x.size
>>> depth = {0: 1, 1: 1}
>>> boundary = {0: 'reflect', 1: 'none'}
>>> d.map_overlap(func, depth, boundary).compute()  
array([[12,  13,  14,  15],
       [16,  17,  18,  19],
       [20,  21,  22,  23],
       [24,  25,  26,  27]])

函数 da.map_overlap 也可以接受多个数组。

>>> func = lambda x, y: x + y
>>> x = da.arange(8).reshape(2, 4).rechunk((1, 2))
>>> y = da.arange(4).rechunk(2)
>>> da.map_overlap(func, x, y, depth=1, boundary='reflect').compute() 
array([[ 0,  2,  4,  6],
       [ 4,  6,  8,  10]])

当给出多个数组时,它们无需具有相同的维度数量,但必须能够一起广播。数组会逐块对齐(就像在 da.map_blocks 中一样),因此分块必须具有共同的分块大小。只要 align_arrays 为 True,就会自动确定这种共同的分块。

>>> x = da.arange(8, chunks=4)
>>> y = da.arange(8, chunks=2)
>>> r = da.map_overlap(func, x, y, depth=1, boundary='reflect', align_arrays=True)
>>> len(r.to_delayed())
4
>>> da.map_overlap(func, x, y, depth=1, boundary='reflect', align_arrays=False).compute()
Traceback (most recent call last):
    ...
ValueError: Shapes do not align {'.0': {2, 4}}

另请注意,此函数默认情况下等同于 map_blocks。必须定义非零的 depth,以使提供给 func 的数组中出现任何重叠。

>>> func = lambda x: x.sum()
>>> x = da.ones(10, dtype='int')
>>> block_args = dict(chunks=(), drop_axis=0)
>>> da.map_blocks(func, x, **block_args).compute()
np.int64(10)
>>> da.map_overlap(func, x, **block_args, boundary='reflect').compute()
np.int64(10)
>>> da.map_overlap(func, x, **block_args, depth=1, boundary='reflect').compute()
np.int64(12)

对于可能无法处理 0 维数组的函数,也可以使用与预期结果类型匹配的空数组指定 meta。在下面的示例中,计算 meta 时,func 将导致 IndexError

>>> x = np.arange(16).reshape((4, 4))
>>> d = da.from_array(x, chunks=(2, 2))
>>> y = d.map_overlap(lambda x: x + x[2], depth=1, boundary='reflect', meta=np.array(()))
>>> y
dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray>
>>> y.compute()
array([[ 4,  6,  8, 10],
       [ 8, 10, 12, 14],
       [20, 22, 24, 26],
       [24, 26, 28, 30]])

类似地,也可以向 meta 指定非 NumPy 数组。

>>> import cupy  
>>> x = cupy.arange(16).reshape((4, 4))  
>>> d = da.from_array(x, chunks=(2, 2))  
>>> y = d.map_overlap(lambda x: x + x[2], depth=1, boundary='reflect', meta=cupy.array(()))  
>>> y  
dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=cupy.ndarray>
>>> y.compute()  
array([[ 4,  6,  8, 10],
       [ 8, 10, 12, 14],
       [20, 22, 24, 26],
       [24, 26, 28, 30]])

说明

考虑 Dask 数组中的两个相邻分块

Two neighboring blocks which do not overlap.

我们通过在数组之间交换相邻的薄切片来扩展每个分块

Two neighboring block with thin strips along their shared border representing data shared between them.

我们在所有方向上执行此操作,包括与重叠函数的对角交互

A two-dimensional grid of blocks where each one has thin strips around their borders representing data shared from their neighbors. They include small corner bits for data shared from diagonal neighbors as well.
>>> import dask.array as da
>>> import numpy as np

>>> x = np.arange(64).reshape((8, 8))
>>> d = da.from_array(x, chunks=(4, 4))
>>> d.chunks
((4, 4), (4, 4))

>>> g = da.overlap.overlap(d, depth={0: 2, 1: 1},
...                       boundary={0: 100, 1: 'reflect'})
>>> g.chunks
((8, 8), (6, 6))

>>> np.array(g)
array([[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [  0,   0,   1,   2,   3,   4,   3,   4,   5,   6,   7,   7],
       [  8,   8,   9,  10,  11,  12,  11,  12,  13,  14,  15,  15],
       [ 16,  16,  17,  18,  19,  20,  19,  20,  21,  22,  23,  23],
       [ 24,  24,  25,  26,  27,  28,  27,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  35,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  43,  44,  45,  46,  47,  47],
       [ 16,  16,  17,  18,  19,  20,  19,  20,  21,  22,  23,  23],
       [ 24,  24,  25,  26,  27,  28,  27,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  35,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  43,  44,  45,  46,  47,  47],
       [ 48,  48,  49,  50,  51,  52,  51,  52,  53,  54,  55,  55],
       [ 56,  56,  57,  58,  59,  60,  59,  60,  61,  62,  63,  63],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100]])

边界处理

关于重叠,您可以指定如何处理边界。当前的策略包括以下几种:

  • periodic - 将边界环绕到另一侧

  • reflect - 将每个边界向外反射

  • any-constant - 用此值填充边界

边界类型参数的示例可能如下所示:

{0: 'periodic',
 1: 'reflect',
 2: np.nan}

或者,您可以使用 dask.array.pad() 进行其它类型的填充。

在分块上映射函数

重叠与在分块上映射函数密切相关。这个函数现在可以使用从邻居复制过来的额外信息,这些信息并未本地存储在每个分块中。

>>> from scipy.ndimage import gaussian_filter
>>> def func(block):
...    return gaussian_filter(block, sigma=1)

>>> filt = g.map_blocks(func)

虽然在这种情况下我们使用了 SciPy 函数,但也可以使用任何任意函数。这是与 Numba 的一个很好的交互点。

如果您的函数不保留分块的形状,那么您需要提供一个 chunks 关键字参数。如果您的分块大小是规则的,那么此参数可以采用例如 (1000, 1000) 的分块形状。如果分块大小不规则,它必须是一个包含完整分块形状的 tuple,例如 ((1000, 700, 1000), (200, 300))

>>> g.map_blocks(myfunc, chunks=(5, 5))

如果您的函数需要知道它操作的分块位置,您可以给您的函数一个关键字参数 block_id

def func(block, block_id=None):
    ...

这个额外的关键字参数将被赋予一个 tuple,提供分块位置,例如左上角分块的 (0, 0) 或该分块右侧分块的 (0, 1)

修剪多余部分

在映射分块函数后,您可能希望从每个分块修剪掉与扩展量相同的边界。这里的函数 trim_internal 会很有用,它接受与 overlap 相同的 depth 参数。

>>> x.chunks
((10, 10, 10, 10), (10, 10, 10, 10))

>>> y = da.overlap.trim_internal(x, {0: 2, 1: 1})
>>> y.chunks
((6, 6, 6, 6), (8, 8, 8, 8))

完整工作流程

因此,一个相当典型的重叠工作流程包括 overlapmap_blockstrim_internal

>>> x = ...
>>> g = da.overlap.overlap(x, depth={0: 2, 1: 2},
...                        boundary={0: 'periodic', 1: 'periodic'})
>>> g2 = g.map_blocks(myfunc)
>>> result = da.overlap.trim_internal(g2, {0: 2, 1: 2})