扩展 DataFrames

子类化 DataFrames

有一些项目对 Pandas 对象的功能进行子类化或复制

  • GeoPandas: 用于地理空间分析

  • cuDF: 用于 GPU 上的数据分析

这些项目可能也希望使用 Dask 生成自己的并行变体,并且可能希望重用 Dask DataFrame 中的一些代码。子类化 Dask DataFrames 适用于这些库的维护者,而不是一般用户。

实现 dask, name, meta 和 divisions

您需要实现 ._meta.dask.divisions._name,如DataFrame 设计文档中所定义。

扩展分派方法

如果您要传递非标准 Pandas 对象的类 Pandas 对象,我们请您扩展几个分派方法:make_metaget_collection_typeconcat

make_meta

此函数给定一个非空的非 Dask 对象,返回您的一个非 Dask 对象的空版本

from dask.dataframe.dispatch import make_meta_dispatch

@make_meta_dispatch.register(MyDataFrame)
def make_meta_dataframe(df, index=None):
    return df.head(0)


@make_meta_dispatch.register(MySeries)
def make_meta_series(s, index=None):
    return s.head(0)


@make_meta_dispatch.register(MyIndex)
def make_meta_index(ind, index=None):
    return ind[:0]

为了将任何任意 object 类型分派到相应的后端,我们建议为 make_meta_obj 注册一个分派函数

from dask.dataframe.dispatch import make_meta_obj

@make_meta_obj.register(MyDataFrame)
def make_meta_object(x, index=None):
    if isinstance(x, dict):
        return MyDataFrame()
    elif isinstance(x, int):
        return MySeries
    .
    .
    .

此外,您应该创建一个类似的函数,返回您的非 Dask DataFrame 对象的非空版本,其中填充了一些代表性或随机数据行。这用于在未提供类型时猜测类型。它应该接受一个空版本的您的对象(包含列、dtypes、索引名),并返回一个非空版本

from dask.dataframe.utils import meta_nonempty

@meta_nonempty.register(MyDataFrame)
def meta_nonempty_dataframe(df):
    ...
    return MyDataFrame(..., columns=df.columns,
                       index=MyIndex(..., name=df.index.name)


@meta_nonempty.register(MySeries)
def meta_nonempty_series(s):
    ...


@meta_nonempty.register(MyIndex)
def meta_nonempty_index(ind):
    ...

get_collection_type

给定一个非 Dask DataFrame 对象,返回其对应的 Dask 等价物

from dask.dataframe import get_collection_type

@get_collection_type.register(MyDataFrame)
def get_collection_type_dataframe(df):
    return MyDaskDataFrame


@get_collection_type.register(MySeries)
def get_collection_type_series(s):
    return MyDaskSeries


@get_collection_type.register(MyIndex)
def get_collection_type_index(ind):
    return MyDaskIndex

concat

将您的许多非 Dask DataFrame 对象连接起来。它应该接受一个您的对象列表(类型一致)

from dask.dataframe.methods import concat_dispatch

@concat_dispatch.register((MyDataFrame, MySeries, MyIndex))
def concat_pandas(dfs, axis=0, join='outer', uniform=False, filter_warning=True):
    ...

扩展数组

除了子类化 Pandas DataFrames 之外,您可能对使用扩展数组来扩展 Pandas 感兴趣。

所有第一方扩展数组(在 pandas 本身中实现的)都直接受到 dask 支持。

实现第三方扩展数组(在 pandas 之外)的开发者需要将其 ExtensionDtype 注册到 Dask,以便它在 dask.dataframe 中正常工作。

例如,我们将注册来自 pandas 测试套件的仅用于测试的 DecimalDtype

from decimal import Decimal
from dask.dataframe.extensions import make_array_nonempty, make_scalar
from pandas.tests.extension.decimal import DecimalArray, DecimalDtype

@make_array_nonempty.register(DecimalDtype)
def _(dtype):
    return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
                                       dtype=dtype)


@make_scalar.register(Decimal)
def _(x):
   return Decimal('1')

在内部,Dask 将使用它创建一个小的虚拟 Series,用于在操作中跟踪元数据。

>>> make_array_nonempty(DecimalDtype())
<DecimalArray>
[Decimal('0'), Decimal('NaN')]
Length: 2, dtype: decimal

因此,您(或您的用户)现在可以创建并存储一个包含您的扩展数组的 dask DataFrameSeries

>>> from decimal import Decimal
>>> import dask.dataframe as dd
>>> import pandas as pd
>>> from pandas.tests.extension.decimal import DecimalArray
>>> s = pd.Series(DecimalArray([Decimal('0.0')] * 10))
>>> ds = dd.from_pandas(s, 3)
>>> ds
Dask Series Structure:
npartitions=3
0    decimal
4        ...
8        ...
9        ...
dtype: decimal
Dask Name: from_pandas, 3 tasks

请注意 decimal dtype。

访问器

许多扩展数组使用访问器在 Series 或 DataFrame 对象上暴露其功能。Dask 提供了类似于 pandas 的装饰器来注册访问器。更多信息请参见pandas 关于访问器的文档

dask.dataframe.extensions.register_dataframe_accessor(name)[source]

dask.dataframe.DataFrame 上注册一个自定义访问器。

更多信息请参见 pandas.api.extensions.register_dataframe_accessor()

dask.dataframe.extensions.register_series_accessor(name)[source]

dask.dataframe.Series 上注册一个自定义访问器。

更多信息请参见 pandas.api.extensions.register_series_accessor()

dask.dataframe.extensions.register_index_accessor(name)[source]

dask.dataframe.Index 上注册一个自定义访问器。

更多信息请参见 pandas.api.extensions.register_index_accessor()