创建 Dask 数组
目录
创建 Dask 数组¶
您可以从各种常见来源加载或存储 Dask 数组,例如 HDF5、NetCDF、Zarr,或任何支持 NumPy 风格切片的格式。
|
从类似数组的对象创建 dask 数组。 |
|
从 dask delayed 值创建 dask 数组 |
|
从 npy 文件堆栈加载 dask 数组 |
|
从 zarr 存储格式加载数组 |
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
NumPy 切片¶
|
从类似数组的对象创建 dask 数组。 |
许多存储格式都有使用 NumPy 切片语法暴露存储的 Python 项目。这些包括 HDF5、NetCDF、BColz、Zarr、GRIB 等。例如,我们可以使用 h5py 从 HDF5 文件加载 Dask 数组
>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path'] # Pointer on on-disk array
>>> d.shape # d can be very large
(1000000, 1000000)
>>> x = d[:5, :5] # We slice to get numpy arrays
给定一个如上所示的对象,例如 d
,它具有 dtype
和 shape
属性并支持 NumPy 风格切片,我们可以构建一个惰性 Dask 数组
>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))
这个过程完全是惰性的。无论是创建 h5py 对象还是用 da.from_array
包装它,都没有加载任何数据。
随机数据¶
为了实验或基准测试,通常会创建随机数据数组。dask.array.random
模块实现了 numpy.random
模块中的大部分函数。下面列出了一些常用函数,完整的列表请参阅数组 API
|
从二项分布中抽取样本。 |
|
从正态(高斯)分布中抽取随机样本。 |
|
从泊松分布中抽取样本。 |
|
返回半开区间 [0.0, 1.0) 内的随机浮点数。 |
>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))
连接和堆叠¶
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
通常我们将数据存储在多个不同的位置,并希望将它们拼接在一起
dask_arrays = []
for fn in filenames:
f = h5py.File(fn)
d = f['/data']
array = da.from_array(d, chunks=(1000, 1000))
dask_arrays.append(array)
x = da.concatenate(dask_arrays, axis=0) # concatenate arrays along first axis
更多信息,请参阅连接和堆叠文档。
使用 dask.delayed
¶
|
从 dask delayed 值创建 dask 数组 |
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
有时 NumPy 风格的数据存在于不支持 NumPy 风格切片的格式中。如果有一个 Python 函数可以使用 dask.delayed 生成完整数组的各个部分,我们仍然可以围绕这些数据构建 Dask 数组。Dask delayed 允许我们延迟一个将创建 NumPy 数组的函数调用。然后我们可以使用 da.from_delayed
包装这个 delayed 对象,提供 dtype 和 shape 来生成一个单分块的 Dask 数组。此外,我们可以使用之前提到的 stack
或 concatenate
来构建更大的惰性数组。
例如,考虑使用 skimage.io.imread
加载图像堆栈
import skimage.io
import dask.array as da
import dask
imread = dask.delayed(skimage.io.imread, pure=True) # Lazy version of imread
filenames = sorted(glob.glob('*.jpg'))
lazy_images = [imread(path) for path in filenames] # Lazily evaluate imread on each path
sample = lazy_images[0].compute() # load the first image (assume rest are same shape/dtype)
arrays = [da.from_delayed(lazy_image, # Construct a small Dask array
dtype=sample.dtype, # for every lazy value
shape=sample.shape)
for lazy_image in lazy_images]
stack = da.stack(arrays, axis=0) # Stack all small Dask arrays into one
请参阅关于将 dask.delayed 与集合一起使用的文档。
通常使用 da.map_blocks
会比 da.stack
快得多
import glob
import skimage.io
import numpy as np
import dask.array as da
filenames = sorted(glob.glob('*.jpg'))
def read_one_image(block_id, filenames=filenames, axis=0):
# a function that reads in one chunk of data
path = filenames[block_id[axis]]
image = skimage.io.imread(path)
return np.expand_dims(image, axis=axis)
# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])
stack = da.map_blocks(
read_one_image,
dtype=sample.dtype,
chunks=((1,) * len(filenames), *sample.shape)
)
从 Dask DataFrame 创建¶
有几种方法可以从 Dask DataFrame 创建 Dask 数组。Dask DataFrame 有一个 to_dask_array
方法
>>> df = dask.dataframes.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
这模仿了 Pandas 中的 to_numpy 函数。也支持 values
属性
>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
然而,这些数组没有已知的分块大小,因为 dask.dataframe 不跟踪每个分区的行数。这意味着某些操作(如切片)将无法正确执行。
可以计算分块大小
>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>
指定 lengths=True
会立即触发分块大小的计算。这使得依赖于已知分块大小的后续计算(例如切片)成为可能。
Dask DataFrame 的 to_records
方法也返回一个 Dask 数组,但不计算形状信息
>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>
如果您有一个将 Pandas DataFrame 转换为 NumPy 数组的函数,那么在 Dask DataFrame 上使用该函数调用 map_partitions
将产生一个 Dask 数组
>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
与 NumPy 数组的交互¶
Dask 数组操作会自动将 NumPy 数组转换为单分块 dask 数组
>>> x = da.sum(np.ones(5))
>>> x.compute()
5
当 NumPy 和 Dask 数组交互时,结果将是一个 Dask 数组。自动 rechunking 规则通常会将 NumPy 数组切片成适当的 Dask 分块形状
>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>
这些交互不仅适用于 NumPy 数组,也适用于任何具有 shape 和 dtype 属性并实现 NumPy 切片语法的对象。
内存映射¶
内存映射是一种访问原始二进制数据的高效方法,因为如果数据已经在文件系统缓存中,其开销几乎为零。对于线程调度器,从原始二进制文件创建 Dask 数组可以像 a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r'))
一样简单。
对于多进程或分布式调度器,每个数组分块的内存映射应该在正确的工作进程上创建,而不是在主进程上,以避免通过集群进行数据传输。这可以通过使用 dask.delayed
包装创建内存映射的函数来实现。
import numpy as np
import dask
import dask.array as da
def mmap_load_chunk(filename, shape, dtype, offset, sl):
'''
Memory map the given file with overall shape and dtype and return a slice
specified by :code:`sl`.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int
Skip :code:`offset` bytes from the beginning of the file.
sl:
Object that can be used for indexing or slicing a NumPy array to
extract a chunk
Returns
-------
numpy.memmap or numpy.ndarray
View into memory map created by indexing with :code:`sl`,
or NumPy ndarray in case no view can be created using :code:`sl`.
'''
data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
return data[sl]
def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
'''
Create a Dask array from raw binary data in :code:`filename`
by memory mapping.
This method is particularly effective if the file is already
in the file system cache and if arbitrary smaller subsets are
to be extracted from the Dask array without optimizing its
chunking scheme.
It may perform poorly on Windows if the file is not in the file
system cache. On Linux it performs well under most circumstances.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int, optional
Skip :code:`offset` bytes from the beginning of the file.
blocksize : int, optional
Chunk size for the outermost axis. The other axes remain unchunked.
Returns
-------
dask.array.Array
Dask array matching :code:`shape` and :code:`dtype`, backed by
memory-mapped chunks.
'''
load = dask.delayed(mmap_load_chunk)
chunks = []
for index in range(0, shape[0], blocksize):
# Truncate the last chunk if necessary
chunk_size = min(blocksize, shape[0] - index)
chunk = dask.array.from_delayed(
load(
filename,
shape=shape,
dtype=dtype,
offset=offset,
sl=slice(index, index + chunk_size)
),
shape=(chunk_size, ) + shape[1:],
dtype=dtype
)
chunks.append(chunk)
return da.concatenate(chunks, axis=0)
x = mmap_dask_array(
filename='testfile-50-50-100-100-float32.raw',
shape=(50, 50, 100, 100),
dtype=np.float32
)
存储 Dask 数组¶
|
将 dask 数组存储到类似数组的对象中,覆盖目标数据 |
|
将数组存储到 HDF5 文件中 |
|
将 dask 数组写入 .npy 文件堆栈 |
|
将数组保存到 zarr 存储格式 |
|
一次计算多个 dask 集合。 |
内存中¶
|
一次计算多个 dask 集合。 |
如果数据量小,可以在 Dask 数组上调用 np.array
或 .compute()
将其转换为普通的 NumPy 数组
>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])
>>> y.compute()
array([0, 1, 4, 9, 16, 25])
NumPy 风格切片¶
|
将 dask 数组存储到类似数组的对象中,覆盖目标数据 |
可以将 Dask 数组存储到任何支持 NumPy 风格切片赋值的对象中,例如 h5py.Dataset
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)
此外,可以通过传递源和目标的列表,在一次计算中存储多个数组
>>> da.store([array1, array2], [output1, output2]) # doctest: +SKIP
HDF5¶
|
将数组存储到 HDF5 文件中 |
HDF5 非常常见,因此有一个特殊函数 to_hdf5
,用于使用 h5py
将数据存储到 HDF5 文件中
>>> da.to_hdf5('myfile.hdf5', '/y', y) # doctest: +SKIP
可以通过传入字典,使用函数 da.to_hdf5
在一次计算中存储多个数组
>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP
Zarr¶
Zarr 格式是一种按分块存储的二进制数组文件格式,具有多种编码和压缩选项。由于每个分块都存储在单独的文件中,因此非常适合并行读写访问(后者要求 Dask 数组分块与目标对齐)。此外,还支持在 远程数据服务(如 S3 和 GCS)中存储。
例如,要将数据保存到本地 zarr 数据集,您可以这样做
>>> arr.to_zarr('output.zarr')
或保存到 S3 上的特定 bucket
>>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
'secret': 'mysecret'})
或您自己的自定义 zarr 数组
>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)
要检索这些数据,可以使用完全相同的参数调用 da.from_zarr
。除非另有指定,结果 Dask 数组的分块方式取决于文件保存的方式。
TileDB¶
TileDB 是一种二进制数组格式和存储管理器,具有可调的分块、布局和压缩选项。TileDB 存储管理器库支持可伸缩的存储后端,例如兼容 S3 API 的对象存储和 HDFS,并支持自动扩展,以及多线程和多进程读取(一致)和写入(最终一致)。
要将数据保存到本地 TileDB 数组
>>> arr.to_tiledb('output.tdb')
或保存到 S3 上的一个 bucket
>>> arr.to_tiledb('s3://mybucket/output.tdb',
storage_options={'vfs.s3.aws_access_key_id': 'mykey',
'vfs.s3.aws_secret_access_key': 'mysecret'})
可以通过使用相同的 URI 和任何必要的参数运行 da.from_tiledb 来检索文件。
中间存储¶
|
将 dask 数组存储到类似数组的对象中,覆盖目标数据 |
在某些情况下,可能希望将中间结果存储到长期存储中。这不同于 persist
,后者主要用于管理 Dask 内部不一定具有持久性的中间结果。它也不同于存储最终结果,因为最终结果标志着 Dask 图的结束。因此,中间结果更容易在不重新加载数据的情况下重用。中间存储主要适用于需要在 Dask 外部使用数据的情况(例如在磁盘、数据库、云端等)。它可以用作长时间运行或容易出错的计算的检查点。
中间存储用例与典型的存储用例不同,因为它会向用户返回一个 Dask 数组,代表该存储操作的结果。这通常通过将 store
函数的 return_stored
标志设置为 True
来完成。
x.store() # stores data, returns nothing
x = x.store(return_stored=True) # stores data, returns new dask array backed by that data
用户可以决定存储操作是立即发生(通过将 compute
标志设置为 True
)还是稍后发生(通过将 compute
标志设置为 False
)。在所有其他方面,这与对 store
的正常调用行为相同。下面显示了一些示例。
>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)
这可以与上面提到、文档中描述的任何其他存储策略或任何专门的存储类型结合使用。
插件¶
我们可以在 Dask 数组构建时在其上运行任意用户自定义函数。这使我们能够构建各种自定义行为,以改进调试、用户警告等。可以将要应用于所有 Dask 数组的函数列表注册到全局的 array_plugins=
值
>>> def f(x):
... print(x.nbytes)
>>> with dask.config.set(array_plugins=[f]):
... x = da.ones((10, 1), chunks=(5, 1))
... y = x.dot(x.T)
80
80
800
800
如果插件函数返回 None,则输入的 Dask 数组将不经更改地返回。如果插件函数返回其他值,则该值将成为构造函数的结果。
示例¶
自动计算¶
我们可能希望将一些 Dask 数组代码转换为普通的 NumPy 代码。这很有用,例如,可以立即跟踪到否则会被 Dask 惰性语义隐藏的错误
>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
... x = da.arange(5, chunks=2)
>>> x # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])
大分块警告¶
我们可能希望在用户创建过大分块时发出警告
def warn_on_large_chunks(x):
shapes = list(itertools.product(*x.chunks))
nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
if any(nb > 1e9 for nb in nbytes):
warnings.warn("Array contains very large chunks")
with dask.config.set(array_plugins=[warn_on_large_chunks]):
...
组合¶
您还可以将这些插件组合成一个列表。它们将依次运行,并将结果通过它们串联起来
with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
...