创建 Dask 数组

您可以从各种常见来源(如 HDF5、NetCDF、Zarr 或任何支持 NumPy 风格切片的格式)加载或存储 Dask 数组。

from_array(x[, chunks, name, lock, asarray, ...])

从类似数组的对象创建 dask 数组。

from_delayed(value, shape[, dtype, meta, name])

从 dask 延迟值创建 dask 数组

from_npy_stack(dirname[, mmap_mode])

从 npy 文件堆栈加载 dask 数组

from_zarr(url[, component, storage_options, ...])

从 zarr 存储格式加载数组

stack(seq[, axis, allow_unknown_chunksizes])

沿着新轴堆叠数组

concatenate(seq[, axis, ...])

沿着现有轴拼接数组

NumPy 切片

from_array(x[, chunks, name, lock, asarray, ...])

从类似数组的对象创建 dask 数组。

许多存储格式都有 Python 项目,使用 NumPy 切片语法公开存储。这些格式包括 HDF5、NetCDF、BColz、Zarr、GRIB 等。例如,我们可以使用 h5py 从 HDF5 文件加载 Dask 数组。

>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path']          # Pointer on on-disk array
>>> d.shape                      # d can be very large
(1000000, 1000000)

>>> x = d[:5, :5]                # We slice to get numpy arrays

给定一个像上面 d 一样具有 dtypeshape 属性且支持 NumPy 风格切片的对象,我们可以构建一个延迟的 Dask 数组。

>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))

这个过程完全是延迟的。无论是创建 h5py 对象还是使用 da.from_array 包装它,都没有加载任何数据。

随机数据

对于实验或基准测试,通常会创建随机数据数组。dask.array.random 模块实现了 numpy.random 模块中的大部分函数。下面列出了一些常用函数,完整列表请参阅Array API

random.binomial(*args, **kwargs)

从二项分布中抽取样本。

random.normal(*args, **kwargs)

从正态(高斯)分布中抽取随机样本。

random.poisson(*args, **kwargs)

从泊松分布中抽取样本。

random.random(*args, **kwargs)

返回半开区间 [0.0, 1.0) 内的随机浮点数。

>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))

拼接和堆叠

stack(seq[, axis, allow_unknown_chunksizes])

沿着新轴堆叠数组

concatenate(seq[, axis, ...])

沿着现有轴拼接数组

通常我们将数据存储在多个不同的位置,并希望将它们拼接在一起。

dask_arrays = []
for fn in filenames:
    f = h5py.File(fn)
    d = f['/data']
    array = da.from_array(d, chunks=(1000, 1000))
    dask_arrays.append(array)

x = da.concatenate(dask_arrays, axis=0)  # concatenate arrays along first axis

更多信息,请参阅拼接和堆叠文档。

使用 dask.delayed

from_delayed(value, shape[, dtype, meta, name])

从 dask 延迟值创建 dask 数组

stack(seq[, axis, allow_unknown_chunksizes])

沿着新轴堆叠数组

concatenate(seq[, axis, ...])

沿着现有轴拼接数组

有时 NumPy 风格的数据存储在不支持 NumPy 风格切片的格式中。如果有一个 Python 函数可以使用dask.delayed 生成完整数组的各个部分,我们仍然可以围绕这些数据构建 Dask 数组。Dask delayed 允许我们延迟单个函数调用,该调用会创建一个 NumPy 数组。然后,我们可以使用 da.from_delayed 包装这个延迟对象,提供 dtype 和 shape 来生成一个单块的 Dask 数组。此外,我们可以使用之前的 stackconcatenate 来构建一个更大的延迟数组。

例如,考虑使用 skimage.io.imread 加载图像堆栈。

import skimage.io
import dask.array as da
import dask

imread = dask.delayed(skimage.io.imread, pure=True)  # Lazy version of imread

filenames = sorted(glob.glob('*.jpg'))

lazy_images = [imread(path) for path in filenames]   # Lazily evaluate imread on each path
sample = lazy_images[0].compute()  # load the first image (assume rest are same shape/dtype)

arrays = [da.from_delayed(lazy_image,           # Construct a small Dask array
                          dtype=sample.dtype,   # for every lazy value
                          shape=sample.shape)
          for lazy_image in lazy_images]

stack = da.stack(arrays, axis=0)                # Stack all small Dask arrays into one

请参阅关于将 dask.delayed 与集合一起使用的文档

通常,使用 da.map_blocks 比使用 da.stack 快得多。

import glob
import skimage.io
import numpy as np
import dask.array as da

filenames = sorted(glob.glob('*.jpg'))

def read_one_image(block_id, filenames=filenames, axis=0):
    # a function that reads in one chunk of data
    path = filenames[block_id[axis]]
    image = skimage.io.imread(path)
    return np.expand_dims(image, axis=axis)

# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])

stack = da.map_blocks(
    read_one_image,
    dtype=sample.dtype,
    chunks=((1,) * len(filenames),  *sample.shape)
)

