选择集合后端

警告: 集合级别的后端库分派仍是一个实验性功能。 DaskBackendEntrypoint API 和“可分派”函数集预计会发生变化。

更改默认后端库

Dask-Dataframe 和 Dask-Array 模块最初分别考虑到 Pandas 和 Numpy 后端库。然而,其他 dataframe 和 array 库可以利用相同的集合 API 进行外存和并行处理。例如,安装了 cupy 的用户可以使用 "array.backend" 配置选项将其默认的 Dask-Array 后端更改为 cupy

>>> import dask
>>> import dask.array as da
>>> with dask.config.set({"array.backend": "cupy"}):
...     darr = da.ones(10, chunks=(5,))  # Get cupy-backed collection

此代码退出了默认的 ("numpy") 后端,针对可分派的 Dask-Array 创建函数,并转而使用为 "cupy" 注册的创建函数。当前 Dask-Array 的可分派创建函数集如下:

  • ones

  • zeros

  • empty

  • full

  • arange

Dask-Array API 还可以分派后端 RandomState 类,用于生成随机数。这意味着 dask.array.random 中的所有创建函数也是可分派的。

当前 Dask-Dataframe 的可分派创建函数集如下:

  • from_dict

  • read_parquet

  • read_json

  • read_orc

  • read_csv

  • read_hdf

随着后端库分派系统变得更加成熟,这组可分派的创建函数可能会增加。

对于现有集合,可以使用 to_backend 方法将底层数据强制移动到所需的后端。

>>> import dask
>>> import dask.array as da
>>> darr = da.ones(10, chunks=(5,))  # Creates numpy-backed collection
>>> with dask.config.set({"array.backend": "cupy"}):
...     darr = darr.to_backend()  # Moves numpy data to cupy

定义新的集合后端

警告: 对于大多数用户和下游库来说,目前建议定义自定义后端。后端入口点系统仍应视为实验性功能。

Dask 目前在 dask.array.backendsdask.dataframe.backends 组下暴露了一个入口点,以使用户和第三方库能够开发和维护 Dask-Array 和 Dask-Dataframe 的后端实现。自定义 Dask-Array 后端应定义 DaskArrayBackendEntrypoint 的子类(定义在 dask.array.backends 中),而自定义 Dask-DataFrame 后端应定义 DataFrameBackendEntrypoint 的子类(定义在 dask.dataframe.backends 中)。

例如,一个基于 cudf 的 Dask-Dataframe 后端定义如下所示的 CudfBackendEntrypoint 定义:

from dask.dataframe.backends import DataFrameBackendEntrypoint
from dask.dataframe.dispatch import (
   ...
   make_meta_dispatch,
   ...
)
...

def make_meta_cudf(x, index=None):
   return x.head(0)
...

class CudfBackendEntrypoint(DataFrameBackendEntrypoint):

   def __init__(self):
      # Register compute-based dispatch functions
      # (e.g. make_meta_dispatch, sizeof_dispatch)
      ...
      make_meta_dispatch.register(
         (cudf.Series, cudf.DataFrame),
         func=make_meta_cudf,
      )
      # NOTE: Registration may also be outside __init__
      # if it is in the same module as this class
   ...

   @staticmethod
   def read_orc(*args, **kwargs):
      from .io import read_orc

      # Use dask_cudf version of read_orc
      return read_orc(*args, **kwargs)
   ...

为了支持使用 DataFrame.to_backend 进行 pandas 到 cudf 的转换,该类还需要实现适当的 to_backendto_backend_dispatch 方法。

为了将此类公开为 dask.dataframe.backends 入口点,在 cudf(或 dask_cudf)中必要的 setup.cfg 配置如下:

[options.entry_points]
dask.dataframe.backends =
   cudf = <module-path>:CudfBackendEntrypoint

计算分派

注意

Dask-Array 和 Dask-DataFrame 中类数组计算操作的主要分派机制是 NEP-18 中定义的 __array_function__ 协议。为了使自定义集合后端能够正常工作,该协议必须覆盖目标数组后端的许多常见 numpy 函数。例如,Dask-DataFrame 的 cudf 后端依赖于为 cudf 及其互补的数组后端 (cupy) 定义 __array_function__ 协议。本节讨论的基于计算的分派函数对应于 NEP-18 尚未捕获的功能。

请注意,CudfBackendEntrypoint 定义必须为每个可分派的创建例程定义一个不同的方法,并在 __init__ 逻辑中注册所有非创建(基于计算的)分派函数。这些计算分派函数不在集合 API 级别操作,而是在计算时(任务内部)操作。所有当前“计算”分派函数的列表如下所示。

Dask-Array 基于计算的分派函数(定义在 dask.array.dispatch 中,以及为 Numpy 定义在 dask.array.backends 中)

  • concatenate_lookup

  • divide_lookup

  • einsum_lookup

  • empty_lookup

  • nannumel_lookup

  • numel_lookup

  • percentile_lookup

  • tensordot_lookup

  • take_lookup

Dask-Dataframe 基于计算的分派函数(定义在 dask.dataframe.dispatch 中,以及为 Pandas 定义在 dask.dataframe.backends 中)

  • categorical_dtype_dispatch

  • concat_dispatch

  • get_collection_type

  • group_split_dispatch

  • grouper_dispatch

  • hash_object_dispatch

  • is_categorical_dtype_dispatch

  • make_meta_dispatch

  • make_meta_obj

  • meta_lib_from_array

  • meta_nonempty

  • pyarrow_schema_dispatch

  • tolist_dispatch

  • union_categoricals_dispatch

请注意,基于计算的分派系统可能会发生变化。实现一个完整的后端仍需要付出大量努力。然而,长期目标是进一步简化这一过程。