自定义集合

对于许多问题,内置的 Dask 集合(dask.arraydask.dataframedask.bagdask.delayed)就已足够。在它们不足够的情况下,可以创建自己的 Dask 集合。本文档描述了满足 Dask 集合接口所需的必要方法。

注意

这被认为是一项高级特性。对于大多数情况,内置集合可能已足够。

在阅读本文之前,您应该阅读并理解

目录

Dask 集合接口

要创建自己的 Dask 集合,您需要实现由 dask.typing.DaskCollection 协议定义的接口。请注意,没有必需的基类。

建议也阅读 核心 Dask 方法的内部结构 以了解 Dask 内部如何使用此接口。

集合协议

class dask.typing.DaskCollection(*args, **kwargs)[source]

定义 Dask 集合接口的协议。

abstract __dask_graph__() collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any][source]

Dask 任务图。

核心 Dask 集合(Array、DataFrame、Bag 和 Delayed)使用 HighLevelGraph 来表示集合任务图。也可以使用 Python 字典将任务图表示为低层图。

返回
Mapping

Dask 任务图。如果实例返回 dask.highlevelgraph.HighLevelGraph,则必须实现 __dask_layers__() 方法,其定义遵循 HLGDaskCollection 协议。

abstract __dask_keys__() list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], ForwardRef('NestedKeys')]][source]

任务图的输出键。

请注意,Dask 集合的键除了 任务图规范文档 中描述的约束外,还有额外的约束。这些额外约束描述如下。

所有键必须是非空字符串或元组,其中第一个元素是非空字符串,后面跟着零个或多个任意的 str、bytes、int、float 或它们的元组。非空字符串通常称为*集合名称*。dask 包中包含的所有集合都只有一个名称,但这不是强制要求。

这些都是有效的输出

  • []

  • ["x", "y"]

  • [[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]

返回
list

一个可能是嵌套的键列表,表示图的输出。计算后,结果将以相同的布局返回,键被其相应的输出替换。

__dask_optimize__: Any

给定一个图和键,返回一个新的优化后的图。

此方法可以是 staticmethodclassmethod,但不能是 instancemethod。有关示例实现,请参阅核心 Dask 集合中 __dask_optimize__ 的定义:dask.array.Arraydask.dataframe.DataFrame 等。

请注意,在调用 __dask_optimize__ 之前,图和键会合并;因此,传递给此方法的图和键可能代表多个共享相同优化方法的集合。

参数
dskGraph

共享相同 __dask_optimize__ 方法的所有集合的合并图。

keysSequence[Key]

共享相同 __dask_optimize__ 方法的所有集合的 __dask_keys__ 输出列表。

**kwargsAny

computepersist 调用转发的额外关键字参数。可根据需要使用或忽略。

返回
MutableMapping

优化后的 Dask 图。

abstract __dask_postcompute__() tuple[collections.abc.Callable, tuple][source]

用于构建最终结果的终结器函数和可选参数。

计算后,集合中的每个键都将有一个内存中的结果,postcompute 函数将每个键的结果组合成最终的内存中表示。例如,dask.array.Array 将每个分块处的数组连接成最终的内存数组。

返回
PostComputeCallable

可调用对象,接收每个最终键的结果序列以及可选参数。一个示例签名为 finalize(results: Sequence[Any], *args)

tuple[Any, …]

在键结果之后传递给函数的可选参数(PostComputeCallable*args 部分)。如果不传递额外参数,则必须是一个空元组。

abstract __dask_postpersist__() tuple[dask.typing.PostPersistCallable, tuple][source]

用于构建持久化集合的重构函数和可选参数。

另请参阅 dask.typing.PostPersistCallable 的文档。

返回
PostPersistCallable

重构集合的可调用对象。签名应为 rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)(由 PostPersistCallable 协议定义)。该可调用对象应返回一个等效的 Dask 集合,其键与 self 相同,但结果是通过不同的图计算得出的。对于 dask.persist() 的情况,新图将只有输出键和已经计算好的值。

tuple[Any, …]

传递给重构可调用对象的可选参数。如果不传递额外参数,则必须是一个空元组。

__dask_scheduler__: staticmethod

用于此对象的默认调度器 get

通常作为 staticmethod 附加到类上,例如

>>> import dask.threaded
>>> class MyCollection:
...     # Use the threaded scheduler by default
...     __dask_scheduler__ = staticmethod(dask.threaded.get)
abstract __dask_tokenize__() collections.abc.Hashable[source]

