重叠计算

有些数组操作需要相邻块之间边界的通信。示例操作包括以下几种:

  • 对图像应用卷积滤波器

  • 滑动求和/平均/最大值等…

  • 搜索可能跨越块边界的图像基序,如高斯斑点

  • 计算偏导数

  • 生命游戏

Dask Array 通过创建一个新数组来支持这些操作,新数组中的每个块都由其相邻块的边界略微扩展。这会产生额外的复制成本和许多小块的通信开销,但允许局部函数以易并行的方式进行评估。

这些计算的主要 API 是下面定义的 map_overlap 方法

map_overlap(func, *args[, depth, boundary, ...])

将函数映射到带有重叠的数组块上

dask.array.map_overlap(func, *args, depth=None, boundary=None, trim=True, align_arrays=True, allow_rechunk=True, **kwargs)[source]

将函数映射到带有重叠的数组块上

我们在数组块之间共享相邻区域,然后应用函数,并修剪掉相邻的条带。如果沿特定轴的深度大于任何分块,则数组将被重新分块。

请注意,此函数将在计算输出数组类型之前尝试自动确定其类型;如果您预计函数在操作 0 维数组时不会成功,请参阅 map_blocks 中的 meta 关键字参数。

参数
func: 函数

应用于每个扩展块的函数。如果提供了多个数组,函数应按相同顺序接收每个数组的分块。

**args**dask 数组
depth: int, tuple, dict 或 list,仅限关键字参数

每个块应与其相邻块共享的元素数量。如果为 tuple 或 dict,则可以按轴不同。如果为 list,则该列表的每个元素必须是 int、tuple 或 dict,用于定义 args 中相应数组的深度。可以使用 (-/+) 元组的 dict 值指定不对称深度。请注意,目前仅当 boundary 为 ‘none’ 时才支持不对称深度。默认值为 0。

boundary: str, tuple, dict 或 list,仅限关键字参数

如何处理边界。可选值包括 ‘reflect’、‘periodic’、‘nearest’、‘none’,或任何常数值,如 0 或 np.nan。如果为 list,则每个元素必须是 str、tuple 或 dict,用于定义 args 中相应数组的边界。默认值为 ‘reflect’。

trim: bool,仅限关键字参数

调用 map 函数后是否修剪每个块中 depth 个元素。如果您的映射函数已为您执行此操作,请将其设置为 False

align_arrays: bool,仅限关键字参数

提供多个数组时,是否沿相等大小的维度对齐分块。这允许将某些数组中的较大分块拆分成与其余数组中分块大小匹配的较小分块,以便它们与块函数映射兼容。如果此参数为 False,则如果数组在每个维度中没有相同数量的块,将抛出错误。

allow_rechunk: bool,仅限关键字参数

允许重新分块,否则分块大小必须匹配,并且核心维度必须仅由一个分块组成。

**kwargs

map_blocks 中有效的其他关键字参数

示例

>>> import numpy as np
>>> import dask.array as da
>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = da.from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)
>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])
>>> x = np.arange(16).reshape((4, 4))
>>> d = da.from_array(x, chunks=(2, 2))
>>> d.map_overlap(lambda x: x + x.size, depth=1, boundary='reflect').compute()
array([[16, 17, 18, 19],
       [20, 21, 22, 23],
       [24, 25, 26, 27],
       [28, 29, 30, 31]])
>>> func = lambda x: x + x.size
>>> depth = {0: 1, 1: 1}
>>> boundary = {0: 'reflect', 1: 'none'}
>>> d.map_overlap(func, depth, boundary).compute()  
array([[12,  13,  14,  15],
       [16,  17,  18,  19],
       [20,  21,  22,  23],
       [24,  25,  26,  27]])

da.map_overlap 函数也可以接受多个数组。

>>> func = lambda x, y: x + y
>>> x = da.arange(8).reshape(2, 4).rechunk((1, 2))
>>> y = da.arange(4).rechunk(2)
>>> da.map_overlap(func, x, y, depth=1, boundary='reflect').compute() 
array([[ 0,  2,  4,  6],
       [ 4,  6,  8,  10]])

当给定多个数组时,它们无需具有相同的维度数量,但必须可以广播在一起。数组是按块对齐的(就像在 da.map_blocks 中一样),因此这些块必须具有共同的分块大小。只要 align_arrays 为 True,就会自动确定此共同分块。

>>> x = da.arange(8, chunks=4)
>>> y = da.arange(8, chunks=2)
>>> r = da.map_overlap(func, x, y, depth=1, boundary='reflect', align_arrays=True)
>>> len(r.to_delayed())
4
>>> da.map_overlap(func, x, y, depth=1, boundary='reflect', align_arrays=False).compute()
Traceback (most recent call last):
    ...
ValueError: Shapes do not align {'.0': {2, 4}}

另请注意,默认情况下,此函数等同于 map_blocks。必须定义非零 depth,才能在提供给 func 的数组中出现任何重叠。

>>> func = lambda x: x.sum()
>>> x = da.ones(10, dtype='int')
>>> block_args = dict(chunks=(), drop_axis=0)
>>> da.map_blocks(func, x, **block_args).compute()
np.int64(10)
>>> da.map_overlap(func, x, **block_args, boundary='reflect').compute()
np.int64(10)
>>> da.map_overlap(func, x, **block_args, depth=1, boundary='reflect').compute()
np.int64(12)