从 Dask DataFrame

有几种方法可以从 Dask DataFrame 创建 Dask 数组。Dask DataFrame 具有 to_dask_array 方法。

>>> df = dask.dataframes.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

这与 Pandas 中的 to_numpy 函数类似。还支持 values 属性。

>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

然而,这些数组没有已知的块大小,因为 dask.dataframe 不会跟踪每个分区的行数。这意味着某些操作(如切片)将无法正确执行。

块大小可以计算。

>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>

指定 lengths=True 会立即触发块大小的计算。这使得依赖于已知块大小的下游计算(例如切片)成为可能。

Dask DataFrame 的 to_records 方法也返回一个 Dask 数组,但不计算 shape 信息。

>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>

如果您有一个将 Pandas DataFrame 转换为 NumPy 数组的函数,那么在 Dask DataFrame 上使用该函数调用 map_partitions 将生成一个 Dask 数组。

>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

与 NumPy 数组的交互

Dask 数组操作会自动将 NumPy 数组转换为单块 dask 数组。

>>> x = da.sum(np.ones(5))
>>> x.compute()
5

当 NumPy 和 Dask 数组交互时,结果将是一个 Dask 数组。自动重新分块规则通常会将 NumPy 数组切片成适当的 Dask 块形状。

>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>

这些交互不仅适用于 NumPy 数组,也适用于任何具有 shape 和 dtype 属性并实现 NumPy 切片语法的对象。

内存映射

内存映射是访问原始二进制数据的一种高效方法,如果数据已在文件系统缓存中,其开销几乎为零。对于线程调度器,从原始二进制文件创建 Dask 数组可以像 a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r')) 一样简单。

对于多进程或分布式调度器,每个数组块的内存映射应在正确的工作进程上创建,而不是在主进程上创建,以避免通过集群传输数据。这可以通过使用 dask.delayed 包装创建内存映射的函数来实现。

import numpy as np
import dask
import dask.array as da


def mmap_load_chunk(filename, shape, dtype, offset, sl):
    '''
    Memory map the given file with overall shape and dtype and return a slice
    specified by :code:`sl`.

    Parameters
    ----------

    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int
        Skip :code:`offset` bytes from the beginning of the file.
    sl:
        Object that can be used for indexing or slicing a NumPy array to
        extract a chunk

    Returns
    -------

    numpy.memmap or numpy.ndarray
        View into memory map created by indexing with :code:`sl`,
        or NumPy ndarray in case no view can be created using :code:`sl`.
    '''
    data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
    return data[sl]


def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
    '''
    Create a Dask array from raw binary data in :code:`filename`
    by memory mapping.

    This method is particularly effective if the file is already
    in the file system cache and if arbitrary smaller subsets are
    to be extracted from the Dask array without optimizing its
    chunking scheme.

    It may perform poorly on Windows if the file is not in the file
    system cache. On Linux it performs well under most circumstances.

    Parameters
    ----------

    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int, optional
        Skip :code:`offset` bytes from the beginning of the file.
    blocksize : int, optional
        Chunk size for the outermost axis. The other axes remain unchunked.

    Returns
    -------

    dask.array.Array
        Dask array matching :code:`shape` and :code:`dtype`, backed by
        memory-mapped chunks.
    '''
    load = dask.delayed(mmap_load_chunk)
    chunks = []
    for index in range(0, shape[0], blocksize):
        # Truncate the last chunk if necessary
        chunk_size = min(blocksize, shape[0] - index)
        chunk = dask.array.from_delayed(
            load(
                filename,
                shape=shape,
                dtype=dtype,
                offset=offset,
                sl=slice(index, index + chunk_size)
            ),
            shape=(chunk_size, ) + shape[1:],
            dtype=dtype
        )
        chunks.append(chunk)
    return da.concatenate(chunks, axis=0)

x = mmap_dask_array(
    filename='testfile-50-50-100-100-float32.raw',
    shape=(50, 50, 100, 100),
    dtype=np.float32
)

更多信息请参阅数组块文档。

存储 Dask 数组

store(sources, targets[, lock, regions, ...])

将 dask 数组存储在类似数组的对象中,覆盖目标中的数据

to_hdf5(filename, *args[, chunks])

将数组存储在 HDF5 文件中

to_npy_stack(dirname, x[, axis])

将 dask 数组写入 .npy 文件堆栈

to_zarr(arr, url[, component, ...])

将数组保存到 zarr 存储格式

compute(*args[, traverse, optimize_graph, ...])

一次计算多个 dask 集合。

内存中

compute(*args[, traverse, optimize_graph, ...])

一次计算多个 dask 集合。

如果您的数据量较小,可以在 Dask 数组上调用 np.array.compute() 将其转换为普通的 NumPy 数组。

>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])

>>> y.compute()
array([0, 1, 4, 9, 16, 25])

NumPy 风格切片

store(sources, targets[, lock, regions, ...])