必须完全代表对象的值。

abstract compute(**kwargs: Any) Any[source]

计算此 dask 集合。

这将一个延迟执行的 Dask 集合转换为其内存中的等效形式。例如,Dask 数组转换为 NumPy 数组,Dask DataFrame 转换为 Pandas DataFrame。在调用此操作之前,整个数据集必须能够放入内存。

参数
schedulerstring, optional

要使用的调度器,如 "threads"、"synchronous" 或 "processes"。如果未提供,默认情况下首先检查全局设置,然后回退到集合的默认设置。

optimize_graphbool, optional

如果为 True [默认],图在计算前进行优化。否则,图按原样运行。这对于调试很有用。

kwargs

转发给调度器函数的额外关键字参数。

返回
集合的计算结果。

另请参阅

dask.compute
abstract persist(**kwargs: Any) dask.typing.CollType[source]

将此 dask 集合持久化到内存中

这将一个延迟执行的 Dask 集合转换为一个元数据相同但结果已完全计算或正在后台积极计算的 Dask 集合。

此函数的操作根据活动的任务调度器有很大差异。如果任务调度器支持异步计算,例如 dask.distributed 调度器,则 persist 将*立即*返回,并且返回值任务图将包含 Dask Future 对象。然而,如果任务调度器仅支持阻塞计算,则对 persist 的调用将*阻塞*,并且返回值任务图将包含具体的 Python 结果。

此函数在使用分布式系统时特别有用,因为结果将保留在分布式内存中,而不是像 compute 那样返回到本地进程。

参数
schedulerstring, optional

要使用的调度器,如 "threads"、"synchronous" 或 "processes"。如果未提供,默认情况下首先检查全局设置,然后回退到集合的默认设置。

optimize_graphbool, optional

如果为 True [默认],图在计算前进行优化。否则,图按原样运行。这对于调试很有用。

**kwargs

转发给调度器函数的额外关键字参数。

返回
由内存中数据支持的新 dask 集合

另请参阅

dask.persist
abstract visualize(filename: str = 'mydask', format: str | None = None, optimize_graph: bool = False, **kwargs: Any) DisplayObject | None[source]

使用 graphviz 渲染此对象的任务图计算过程。

需要安装 graphviz

参数
filenamestr 或 None, optional

写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用 '.png'。如果 filename 为 None,则不写入文件,仅使用管道与 dot 通信。

format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional

输出文件的写入格式。默认为 'png'。

optimize_graphbool, optional

如果为 True,图在渲染前进行优化。否则,图按原样显示。默认为 False。

color: {None, ‘order’}, optional

节点着色选项。提供 cmap= 关键字参数可指定额外的色图。

**kwargs

转发给 to_graphviz 的额外关键字参数。

返回
resultIPython.display.Image, IPython.display.SVG, or None

更多信息请参见 dask.dot.dot_graph。

另请参阅

dask.visualize
dask.dot.dot_graph

注意

有关优化的更多信息请参见此处

https://docs.dask.org.cn/en/latest/optimize.html

示例

>>> x.visualize(filename='dask.pdf')  
>>> x.visualize(filename='dask.pdf', color='order')  

HLG 集合协议

注意

HighLevelGraphs 正在被弃用,取而代之的是表达式。新的项目鼓励不要实现自己的 HLG 层。

由 Dask 的 高层图 支持的集合必须实现一个额外的方法,由本协议定义

class dask.typing.HLGDaskCollection(*args, **kwargs)[source]

定义使用 HighLevelGraphs 的 Dask 集合的协议。

此协议几乎与 DaskCollection 完全相同,仅添加了 __dask_layers__ 方法(由高层图支持的集合所必需)。

abstract __dask_layers__() collections.abc.Sequence[str][source]

HighLevelGraph 层的名称。

调度器 get 协议

SchedulerGetProtocol 定义了 Dask 集合的 __dask_scheduler__ 定义必须遵循的签名。

class dask.typing.SchedulerGetCallable(*args, **kwargs)[source]

定义 __dask_scheduler__ 可调用对象签名的协议。

__call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], keys: Union[collections.abc.Sequence[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], **kwargs: Any) Any[source]

作为集合默认调度器调用的方法。

参数
dsk

任务图。

keys

对应所需数据的键。

**kwargs

额外参数。

返回
Any

keys 关联的结果。

