创建 Dask 数组

您可以从各种常见来源加载或存储 Dask 数组,例如 HDF5、NetCDF、Zarr,或任何支持 NumPy 风格切片的格式。

from_array(x[, chunks, name, lock, asarray, ...])

从类似数组的对象创建 dask 数组。

from_delayed(value, shape[, dtype, meta, name])

从 dask delayed 值创建 dask 数组

from_npy_stack(dirname[, mmap_mode])

从 npy 文件堆栈加载 dask 数组

from_zarr(url[, component, storage_options, ...])

从 zarr 存储格式加载数组

stack(seq[, axis, allow_unknown_chunksizes])

沿新轴堆叠数组

concatenate(seq[, axis, ...])

沿现有轴连接数组

NumPy 切片

from_array(x[, chunks, name, lock, asarray, ...])

从类似数组的对象创建 dask 数组。

许多存储格式都有使用 NumPy 切片语法暴露存储的 Python 项目。这些包括 HDF5、NetCDF、BColz、Zarr、GRIB 等。例如,我们可以使用 h5py 从 HDF5 文件加载 Dask 数组

>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path']          # Pointer on on-disk array
>>> d.shape                      # d can be very large
(1000000, 1000000)

>>> x = d[:5, :5]                # We slice to get numpy arrays

给定一个如上所示的对象,例如 d,它具有 dtypeshape 属性并支持 NumPy 风格切片,我们可以构建一个惰性 Dask 数组

>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))

这个过程完全是惰性的。无论是创建 h5py 对象还是用 da.from_array 包装它,都没有加载任何数据。

随机数据

为了实验或基准测试,通常会创建随机数据数组。dask.array.random 模块实现了 numpy.random 模块中的大部分函数。下面列出了一些常用函数,完整的列表请参阅数组 API

random.binomial(*args, **kwargs)

从二项分布中抽取样本。

random.normal(*args, **kwargs)

从正态(高斯)分布中抽取随机样本。

random.poisson(*args, **kwargs)

从泊松分布中抽取样本。

random.random(*args, **kwargs)

返回半开区间 [0.0, 1.0) 内的随机浮点数。

>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))

连接和堆叠

stack(seq[, axis, allow_unknown_chunksizes])

沿新轴堆叠数组

concatenate(seq[, axis, ...])

沿现有轴连接数组

通常我们将数据存储在多个不同的位置,并希望将它们拼接在一起

dask_arrays = []
for fn in filenames:
    f = h5py.File(fn)
    d = f['/data']
    array = da.from_array(d, chunks=(1000, 1000))
    dask_arrays.append(array)

x = da.concatenate(dask_arrays, axis=0)  # concatenate arrays along first axis

更多信息,请参阅连接和堆叠文档。

使用 dask.delayed

from_delayed(value, shape[, dtype, meta, name])

从 dask delayed 值创建 dask 数组

stack(seq[, axis, allow_unknown_chunksizes])

沿新轴堆叠数组

concatenate(seq[, axis, ...])

沿现有轴连接数组

有时 NumPy 风格的数据存在于不支持 NumPy 风格切片的格式中。如果有一个 Python 函数可以使用 dask.delayed 生成完整数组的各个部分,我们仍然可以围绕这些数据构建 Dask 数组。Dask delayed 允许我们延迟一个将创建 NumPy 数组的函数调用。然后我们可以使用 da.from_delayed 包装这个 delayed 对象,提供 dtype 和 shape 来生成一个单分块的 Dask 数组。此外,我们可以使用之前提到的 stackconcatenate 来构建更大的惰性数组。

例如,考虑使用 skimage.io.imread 加载图像堆栈

import skimage.io
import dask.array as da
import dask

imread = dask.delayed(skimage.io.imread, pure=True)  # Lazy version of imread

filenames = sorted(glob.glob('*.jpg'))

lazy_images = [imread(path) for path in filenames]   # Lazily evaluate imread on each path
sample = lazy_images[0].compute()  # load the first image (assume rest are same shape/dtype)

arrays = [da.from_delayed(lazy_image,           # Construct a small Dask array
                          dtype=sample.dtype,   # for every lazy value
                          shape=sample.shape)
          for lazy_image in lazy_images]

stack = da.stack(arrays, axis=0)                # Stack all small Dask arrays into one

请参阅关于将 dask.delayed 与集合一起使用的文档

通常使用 da.map_blocks 会比 da.stack 快得多

import glob
import skimage.io
import numpy as np
import dask.array as da

filenames = sorted(glob.glob('*.jpg'))

def read_one_image(block_id, filenames=filenames, axis=0):
    # a function that reads in one chunk of data
    path = filenames[block_id[axis]]
    image = skimage.io.imread(path)
    return np.expand_dims(image, axis=axis)

# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])

stack = da.map_blocks(
    read_one_image,
    dtype=sample.dtype,
    chunks=((1,) * len(filenames),  *sample.shape)
)