将 dask 数组存储在类似数组的对象中,覆盖目标中的数据

您可以将 Dask 数组存储在任何支持 NumPy 风格切片赋值的对象中,例如 h5py.Dataset

>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)

另外,您可以通过传递源列表和目标列表,在一个计算中存储多个数组。

>>> da.store([array1, array2], [output1, output2])  # doctest: +SKIP

HDF5

to_hdf5(filename, *args[, chunks])

将数组存储在 HDF5 文件中

HDF5 非常常见,有一个特殊函数 to_hdf5 用于使用 h5py 将数据存储到 HDF5 文件中。

>>> da.to_hdf5('myfile.hdf5', '/y', y)  # doctest: +SKIP

您可以使用函数 da.to_hdf5 通过传递字典,在一个计算中存储多个数组。

>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y})  # doctest: +SKIP

Zarr

Zarr 格式是一种基于块的二进制数组存储文件格式,具有丰富的编码和压缩选项。由于每个块都存储在一个单独的文件中,因此非常适合并行读写(对于后者,如果 Dask 数组块与目标对齐)。此外,还支持在远程数据服务(如 S3 和 GCS)中存储。

例如,要将数据保存到本地 zarr 数据集,您可以这样做:

>>> arr.to_zarr('output.zarr')

或者保存到 S3 上的特定存储桶

>>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
                'secret': 'mysecret'})

或者您自己的自定义 zarr 数组

>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)

要检索这些数据,您需要使用完全相同的参数调用 da.from_zarr。生成的 Dask 数组的分块方式由文件保存方式决定,除非另有指定。

TileDB

TileDB 是一种二进制数组格式和存储管理器,具有可调整的分块、布局和压缩选项。TileDB 存储管理器库支持可伸缩的存储后端,如 S3 API 兼容的对象存储和 HDFS,具有自动伸缩功能,并支持多线程和多进程读(一致)和写(最终一致)。

要将数据保存到本地 TileDB 数组

>>> arr.to_tiledb('output.tdb')

或者保存到 S3 上的存储桶

>>> arr.to_tiledb('s3://mybucket/output.tdb',
                  storage_options={'vfs.s3.aws_access_key_id': 'mykey',
                                   'vfs.s3.aws_secret_access_key': 'mysecret'})

可以使用相同的 URI 和任何必要的参数运行 da.from_tiledb 来检索文件。

中间存储

store(sources, targets[, lock, regions, ...])

将 dask 数组存储在类似数组的对象中,覆盖目标中的数据

在某些情况下,可能希望将中间结果存储到长期存储中。这与 persist 不同,persist 主要用于管理 Dask 内部不一定具有长期性的中间结果。它也与存储最终结果不同,因为最终结果标志着 Dask 图的结束。因此,中间结果更容易重用,而无需重新加载数据。中间存储主要用于需要在 Dask 外部(例如磁盘、数据库、云等)使用数据的情况。它可以作为长时间运行或容易出错的计算的检查点。

中间存储的使用场景与典型的存储使用场景不同,因为它会向用户返回一个表示该存储操作结果的 Dask 数组。这通常通过将 store 函数的 return_stored 标志设置为 True 来完成。

x.store()  # stores data, returns nothing
x = x.store(return_stored=True)  # stores data, returns new dask array backed by that data

用户可以决定存储操作是立即发生(通过将 compute 标志设置为 True)还是稍后发生(通过将 compute 标志设置为 False)。在所有其他方面,这与正常调用 store 的行为相同。下面显示了一些示例。

>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)

这可以与上述文档中或任何特定存储类型中提及的任何其他存储策略相结合。

插件

无论何时构建 Dask 数组,我们都可以对其运行任意用户定义的函数。这使我们能够构建各种自定义行为,从而改进调试、用户警告等。您可以将要在所有 Dask 数组上运行的函数列表注册到全局 array_plugins= 值。

>>> def f(x):
...     print(x.nbytes)

>>> with dask.config.set(array_plugins=[f]):
...     x = da.ones((10, 1), chunks=(5, 1))
...     y = x.dot(x.T)
80
80
800
800

如果插件函数返回 None,则输入的 Dask 数组将不做任何更改地返回。如果插件函数返回其他内容,则该值将作为构造函数的结果。

示例

自动计算

我们可能希望将一些 Dask 数组代码转换为正常的 NumPy 代码。这很有用,例如,可以立即跟踪由 Dask 的延迟语义隐藏的错误。

>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
...     x = da.arange(5, chunks=2)

>>> x  # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])

警告大块

如果用户创建的块过大,我们可能希望发出警告。

def warn_on_large_chunks(x):
    shapes = list(itertools.product(*x.chunks))
    nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
    if any(nb > 1e9 for nb in nbytes):
        warnings.warn("Array contains very large chunks")

with dask.config.set(array_plugins=[warn_on_large_chunks]):
    ...

组合

您还可以将这些插件组合到一个列表中。它们将依次运行,并将结果链接起来。

with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
    ...