Post-persist 可调用协议

集合必须定义一个 __dask_postpersist__ 方法,该方法返回一个符合 PostPersistCallable 接口的可调用对象。

class dask.typing.PostPersistCallable(*args, **kwargs)[source]

定义 __dask_postpersist__ 可调用对象签名的协议。

__call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], *args: Any, rename: collections.abc.Mapping[str, str] | None = None) dask.typing.CollType_co[source]

用于重构持久化集合的方法。

参数
dsk: Mapping

一个包含至少由 __dask_keys__() 返回的输出键的映射。

*argsAny

额外的可选参数。如果不需要额外参数,则必须是一个空元组。

renameMapping[str, str], optional

如果定义,则表明输出键也可能正在更改;例如,如果 __dask_keys__() 的先前输出是 [("a", 0), ("a", 1)],则在调用 rebuild(dsk, *extra_args, rename={"a": "b"}) 后必须变为 [("b", 0), ("b", 1)]rename 映射可能不包含集合名称;在这种情况下,关联的键不会改变。它可能包含对意外名称的替换,这些替换必须被忽略。

返回
Collection

一个等效的 Dask 集合,其键与通过不同图计算出的键相同。

核心 Dask 方法的内部结构

Dask 有几个*核心*函数(以及相应的方法),它们实现了常见的操作

  • compute:将一个或多个 Dask 集合转换为其内存中的对应形式

  • persist:将一个或多个 Dask 集合转换为等效的 Dask 集合,其结果已经计算并在内存中缓存

  • optimize:将一个或多个 Dask 集合转换为共享一个大型优化图的等效 Dask 集合

  • visualize:给定一个或多个 Dask 集合,绘制出在调用 computepersist 期间将传递给调度器的图

这里我们简要描述这些函数的内部结构,以说明它们与上述接口的关系。

Compute (计算)

compute 的操作可以分解为三个阶段

  1. 图合并,终结

    首先,单个集合被转换为单个大型表达式和嵌套的键列表。这是通过 collections_to_expr() 完成的,它确保所有集合一起优化以消除常见的子表达式。

    注意

    在此阶段,遗留的 HLG 图被包装到 HLGExpr 中,该表达式根据 __dask_optimize__ 将 __dask_postcompute__ 和低级优化器编码到表达式中。

    optimize_graph 参数仅与 HLG 图相关,并控制是否考虑低级优化。

    • 如果 optimize_graphTrue(默认),则首先按其 __dask_optimize__ 方法对集合进行分组。所有具有相同 __dask_optimize__ 方法的集合都会合并其图并连接其键,然后对各自的 __dask_optimize__ 进行单次调用,并传递合并后的图和键。然后合并生成的结果图。

    • 如果 optimize_graphFalse,则合并所有图并连接所有键。

    组合后的图使用 FinalizeCompute 表达式进行_终结_,该表达式指示表达式/图缩减为单个分区,适合在计算后返回给用户。这可以通过实现集合的 __dask_postcompute__ 方法或实现表达式的优化路径来完成。

    以 DataFrame 为例,FinalizeCompute 简化为 RepartitionToFewer(..., npartition=1),它只是将所有结果连接成一个普通 DataFrame。

  2. (表达式) 优化

    合并后的表达式被优化。此步骤不应与为遗留图定义的 __dask_optimize__ 低级优化相混淆。这是一个总是执行的步骤,是将表达式简化和降低到最终形式的必需步骤,该形式可用于实际生成可执行的任务图。另请参阅 Optimizer (优化器)

    对于遗留的 HLG 图,低级优化步骤嵌入在图物化中,这通常仅在图传递给调度器后发生(见下文)。

  3. 计算

    图合并并执行任何优化后,生成的大型图和嵌套的键列表将传递给调度器。调度器的选择如下:

    • 如果直接以关键字指定了 get 函数,则使用该函数

    • 否则,如果设置了全局调度器,则使用该调度器

    • 否则回退到给定集合的默认调度器。请注意,如果所有集合不共享相同的 __dask_scheduler__,则会引发错误。

    确定适当的调度器 get 函数后,将使用合并后的图、键和额外关键字参数调用它。在此阶段之后,results 是一个嵌套的值列表。此列表的结构与 keys 的结构相同,每个键都替换为其相应的结果。

Persist (持久化)