从 Dask DataFrame 创建

有几种方法可以从 Dask DataFrame 创建 Dask 数组。Dask DataFrame 有一个 to_dask_array 方法

>>> df = dask.dataframes.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

这模仿了 Pandas 中的 to_numpy 函数。也支持 values 属性

>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

然而,这些数组没有已知的分块大小,因为 dask.dataframe 不跟踪每个分区的行数。这意味着某些操作(如切片)将无法正确执行。

可以计算分块大小

>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>

指定 lengths=True 会立即触发分块大小的计算。这使得依赖于已知分块大小的后续计算(例如切片)成为可能。

Dask DataFrame 的 to_records 方法也返回一个 Dask 数组,但不计算形状信息

>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>

如果您有一个将 Pandas DataFrame 转换为 NumPy 数组的函数,那么在 Dask DataFrame 上使用该函数调用 map_partitions 将产生一个 Dask 数组

>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

与 NumPy 数组的交互

Dask 数组操作会自动将 NumPy 数组转换为单分块 dask 数组

>>> x = da.sum(np.ones(5))
>>> x.compute()
5

当 NumPy 和 Dask 数组交互时,结果将是一个 Dask 数组。自动 rechunking 规则通常会将 NumPy 数组切片成适当的 Dask 分块形状

>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>

这些交互不仅适用于 NumPy 数组,也适用于任何具有 shape 和 dtype 属性并实现 NumPy 切片语法的对象。

内存映射

内存映射是一种访问原始二进制数据的高效方法,因为如果数据已经在文件系统缓存中,其开销几乎为零。对于线程调度器,从原始二进制文件创建 Dask 数组可以像 a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r')) 一样简单。

对于多进程或分布式调度器,每个数组分块的内存映射应该在正确的工作进程上创建,而不是在主进程上,以避免通过集群进行数据传输。这可以通过使用 dask.delayed 包装创建内存映射的函数来实现。

import numpy as np
import dask
import dask.array as da


def mmap_load_chunk(filename, shape, dtype, offset, sl):
    '''
    Memory map the given file with overall shape and dtype and return a slice
    specified by :code:`sl`.

    Parameters
    ----------

    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int
        Skip :code:`offset` bytes from the beginning of the file.
    sl:
        Object that can be used for indexing or slicing a NumPy array to
        extract a chunk

    Returns
    -------

    numpy.memmap or numpy.ndarray
        View into memory map created by indexing with :code:`sl`,
        or NumPy ndarray in case no view can be created using :code:`sl`.
    '''
    data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
    return data[sl]


def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
    '''
    Create a Dask array from raw binary data in :code:`filename`
    by memory mapping.

    This method is particularly effective if the file is already
    in the file system cache and if arbitrary smaller subsets are
    to be extracted from the Dask array without optimizing its
    chunking scheme.

    It may perform poorly on Windows if the file is not in the file
    system cache. On Linux it performs well under most circumstances.

    Parameters
    ----------

    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int, optional
        Skip :code:`offset` bytes from the beginning of the file.
    blocksize : int, optional
        Chunk size for the outermost axis. The other axes remain unchunked.

    Returns
    -------

    dask.array.Array
        Dask array matching :code:`shape` and :code:`dtype`, backed by
        memory-mapped chunks.
    '''
    load = dask.delayed(mmap_load_chunk)
    chunks = []
    for index in range(0, shape[0], blocksize):
        # Truncate the last chunk if necessary
        chunk_size = min(blocksize, shape[0] - index)
        chunk = dask.array.from_delayed(
            load(
                filename,
                shape=shape,
                dtype=dtype,
                offset=offset,
                sl=slice(index, index + chunk_size)
            ),
            shape=(chunk_size, ) + shape[1:],
            dtype=dtype
        )
        chunks.append(chunk)
    return da.concatenate(chunks, axis=0)

x = mmap_dask_array(
    filename='testfile-50-50-100-100-float32.raw',
    shape=(50, 50, 100, 100),
    dtype=np.float32
)

分块

更多信息,请参阅关于数组分块的文档

存储 Dask 数组

store(sources, targets[, lock, regions, ...])

将 dask 数组存储到类似数组的对象中,覆盖目标数据

to_hdf5(filename, *args[, chunks])

将数组存储到 HDF5 文件中

to_npy_stack(dirname, x[, axis])

将 dask 数组写入 .npy 文件堆栈

to_zarr(arr, url[, component, ...])

将数组保存到 zarr 存储格式

compute(*args[, traverse, optimize_graph, ...])

一次计算多个 dask 集合。

内存中

compute(*args[, traverse, optimize_graph, ...])

一次计算多个 dask 集合。

如果数据量小,可以在 Dask 数组上调用 np.array.compute() 将其转换为普通的 NumPy 数组

>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])

>>> y.compute()
array([0, 1, 4, 9, 16, 25])

NumPy 风格切片

store(sources, targets[, lock, regions, ...])

