扩展 DataFrame

子类化 DataFrame

有一些项目子类化或复制了 Pandas 对象的功能:

  • GeoPandas: 用于地理空间分析

  • cuDF: 用于 GPU 上的数据分析

这些项目可能也想使用 Dask 生成自身的并行变体,并可能想重用 Dask DataFrame 中的一些代码。子类化 Dask DataFrame 旨在为这些库的维护者使用,而不是面向普通用户。

实现 dask, name, meta 和 divisions

您需要实现 ._meta.dask.divisions._name,具体定义请参见DataFrame 设计文档

扩展分派方法

如果您要传递非普通 Pandas 对象的 Pandas 类对象,则需要扩展一些分派方法:make_metaget_collection_typeconcat

make_meta

此函数返回给定非空非 Dask 对象的空版本。

from dask.dataframe.dispatch import make_meta_dispatch

@make_meta_dispatch.register(MyDataFrame)
def make_meta_dataframe(df, index=None):
    return df.head(0)


@make_meta_dispatch.register(MySeries)
def make_meta_series(s, index=None):
    return s.head(0)


@make_meta_dispatch.register(MyIndex)
def make_meta_index(ind, index=None):
    return ind[:0]

对于将任意 object 类型分派到各自的后端,我们建议为 make_meta_obj 注册分派。

from dask.dataframe.dispatch import make_meta_obj

@make_meta_obj.register(MyDataFrame)
def make_meta_object(x, index=None):
    if isinstance(x, dict):
        return MyDataFrame()
    elif isinstance(x, int):
        return MySeries
    .
    .
    .

此外,您应该创建一个类似函数,它返回填充了一些代表性或随机数据的非 Dask DataFrame 对象的非空版本。当未提供类型时,这用于猜测类型。它应接受一个空版本对象(包含列、dtypes、索引名称),并应返回一个非空版本。

from dask.dataframe.utils import meta_nonempty

@meta_nonempty.register(MyDataFrame)
def meta_nonempty_dataframe(df):
    ...
    return MyDataFrame(..., columns=df.columns,
                       index=MyIndex(..., name=df.index.name)


@meta_nonempty.register(MySeries)
def meta_nonempty_series(s):
    ...


@meta_nonempty.register(MyIndex)
def meta_nonempty_index(ind):
    ...

get_collection_type

给定一个非 Dask DataFrame 对象,返回其 Dask 等价物。

from dask.dataframe import get_collection_type

@get_collection_type.register(MyDataFrame)
def get_collection_type_dataframe(df):
    return MyDaskDataFrame


@get_collection_type.register(MySeries)
def get_collection_type_series(s):
    return MyDaskSeries


@get_collection_type.register(MyIndex)
def get_collection_type_index(ind):
    return MyDaskIndex

concat

将多个非 Dask DataFrame 对象连接在一起。它应接受一个对象列表(类型一致)。

from dask.dataframe.methods import concat_dispatch

@concat_dispatch.register((MyDataFrame, MySeries, MyIndex))
def concat_pandas(dfs, axis=0, join='outer', uniform=False, filter_warning=True):
    ...

扩展数组

您可能对使用 扩展数组 扩展 Pandas 感兴趣,而不是子类化 Pandas DataFrame。

所有第一方扩展数组(在 pandas 本身中实现的)都直接受 dask 支持。

实现第三方扩展数组(在 pandas 之外)的开发者需要将其 ExtensionDtype 注册到 Dask,以便它在 dask.dataframe 中正常工作。

例如,我们将注册来自 pandas 测试套件的 *仅用于测试* 的 DecimalDtype

from decimal import Decimal
from dask.dataframe.extensions import make_array_nonempty, make_scalar
from pandas.tests.extension.decimal import DecimalArray, DecimalDtype

@make_array_nonempty.register(DecimalDtype)
def _(dtype):
    return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
                                       dtype=dtype)


@make_scalar.register(Decimal)
def _(x):
   return Decimal('1')

在内部,Dask 将使用此功能创建一个小的虚拟 Series 来通过操作跟踪元数据。

>>> make_array_nonempty(DecimalDtype())
<DecimalArray>
[Decimal('0'), Decimal('NaN')]
Length: 2, dtype: decimal

因此,您(或您的用户)现在可以创建并存储包含您的扩展数组的 dask DataFrameSeries

>>> from decimal import Decimal
>>> import dask.dataframe as dd
>>> import pandas as pd
>>> from pandas.tests.extension.decimal import DecimalArray
>>> s = pd.Series(DecimalArray([Decimal('0.0')] * 10))
>>> ds = dd.from_pandas(s, 3)
>>> ds
Dask Series Structure:
npartitions=3
0    decimal
4        ...
8        ...
9        ...
dtype: decimal
Dask Name: from_pandas, 3 tasks

注意 decimal dtype。

访问器

许多扩展数组使用访问器在其 Series 或 DataFrame 对象上公开其功能。Dask 提供了类似 pandas 的装饰器来注册访问器。更多信息请参阅pandas 关于访问器的文档

dask.dataframe.extensions.register_dataframe_accessor(name)[source]

dask.dataframe.DataFrame 上注册自定义访问器。

更多信息请参见pandas.api.extensions.register_dataframe_accessor()

dask.dataframe.extensions.register_series_accessor(name)[source]

dask.dataframe.Series 上注册自定义访问器。

更多信息请参见pandas.api.extensions.register_series_accessor()

dask.dataframe.extensions.register_index_accessor(name)[source]

dask.dataframe.Index 上注册自定义访问器。

更多信息请参见pandas.api.extensions.register_index_accessor()