Persist 与 compute 非常相似,除了返回值的创建方式。它也包含三个阶段

  1. 图合并,*不*终结

    compute 中相同,但不进行终结。在 persist 的情况下,我们不希望连接所有输出分区,而是希望为每个分区返回一个 Future。

  2. (表达式) 优化

    compute 中相同。

  3. 计算

    compute 中相同,不同之处在于返回的结果是 Futures 列表。

  4. Postpersist (持久化后处理)

    调度器返回的 Futures 与 __dask_postpersist__ 一起用于重构指向远程数据的集合。

    __dask_postpersist__ 返回两项内容

    • 一个 rebuild 函数,该函数接受一个持久化的图作为输入。该图的键与相应集合的 __dask_keys__ 相同,值是计算结果(对于单机调度器)或 Futures(对于分布式调度器)。

    • 一个元组,包含在图之后传递给 rebuild 的额外参数

    为了构建 persist 的输出,遍历集合和结果列表,并对每个集合的重构器在其相应结果的图上进行调用。

Optimize (优化)

optimize 的操作可以分解为两个阶段

  1. 图合并,*不*终结

    persist 中相同。

  2. (表达式) 优化

    computepersist 中相同。

  3. 物化和重构

    整个图被物化(这也执行了低级优化)。类似于 persist,使用 __dask_postpersist__ 返回的 rebuild 函数和参数从优化后的图中重构等效的集合。

Visualize (可视化)

Visualize 是四个核心函数中最简单的一个。它只有两个阶段

  1. 图合并和优化

    compute 中相同。

  2. 图绘制

    使用 graphviz 绘制生成的合并图,并输出到指定文件。

向您的类添加核心 Dask 方法

定义上述接口将允许您的对象被核心 Dask 函数(dask.computedask.persistdask.visualize 等)使用。要添加相应的方法版本,您可以继承 dask.base.DaskMethodsMixin,它根据上述接口添加了 computepersistvisualize 的实现。

使用表达式定义计算

建议使用 dask.expr.Expr 类定义 dask 图。要开始,必须实现最少的方法集。

class dask.Expr(*args, _determ_token=None, **kwargs)[source]
__dask_graph__()[source]

遍历表达式树,收集层

除非需要自定义逻辑来处理(例如忽略)图生成过程中的特定操作数,否则子类通常不需要覆盖此方法。

另请参阅

Expr._layer
Expr._task
__dask_keys__()[source]

此表达式的键

这用于确定计算此表达式时输出集合的键。

返回
keys: list

此表达式的键

_layer() dict[source]

此表达式添加的图层。

对每个分区应用一个任务的简单表达式可以选择仅实现 Expr._task

返回
layer: dict

此表达式添加的 Dask 任务图

示例

>>> class Add(Expr):
...     def _layer(self):
...         return {
...            name: Task(
...                name,
...                operator.add,
...                TaskRef((self.left._name, i)),
...                TaskRef((self.right._name, i))
...            )
...            for i, name in enumerate(self.__dask_keys__())
...         }
_task(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], index: int) dask._task_spec.Task[source]

第 i 个分区的任务

参数
index

此 DataFrame 分区的索引

返回
task

用于计算此分区的 Dask 任务

另请参阅

Expr._layer

示例

>>> class Add(Expr):
...     def _task(self, i):
...         return Task(
...            self.__dask_keys__()[i],
...            operator.add,
...            TaskRef((self.left._name, i)),
...            TaskRef((self.right._name, i))
...        )

Dask 集合示例

这里我们创建一个表示元组的 Dask 集合。元组中的每个元素在图中都表示为一个任务。请注意,这仅用于说明目的 - 同样的用户体验可以使用带有 dask.delayed 元素的普通元组实现

import dask
from dask.base import DaskMethodsMixin, replace_name_in_key
from dask.expr import Expr, LLGExpr
from dask.typing import Key
from dask.task_spec import Task, DataNode, Alias


# We subclass from DaskMethodsMixin to add common dask methods to
# our class (compute, persist, and visualize). This is nice but not
# necessary for creating a Dask collection (you can define them
# yourself).
class Tuple(DaskMethodsMixin):
    def __init__(self, expr):
        self._expr = expr

    def __dask_graph__(self):
        return self._expr.__dask_graph__()

    def __dask_keys__(self):
        return self._expr.__dask_keys__()

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # We want to return the results as a tuple, so our finalize
        # function is `tuple`. There are no extra arguments, so we also
        # return an empty tuple.
        return tuple, ()

    def __dask_postpersist__(self):
        return Tuple._rebuild, ("mysuffix",)

    @staticmethod
    def _rebuild(futures: dict, name: str):
        expr = LLGExpr({
            (name, i): DataNode((name, i), val)
            for i, val in  enumerate(futures.values())
        })
        return Tuple(expr)

    def __dask_tokenize__(self):
        # For tokenize to work we want to return a value that fully
        # represents this object. In this case this is done by a type
        identifier plus the (also tokenized) name of the expression
        return (type(self), self._expr._name)

