创建 Dask 数组
目录
创建 Dask 数组¶
您可以从各种常见来源(如 HDF5、NetCDF、Zarr 或任何支持 NumPy 风格切片的格式)加载或存储 Dask 数组。
|
从类似数组的对象创建 dask 数组。 |
|
从 dask 延迟值创建 dask 数组 |
|
从 npy 文件堆栈加载 dask 数组 |
|
从 zarr 存储格式加载数组 |
|
沿着新轴堆叠数组 |
|
沿着现有轴拼接数组 |
NumPy 切片¶
|
从类似数组的对象创建 dask 数组。 |
许多存储格式都有 Python 项目,使用 NumPy 切片语法公开存储。这些格式包括 HDF5、NetCDF、BColz、Zarr、GRIB 等。例如,我们可以使用 h5py 从 HDF5 文件加载 Dask 数组。
>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path'] # Pointer on on-disk array
>>> d.shape # d can be very large
(1000000, 1000000)
>>> x = d[:5, :5] # We slice to get numpy arrays
给定一个像上面 d
一样具有 dtype
和 shape
属性且支持 NumPy 风格切片的对象,我们可以构建一个延迟的 Dask 数组。
>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))
这个过程完全是延迟的。无论是创建 h5py 对象还是使用 da.from_array
包装它,都没有加载任何数据。
随机数据¶
对于实验或基准测试,通常会创建随机数据数组。dask.array.random
模块实现了 numpy.random
模块中的大部分函数。下面列出了一些常用函数,完整列表请参阅Array API。
|
从二项分布中抽取样本。 |
|
从正态(高斯)分布中抽取随机样本。 |
|
从泊松分布中抽取样本。 |
|
返回半开区间 [0.0, 1.0) 内的随机浮点数。 |
>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))
拼接和堆叠¶
|
沿着新轴堆叠数组 |
|
沿着现有轴拼接数组 |
通常我们将数据存储在多个不同的位置,并希望将它们拼接在一起。
dask_arrays = []
for fn in filenames:
f = h5py.File(fn)
d = f['/data']
array = da.from_array(d, chunks=(1000, 1000))
dask_arrays.append(array)
x = da.concatenate(dask_arrays, axis=0) # concatenate arrays along first axis
更多信息,请参阅拼接和堆叠文档。
使用 dask.delayed
¶
|
从 dask 延迟值创建 dask 数组 |
|
沿着新轴堆叠数组 |
|
沿着现有轴拼接数组 |
有时 NumPy 风格的数据存储在不支持 NumPy 风格切片的格式中。如果有一个 Python 函数可以使用dask.delayed 生成完整数组的各个部分,我们仍然可以围绕这些数据构建 Dask 数组。Dask delayed 允许我们延迟单个函数调用,该调用会创建一个 NumPy 数组。然后,我们可以使用 da.from_delayed
包装这个延迟对象,提供 dtype 和 shape 来生成一个单块的 Dask 数组。此外,我们可以使用之前的 stack
或 concatenate
来构建一个更大的延迟数组。
例如,考虑使用 skimage.io.imread
加载图像堆栈。
import skimage.io
import dask.array as da
import dask
imread = dask.delayed(skimage.io.imread, pure=True) # Lazy version of imread
filenames = sorted(glob.glob('*.jpg'))
lazy_images = [imread(path) for path in filenames] # Lazily evaluate imread on each path
sample = lazy_images[0].compute() # load the first image (assume rest are same shape/dtype)
arrays = [da.from_delayed(lazy_image, # Construct a small Dask array
dtype=sample.dtype, # for every lazy value
shape=sample.shape)
for lazy_image in lazy_images]
stack = da.stack(arrays, axis=0) # Stack all small Dask arrays into one
请参阅关于将 dask.delayed 与集合一起使用的文档。
通常,使用 da.map_blocks
比使用 da.stack
快得多。
import glob
import skimage.io
import numpy as np
import dask.array as da
filenames = sorted(glob.glob('*.jpg'))
def read_one_image(block_id, filenames=filenames, axis=0):
# a function that reads in one chunk of data
path = filenames[block_id[axis]]
image = skimage.io.imread(path)
return np.expand_dims(image, axis=axis)
# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])
stack = da.map_blocks(
read_one_image,
dtype=sample.dtype,
chunks=((1,) * len(filenames), *sample.shape)
)
从 Dask DataFrame¶
有几种方法可以从 Dask DataFrame 创建 Dask 数组。Dask DataFrame 具有 to_dask_array
方法。
>>> df = dask.dataframes.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
这与 Pandas 中的 to_numpy 函数类似。还支持 values
属性。
>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
然而,这些数组没有已知的块大小,因为 dask.dataframe 不会跟踪每个分区的行数。这意味着某些操作(如切片)将无法正确执行。
块大小可以计算。
>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>
指定 lengths=True
会立即触发块大小的计算。这使得依赖于已知块大小的下游计算(例如切片)成为可能。
Dask DataFrame 的 to_records
方法也返回一个 Dask 数组,但不计算 shape 信息。
>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>
如果您有一个将 Pandas DataFrame 转换为 NumPy 数组的函数,那么在 Dask DataFrame 上使用该函数调用 map_partitions
将生成一个 Dask 数组。
>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
与 NumPy 数组的交互¶
Dask 数组操作会自动将 NumPy 数组转换为单块 dask 数组。
>>> x = da.sum(np.ones(5))
>>> x.compute()
5
当 NumPy 和 Dask 数组交互时,结果将是一个 Dask 数组。自动重新分块规则通常会将 NumPy 数组切片成适当的 Dask 块形状。
>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>
这些交互不仅适用于 NumPy 数组,也适用于任何具有 shape 和 dtype 属性并实现 NumPy 切片语法的对象。
内存映射¶
内存映射是访问原始二进制数据的一种高效方法,如果数据已在文件系统缓存中,其开销几乎为零。对于线程调度器,从原始二进制文件创建 Dask 数组可以像 a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r'))
一样简单。
对于多进程或分布式调度器,每个数组块的内存映射应在正确的工作进程上创建,而不是在主进程上创建,以避免通过集群传输数据。这可以通过使用 dask.delayed
包装创建内存映射的函数来实现。
import numpy as np
import dask
import dask.array as da
def mmap_load_chunk(filename, shape, dtype, offset, sl):
'''
Memory map the given file with overall shape and dtype and return a slice
specified by :code:`sl`.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int
Skip :code:`offset` bytes from the beginning of the file.
sl:
Object that can be used for indexing or slicing a NumPy array to
extract a chunk
Returns
-------
numpy.memmap or numpy.ndarray
View into memory map created by indexing with :code:`sl`,
or NumPy ndarray in case no view can be created using :code:`sl`.
'''
data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
return data[sl]
def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
'''
Create a Dask array from raw binary data in :code:`filename`
by memory mapping.
This method is particularly effective if the file is already
in the file system cache and if arbitrary smaller subsets are
to be extracted from the Dask array without optimizing its
chunking scheme.
It may perform poorly on Windows if the file is not in the file
system cache. On Linux it performs well under most circumstances.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int, optional
Skip :code:`offset` bytes from the beginning of the file.
blocksize : int, optional
Chunk size for the outermost axis. The other axes remain unchunked.
Returns
-------
dask.array.Array
Dask array matching :code:`shape` and :code:`dtype`, backed by
memory-mapped chunks.
'''
load = dask.delayed(mmap_load_chunk)
chunks = []
for index in range(0, shape[0], blocksize):
# Truncate the last chunk if necessary
chunk_size = min(blocksize, shape[0] - index)
chunk = dask.array.from_delayed(
load(
filename,
shape=shape,
dtype=dtype,
offset=offset,
sl=slice(index, index + chunk_size)
),
shape=(chunk_size, ) + shape[1:],
dtype=dtype
)
chunks.append(chunk)
return da.concatenate(chunks, axis=0)
x = mmap_dask_array(
filename='testfile-50-50-100-100-float32.raw',
shape=(50, 50, 100, 100),
dtype=np.float32
)
存储 Dask 数组¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
|
将数组存储在 HDF5 文件中 |
|
将 dask 数组写入 .npy 文件堆栈 |
|
将数组保存到 zarr 存储格式 |
|
一次计算多个 dask 集合。 |
内存中¶
|
一次计算多个 dask 集合。 |
如果您的数据量较小,可以在 Dask 数组上调用 np.array
或 .compute()
将其转换为普通的 NumPy 数组。
>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])
>>> y.compute()
array([0, 1, 4, 9, 16, 25])
NumPy 风格切片¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
您可以将 Dask 数组存储在任何支持 NumPy 风格切片赋值的对象中,例如 h5py.Dataset
。
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)
另外,您可以通过传递源列表和目标列表,在一个计算中存储多个数组。
>>> da.store([array1, array2], [output1, output2]) # doctest: +SKIP
HDF5¶
|
将数组存储在 HDF5 文件中 |
HDF5 非常常见,有一个特殊函数 to_hdf5
用于使用 h5py
将数据存储到 HDF5 文件中。
>>> da.to_hdf5('myfile.hdf5', '/y', y) # doctest: +SKIP
您可以使用函数 da.to_hdf5
通过传递字典,在一个计算中存储多个数组。
>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP
Zarr¶
Zarr 格式是一种基于块的二进制数组存储文件格式,具有丰富的编码和压缩选项。由于每个块都存储在一个单独的文件中,因此非常适合并行读写(对于后者,如果 Dask 数组块与目标对齐)。此外,还支持在远程数据服务(如 S3 和 GCS)中存储。
例如,要将数据保存到本地 zarr 数据集,您可以这样做:
>>> arr.to_zarr('output.zarr')
或者保存到 S3 上的特定存储桶
>>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
'secret': 'mysecret'})
或者您自己的自定义 zarr 数组
>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)
要检索这些数据,您需要使用完全相同的参数调用 da.from_zarr
。生成的 Dask 数组的分块方式由文件保存方式决定,除非另有指定。
TileDB¶
TileDB 是一种二进制数组格式和存储管理器,具有可调整的分块、布局和压缩选项。TileDB 存储管理器库支持可伸缩的存储后端,如 S3 API 兼容的对象存储和 HDFS,具有自动伸缩功能,并支持多线程和多进程读(一致)和写(最终一致)。
要将数据保存到本地 TileDB 数组
>>> arr.to_tiledb('output.tdb')
或者保存到 S3 上的存储桶
>>> arr.to_tiledb('s3://mybucket/output.tdb',
storage_options={'vfs.s3.aws_access_key_id': 'mykey',
'vfs.s3.aws_secret_access_key': 'mysecret'})
可以使用相同的 URI 和任何必要的参数运行 da.from_tiledb 来检索文件。
中间存储¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
在某些情况下,可能希望将中间结果存储到长期存储中。这与 persist
不同,persist
主要用于管理 Dask 内部不一定具有长期性的中间结果。它也与存储最终结果不同,因为最终结果标志着 Dask 图的结束。因此,中间结果更容易重用,而无需重新加载数据。中间存储主要用于需要在 Dask 外部(例如磁盘、数据库、云等)使用数据的情况。它可以作为长时间运行或容易出错的计算的检查点。
中间存储的使用场景与典型的存储使用场景不同,因为它会向用户返回一个表示该存储操作结果的 Dask 数组。这通常通过将 store
函数的 return_stored
标志设置为 True
来完成。
x.store() # stores data, returns nothing
x = x.store(return_stored=True) # stores data, returns new dask array backed by that data
用户可以决定存储操作是立即发生(通过将 compute
标志设置为 True
)还是稍后发生(通过将 compute
标志设置为 False
)。在所有其他方面,这与正常调用 store
的行为相同。下面显示了一些示例。
>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)
这可以与上述文档中或任何特定存储类型中提及的任何其他存储策略相结合。
插件¶
无论何时构建 Dask 数组,我们都可以对其运行任意用户定义的函数。这使我们能够构建各种自定义行为,从而改进调试、用户警告等。您可以将要在所有 Dask 数组上运行的函数列表注册到全局 array_plugins=
值。
>>> def f(x):
... print(x.nbytes)
>>> with dask.config.set(array_plugins=[f]):
... x = da.ones((10, 1), chunks=(5, 1))
... y = x.dot(x.T)
80
80
800
800
如果插件函数返回 None,则输入的 Dask 数组将不做任何更改地返回。如果插件函数返回其他内容,则该值将作为构造函数的结果。
示例¶
自动计算¶
我们可能希望将一些 Dask 数组代码转换为正常的 NumPy 代码。这很有用,例如,可以立即跟踪由 Dask 的延迟语义隐藏的错误。
>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
... x = da.arange(5, chunks=2)
>>> x # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])
警告大块¶
如果用户创建的块过大,我们可能希望发出警告。
def warn_on_large_chunks(x):
shapes = list(itertools.product(*x.chunks))
nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
if any(nb > 1e9 for nb in nbytes):
warnings.warn("Array contains very large chunks")
with dask.config.set(array_plugins=[warn_on_large_chunks]):
...
组合¶
您还可以将这些插件组合到一个列表中。它们将依次运行,并将结果链接起来。
with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
...