将 dask 数组存储到类似数组的对象中,覆盖目标数据

可以将 Dask 数组存储到任何支持 NumPy 风格切片赋值的对象中,例如 h5py.Dataset

>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)

此外,可以通过传递源和目标的列表,在一次计算中存储多个数组

>>> da.store([array1, array2], [output1, output2])  # doctest: +SKIP

HDF5

to_hdf5(filename, *args[, chunks])

将数组存储到 HDF5 文件中

HDF5 非常常见,因此有一个特殊函数 to_hdf5,用于使用 h5py 将数据存储到 HDF5 文件中

>>> da.to_hdf5('myfile.hdf5', '/y', y)  # doctest: +SKIP

可以通过传入字典,使用函数 da.to_hdf5 在一次计算中存储多个数组

>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y})  # doctest: +SKIP

Zarr

Zarr 格式是一种按分块存储的二进制数组文件格式,具有多种编码和压缩选项。由于每个分块都存储在单独的文件中,因此非常适合并行读写访问(后者要求 Dask 数组分块与目标对齐)。此外,还支持在 远程数据服务(如 S3 和 GCS)中存储。

例如,要将数据保存到本地 zarr 数据集,您可以这样做

>>> arr.to_zarr('output.zarr')

或保存到 S3 上的特定 bucket

>>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
                'secret': 'mysecret'})

或您自己的自定义 zarr 数组

>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)

要检索这些数据,可以使用完全相同的参数调用 da.from_zarr。除非另有指定,结果 Dask 数组的分块方式取决于文件保存的方式。

TileDB

TileDB 是一种二进制数组格式和存储管理器,具有可调的分块、布局和压缩选项。TileDB 存储管理器库支持可伸缩的存储后端,例如兼容 S3 API 的对象存储和 HDFS,并支持自动扩展,以及多线程和多进程读取(一致)和写入(最终一致)。

要将数据保存到本地 TileDB 数组

>>> arr.to_tiledb('output.tdb')

或保存到 S3 上的一个 bucket

>>> arr.to_tiledb('s3://mybucket/output.tdb',
                  storage_options={'vfs.s3.aws_access_key_id': 'mykey',
                                   'vfs.s3.aws_secret_access_key': 'mysecret'})

可以通过使用相同的 URI 和任何必要的参数运行 da.from_tiledb 来检索文件。

中间存储

store(sources, targets[, lock, regions, ...])

将 dask 数组存储到类似数组的对象中,覆盖目标数据

在某些情况下,可能希望将中间结果存储到长期存储中。这不同于 persist,后者主要用于管理 Dask 内部不一定具有持久性的中间结果。它也不同于存储最终结果,因为最终结果标志着 Dask 图的结束。因此,中间结果更容易在不重新加载数据的情况下重用。中间存储主要适用于需要在 Dask 外部使用数据的情况(例如在磁盘、数据库、云端等)。它可以用作长时间运行或容易出错的计算的检查点。

中间存储用例与典型的存储用例不同,因为它会向用户返回一个 Dask 数组,代表该存储操作的结果。这通常通过将 store 函数的 return_stored 标志设置为 True 来完成。

x.store()  # stores data, returns nothing
x = x.store(return_stored=True)  # stores data, returns new dask array backed by that data

用户可以决定存储操作是立即发生(通过将 compute 标志设置为 True)还是稍后发生(通过将 compute 标志设置为 False)。在所有其他方面,这与对 store 的正常调用行为相同。下面显示了一些示例。

>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)

这可以与上面提到、文档中描述的任何其他存储策略或任何专门的存储类型结合使用。

插件

我们可以在 Dask 数组构建时在其上运行任意用户自定义函数。这使我们能够构建各种自定义行为,以改进调试、用户警告等。可以将要应用于所有 Dask 数组的函数列表注册到全局的 array_plugins=

>>> def f(x):
...     print(x.nbytes)

>>> with dask.config.set(array_plugins=[f]):
...     x = da.ones((10, 1), chunks=(5, 1))
...     y = x.dot(x.T)
80
80
800
800

如果插件函数返回 None,则输入的 Dask 数组将不经更改地返回。如果插件函数返回其他值,则该值将成为构造函数的结果。

示例

自动计算

我们可能希望将一些 Dask 数组代码转换为普通的 NumPy 代码。这很有用,例如,可以立即跟踪到否则会被 Dask 惰性语义隐藏的错误

>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
...     x = da.arange(5, chunks=2)

>>> x  # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])

大分块警告

我们可能希望在用户创建过大分块时发出警告

def warn_on_large_chunks(x):
    shapes = list(itertools.product(*x.chunks))
    nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
    if any(nb > 1e9 for nb in nbytes):
        warnings.warn("Array contains very large chunks")

with dask.config.set(array_plugins=[warn_on_large_chunks]):
    ...

组合

您还可以将这些插件组合成一个列表。它们将依次运行,并将结果通过它们串联起来

with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
    ...