class RemoteTuple(Expr):
    @property
    def npartitions(self):
        return len(self.operands)

    def __dask_keys__(self):
        return [(self._name, i) for i in range(self.npartitions)]

    def _task(self, name: Key, index: int) -> Task:
        return DataNode(name, self.operands[index])

演示这个类

>>> from dask_tuple import Tuple

def from_pytuple(pytup: tuple) -> Tuple:
    return Tuple(RemoteTuple(*pytup))

>>> dask_tup = from_pytuple(tuple(range(5)))

>>> dask_tup.__dask_keys__()
[('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 0),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 1),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 2)]

# Compute turns Tuple into a tuple
>>> dask_tup.compute()
(0, 1, 2)

# Persist turns Tuple into a Tuple, with each task already computed
>>> dask_tup2 = dask_tup.persist()
>>> isinstance(dask_tup2, Tuple)
True

>>> dask_tup2.__dask_graph__()
{('newname', 0): DataNode(0),
('newname', 1): DataNode(1),
('newname', 2): DataNode(2)}

>>> x2.compute()
(0, 1, 2)

# Run-time typechecking
>>> from dask.typing import DaskCollection
>>> isinstance(x, DaskCollection)
True

检查对象是否为 Dask 集合

要检查对象是否为 Dask 集合,请使用 dask.base.is_dask_collection

>>> from dask.base import is_dask_collection
>>> from dask import delayed

>>> x = delayed(sum)([1, 2, 3])
>>> is_dask_collection(x)
True
>>> is_dask_collection(1)
False

实现确定性哈希

Dask 实现了自己的确定性哈希函数,根据参数的值生成键。此函数可作为 dask.base.tokenize 使用。许多常见类型已经实现了 tokenize,可以在 dask/base.py 中找到。

创建自己的自定义类时,可能需要注册 tokenize 实现。有两种方法可以做到这一点

  1. __dask_tokenize__ 方法

    在可能的情况下,建议定义 __dask_tokenize__ 方法。此方法不接受参数,应返回一个完全代表对象的值。在返回任何非平凡对象之前,最好从其中调用 dask.base.normalize_token

  2. 使用 dask.base.normalize_token 注册函数

    如果在类上定义方法不可行,或者您需要为已注册了父类的类自定义 tokenize 函数(例如,如果需要继承内置类),则可以使用 normalize_token 分派注册 tokenize 函数。该函数的签名应与上述描述相同。

在两种情况下,实现应该相同,只是定义的位置不同。

注意

Dask 集合和普通 Python 对象都可以使用上述任一方法实现 tokenize

示例

>>> from dask.base import tokenize, normalize_token

# Define a tokenize implementation using a method.
>>> class Point:
...     def __init__(self, x, y):
...         self.x = x
...         self.y = y
...
...     def __dask_tokenize__(self):
...         # This tuple fully represents self
...         # Wrap non-trivial objects with normalize_token before returning them
...         return normalize_token(Point), self.x, self.y

>>> x = Point(1, 2)
>>> tokenize(x)
'5988362b6e07087db2bc8e7c1c8cc560'
>>> tokenize(x) == tokenize(x)  # token is idempotent
True
>>> tokenize(Point(1, 2)) == tokenize(Point(1, 2))  # token is deterministic
True
>>> tokenize(Point(1, 2)) == tokenize(Point(2, 1))  # tokens are unique
False


# Register an implementation with normalize_token
>>> class Point3D:
...     def __init__(self, x, y, z):
...         self.x = x
...         self.y = y
...         self.z = z

>>> @normalize_token.register(Point3D)
... def normalize_point3d(x):
...     return normalize_token(Point3D), x.x, x.y, x.z

>>> y = Point3D(1, 2, 3)
>>> tokenize(y)
'5a7e9c3645aa44cf13d021c14452152e'

更多示例请参见 dask/base.py 或任何内置 Dask 集合。