对于可能无法处理 0 维数组的函数,也可以使用与预期结果类型匹配的空数组指定 meta。在下面的示例中,计算 meta 时,func 将导致 IndexError

>>> x = np.arange(16).reshape((4, 4))
>>> d = da.from_array(x, chunks=(2, 2))
>>> y = d.map_overlap(lambda x: x + x[2], depth=1, boundary='reflect', meta=np.array(()))
>>> y
dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray>
>>> y.compute()
array([[ 4,  6,  8, 10],
       [ 8, 10, 12, 14],
       [20, 22, 24, 26],
       [24, 26, 28, 30]])

类似地,也可以指定非 NumPy 数组作为 meta

>>> import cupy  
>>> x = cupy.arange(16).reshape((4, 4))  
>>> d = da.from_array(x, chunks=(2, 2))  
>>> y = d.map_overlap(lambda x: x + x[2], depth=1, boundary='reflect', meta=cupy.array(()))  
>>> y  
dask.array<_trim, shape=(4, 4), dtype=float64, chunksize=(2, 2), chunktype=cupy.ndarray>
>>> y.compute()  
array([[ 4,  6,  8, 10],
       [ 8, 10, 12, 14],
       [20, 22, 24, 26],
       [24, 26, 28, 30]])

解释

考虑 Dask 数组中的两个相邻块

Two neighboring blocks which do not overlap.

我们通过在数组之间交换相邻的薄切片来扩展每个块

Two neighboring block with thin strips along their shared border representing data shared between them.

我们在所有方向上都这样做,包括与重叠函数的对角交互

A two-dimensional grid of blocks where each one has thin strips around their borders representing data shared from their neighbors. They include small corner bits for data shared from diagonal neighbors as well.
>>> import dask.array as da
>>> import numpy as np

>>> x = np.arange(64).reshape((8, 8))
>>> d = da.from_array(x, chunks=(4, 4))
>>> d.chunks
((4, 4), (4, 4))

>>> g = da.overlap.overlap(d, depth={0: 2, 1: 1},
...                       boundary={0: 100, 1: 'reflect'})
>>> g.chunks
((8, 8), (6, 6))

>>> np.array(g)
array([[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [  0,   0,   1,   2,   3,   4,   3,   4,   5,   6,   7,   7],
       [  8,   8,   9,  10,  11,  12,  11,  12,  13,  14,  15,  15],
       [ 16,  16,  17,  18,  19,  20,  19,  20,  21,  22,  23,  23],
       [ 24,  24,  25,  26,  27,  28,  27,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  35,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  43,  44,  45,  46,  47,  47],
       [ 16,  16,  17,  18,  19,  20,  19,  20,  21,  22,  23,  23],
       [ 24,  24,  25,  26,  27,  28,  27,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  35,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  43,  44,  45,  46,  47,  47],
       [ 48,  48,  49,  50,  51,  52,  51,  52,  53,  54,  55,  55],
       [ 56,  56,  57,  58,  59,  60,  59,  60,  61,  62,  63,  63],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100]])

边界

关于重叠,您可以指定如何处理边界。当前的策略包括以下几种:

  • periodic - 将边界环绕到另一侧

  • reflect - 向外反射每个边界

  • any-constant - 使用此值填充边界

示例边界类型参数可能如下所示

{0: 'periodic',
 1: 'reflect',
 2: np.nan}

或者,您可以使用 dask.array.pad() 进行其他类型的填充。

跨块映射函数

重叠与跨块映射函数密切相关。此函数现在可以使用从邻居复制过来的、未存储在每个块本地的附加信息

>>> from scipy.ndimage import gaussian_filter
>>> def func(block):
...    return gaussian_filter(block, sigma=1)

>>> filt = g.map_blocks(func)

虽然在此例中我们使用了 SciPy 函数,但也可以使用任何任意函数。这是与 Numba 良好的交互点。

如果您的函数不保留块的形状,则需要提供 chunks 关键字参数。如果您的块大小规则,则此参数可以采用块形状,例如 (1000, 1000)。如果块大小不规则,它必须是一个 tuple,包含完整的分块形状,如 ((1000, 700, 1000), (200, 300))

>>> g.map_blocks(myfunc, chunks=(5, 5))

如果您的函数需要知道其操作的块的位置,您可以为函数提供一个关键字参数 block_id

def func(block, block_id=None):
    ...

这个额外的关键字参数将获得一个 tuple,提供块的位置,例如左上角的块为 (0, 0),该块右侧的块为 (0, 1)

修剪多余部分

在映射分块函数后,您可能希望从每个块中修剪掉与它们被扩展量相同的边界。这里的函数 trim_internal 很有用,它接受与 overlap 相同的 depth 参数

>>> x.chunks
((10, 10, 10, 10), (10, 10, 10, 10))

>>> y = da.overlap.trim_internal(x, {0: 2, 1: 1})
>>> y.chunks
((6, 6, 6, 6), (8, 8, 8, 8))

完整工作流程

因此,一个相当典型的重叠工作流程包括 overlapmap_blockstrim_internal

>>> x = ...
>>> g = da.overlap.overlap(x, depth={0: 2, 1: 2},
...                        boundary={0: 'periodic', 1: 'periodic'})
>>> g2 = g.map_blocks(myfunc)
>>> result = da.overlap.trim_internal(g2, {0: 2, 1: 2})