自定义集合

对于许多问题,内置的 Dask 集合(dask.arraydask.dataframedask.bagdask.delayed)已经足够。在不够用的情况下,可以创建自己的 Dask 集合。本文将介绍满足 Dask 集合接口所需的方法。

注意

这被认为是一个高级特性。在大多数情况下,内置集合可能已经足够。

阅读本文之前,你应该阅读并理解

目录

Dask 集合接口

要创建自己的 Dask 集合,你需要满足 dask.typing.DaskCollection 协议定义的接口。注意,没有必需的基类。

建议也阅读 Dask 核心方法的内部原理,了解 Dask 内部如何使用此接口。

集合协议

class dask.typing.DaskCollection(*args, **kwargs)[source]

定义 Dask 集合接口的协议。

abstract __dask_graph__() collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any][source]

Dask 任务图。

核心 Dask 集合(Array、DataFrame、Bag 和 Delayed)使用 HighLevelGraph 表示集合任务图。也可以使用 Python 字典将任务图表示为低级别图。

返回
映射

Dask 任务图。如果实例返回 dask.highlevelgraph.HighLevelGraph,则必须实现 __dask_layers__() 方法,如 HLGDaskCollection 协议所定义。

abstract __dask_keys__() list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], ForwardRef('NestedKeys')]][source]

任务图的输出键。

注意,Dask 集合的键有额外的约束,与任务图规范文档中描述的约束不同。这些额外约束如下所述。

所有键必须是非空字符串,或以非空字符串为第一个元素的元组,后面跟着零个或多个任意的 str、bytes、int、float 或它们的元组。非空字符串通常被称为集合名称。dask 包中包含的所有集合都恰好有一个名称,但这并非强制要求。

以下是所有有效输出

  • []

  • ["x", "y"]

  • [[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]

返回
列表

可能是一个嵌套的键列表,表示图的输出。计算后,结果将以相同的布局返回,其中键被相应的输出替换。

__dask_optimize__: Any

给定一个图和键,返回一个新的优化图。

此方法可以是 staticmethodclassmethod,但不能是 instancemethod。示例实现请参阅核心 Dask 集合中的 __dask_optimize__ 定义,例如:dask.array.Arraydask.dataframe.DataFrame 等。

注意,在调用 __dask_optimize__ 之前,图和键会被合并;因此,传递给此方法的图和键可能代表多个共享相同优化方法的集合。

参数
dsk

所有共享相同 __dask_optimize__ 方法的集合的合并图。

keysSequence[Key]

所有共享相同 __dask_optimize__ 方法的集合的 __dask_keys__ 方法的输出列表。

**kwargsAny

从调用 computepersist 转发的额外关键字参数。可以根据需要使用或忽略。

返回
MutableMapping

优化后的 Dask 图。

abstract __dask_postcompute__() tuple[collections.abc.Callable, tuple][source]

用于构建最终结果的终结函数和可选参数。

计算时,集合中的每个键都会有一个内存中的结果,postcompute 函数将每个键的结果组合成最终的内存表示。例如,dask.array.Array 将每个分块的数组连接成最终的内存数组。

返回
PostComputeCallable

一个可调用对象,接收每个最终键的结果序列以及可选参数。一个示例签名可以是 finalize(results: Sequence[Any], *args)

tuple[Any, …]

传递给函数的可选参数,紧跟在键结果之后(PostComputeCallable*args 部分)。如果不需要传递额外参数,则必须是空元组。

abstract __dask_postpersist__() tuple[dask.typing.PostPersistCallable, tuple][source]

用于构造持久化集合的重建函数和可选参数。

另请参阅 dask.typing.PostPersistCallable 的文档。

返回
PostPersistCallable

重建集合的可调用对象。签名应为 rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)(由 PostPersistCallable 协议定义)。可调用对象应返回一个等效的 Dask 集合,其键与 self 相同,但结果是通过不同的图计算得出的。对于 dask.persist() 的情况,新图将只包含输出键和已计算的值。

tuple[Any, …]

传递给重建可调用对象的可选参数。如果不需要传递额外参数,则必须是空元组。

__dask_scheduler__: staticmethod

此对象默认使用的调度器 get 方法。

通常作为 staticmethod 附加到类上,例如:

>>> import dask.threaded
>>> class MyCollection:
...     # Use the threaded scheduler by default
...     __dask_scheduler__ = staticmethod(dask.threaded.get)
abstract __dask_tokenize__() collections.abc.Hashable[source]

必须完整表示对象的值。

abstract compute(**kwargs: Any) Any[source]

计算此 Dask 集合。

这将一个惰性 Dask 集合转换为其内存中的等效形式。例如,Dask 数组转换为 NumPy 数组,Dask DataFrame 转换为 Pandas DataFrame。在调用此操作之前,整个数据集必须能放入内存。

参数
scheduler字符串,可选

要使用的调度器,如“threads”、“synchronous”或“processes”。如果未提供,默认优先检查全局设置,然后回退到集合的默认设置。

optimize_graph布尔值,可选

如果为 True [默认],则在计算前对图进行优化。否则,图将按原样运行。这对于调试很有用。

kwargs

转发给调度器函数的额外关键字参数。

返回
集合的计算结果。

另请参阅

dask.compute
abstract persist(**kwargs: Any) dask.typing.CollType[source]

将此 Dask 集合持久化到内存中

这将一个惰性 Dask 集合转换为一个具有相同元数据的 Dask 集合,但现在结果已完全计算或正在后台积极计算。

此函数的操作因活动任务调度器而异。如果任务调度器支持异步计算,例如 dask.distributed 调度器,那么 persist 将立即返回,并且返回值包含 Dask Future 对象。但是,如果任务调度器只支持阻塞计算,那么调用 persist 将会阻塞,并且返回值的任务图将包含具体的 Python 结果。

在使用分布式系统时,此函数特别有用,因为结果将保留在分布式内存中,而不是像 compute 那样返回到本地进程。

参数
scheduler字符串,可选

要使用的调度器,如“threads”、“synchronous”或“processes”。如果未提供,默认优先检查全局设置,然后回退到集合的默认设置。

optimize_graph布尔值,可选

如果为 True [默认],则在计算前对图进行优化。否则,图将按原样运行。这对于调试很有用。

**kwargs

转发给调度器函数的额外关键字参数。

返回
由内存数据支持的新 Dask 集合

另请参阅

dask.persist
abstract visualize(filename: str = 'mydask', format: str | None = None, optimize_graph: bool = False, **kwargs: Any) DisplayObject | None[source]

使用 graphviz 渲染此对象任务图的计算过程。

需要安装 graphviz

参数
filename字符串或 None,可选

要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用“.png”。如果 filename 为 None,则不会写入文件,并且仅使用管道与 dot 通信。

format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’},可选

输出文件的写入格式。默认为 ‘png’。

optimize_graph布尔值,可选

如果为 True,则在渲染前优化图。否则,图将按原样显示。默认为 False。

color: {None, ‘order’},可选

节点着色选项。提供 cmap= 关键字以获取额外的颜色映射。

**kwargs

转发给 to_graphviz 的额外关键字参数。

返回
resultIPython.display.Image, IPython.display.SVG, 或 None

有关更多信息,请参阅 dask.dot.dot_graph。

另请参阅

dask.visualize
dask.dot.dot_graph

注意

有关优化的更多信息,请参见此处

https://docs.dask.org.cn/en/latest/optimize.html

示例

>>> x.visualize(filename='dask.pdf')  
>>> x.visualize(filename='dask.pdf', color='order')  

HLG 集合协议

注意

HighLevelGraph 正在被表达式取代并弃用。鼓励新项目不再实现自己的 HLG 层。

由 Dask 的 高级图 支持的集合必须实现此协议定义的额外方法

class dask.typing.HLGDaskCollection(*args, **kwargs)[source]

定义使用 HighLevelGraph 的 Dask 集合的协议。

此协议与 DaskCollection 几乎相同,额外增加了 __dask_layers__ 方法(高级图支持的集合必需)。

abstract __dask_layers__() collections.abc.Sequence[str][source]

HighLevelGraph 层的名称。

调度器 get 协议

SchedulerGetProtocol 定义了 Dask 集合的 __dask_scheduler__ 定义必须遵循的签名。

class dask.typing.SchedulerGetCallable(*args, **kwargs)[source]

定义 __dask_scheduler__ 可调用对象签名的协议。

__call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], keys: Union[collections.abc.Sequence[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], **kwargs: Any) Any[source]

作为集合的默认调度器调用的方法。

参数
dsk

任务图。

keys

对应于所需数据的键。

**kwargs

额外参数。

返回
Any

keys 关联的结果。

持久化后可调用协议

集合必须定义一个 __dask_postpersist__ 方法,该方法返回一个遵循 PostPersistCallable 接口的可调用对象。

class dask.typing.PostPersistCallable(*args, **kwargs)[source]

定义 __dask_postpersist__ 可调用对象签名的协议。

__call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], *args: Any, rename: collections.abc.Mapping[str, str] | None = None) dask.typing.CollType_co[source]

调用此方法以重建持久化集合。

参数
dsk: Mapping

一个映射,至少包含由 __dask_keys__() 返回的输出键。

*argsAny

额外可选参数。如果不需要额外参数,则必须是空元组。

renameMapping[str, str],可选

如果定义,它表明输出键也可能发生变化;例如,如果 __dask_keys__() 之前输出是 [("a", 0), ("a", 1)],那么调用 rebuild(dsk, *extra_args, rename={"a": "b"}) 后,它必须变为 [("b", 0), ("b", 1)]rename 映射可能不包含集合名称;在这种情况下,关联的键不会改变。它可能包含意外名称的替换,这些替换必须被忽略。

返回
Collection

一个等效的 Dask 集合,具有相同的键,但通过不同的图计算得出。

Dask 核心方法的内部原理

Dask 有一些实现常见操作的核心函数(及对应方法):

  • compute:将一个或多个 Dask 集合转换为其内存中的对应形式

  • persist:将一个或多个 Dask 集合转换为等效的 Dask 集合,其结果已计算并在内存中缓存

  • optimize:将一个或多个 Dask 集合转换为等效的 Dask 集合,共享一个大型优化图

  • visualize:给定一个或多个 Dask 集合,绘制出在调用 computepersist 期间将传递给调度器的图

在此,我们简要描述这些函数的内部原理,以说明它们与上述接口的关系。

计算

compute 的操作可以分为三个阶段:

  1. 图合并,终结

    首先,将各个集合转换为单个大型表达式和嵌套的键列表。这通过 collections_to_expr() 完成,确保所有集合一起优化以消除公共子表达式。

    注意

    在此阶段,传统 HLG 图被封装到 HLGExpr 中,该表达式将 __dask_postcompute__ 和由 __dask_optimize__ 决定的低级别优化器编码到表达式中。

    optimize_graph 参数仅与 HLG 图相关,控制是否考虑低级别优化。

    • 如果 optimize_graphTrue(默认),则首先按集合的 __dask_optimize__ 方法对集合进行分组。所有具有相同 __dask_optimize__ 方法的集合会将其图合并并将键连接,然后对每个相应的 __dask_optimize__ 进行一次调用,并传入合并后的图和键。然后将结果图合并。

    • 如果 optimize_graphFalse,则所有图都被合并,所有键都被连接。

    合并后的图通过 FinalizeCompute 表达式进行_终结_,该表达式指示表达式/图简化为单个分区,适合在计算后返回给用户。这可以通过实现集合的 __dask_postcompute__ 方法或实现表达式的优化路径来完成。

    以 DataFrame 为例,FinalizeCompute 简化为 RepartitionToFewer(..., npartition=1),它只是将所有结果连接到一个普通的 DataFrame 中。

  2. (表达式) 优化

    合并后的表达式被优化。此步骤不应与传统图的 __dask_optimize__ 定义的低级别优化混淆。这是一个总是执行的步骤,也是将表达式简化并降低到最终形式(可用于实际生成可执行任务图)的必需步骤。另请参阅,优化器

    对于传统的 HLG 图,低级别优化步骤嵌入在图具体化过程中,该过程通常仅在图传递给调度器后发生(见下文)。

  3. 计算

    图合并并执行任何优化后,生成的大型图和嵌套键列表将传递给调度器。调度器的选择如下:

    • 如果直接通过关键字指定了 get 函数,则使用该函数

    • 否则,如果设置了全局调度器,则使用该调度器

    • 否则回退到给定集合的默认调度器。注意,如果所有集合没有共享相同的 __dask_scheduler__,则会引发错误。

    一旦确定了合适的调度器 get 函数,就会使用合并后的图、键和额外关键字参数调用它。在此阶段之后,results 是一个嵌套的值列表。此列表的结构与 keys 镜像,其中每个键被其相应的结果替换。

持久化

Persist 与 compute 非常相似,除了返回值的创建方式不同。它也有三个阶段:

  1. 图合并,*无*终结

    compute 相同,但没有终结。在 persist 的情况下,我们不希望连接所有输出分区,而是希望为每个分区返回一个 future。

  2. (表达式) 优化

    compute 相同。

  3. 计算

    compute 相同,区别在于返回的结果是 Future 对象的列表。

  4. 持久化后处理

    调度器返回的 futures 与 __dask_postpersist__ 一起使用,以重建指向远程数据的集合。

    __dask_postpersist__ 返回两项内容:

    • 一个 rebuild 函数,它接受一个持久化图。此图的键与相应集合的 __dask_keys__ 相同,值是计算结果(对于单机调度器)或 futures(对于分布式调度器)。

    • 一个元组,包含在图之后传递给 rebuild 的额外参数

    为了构建 persist 的输出,会遍历集合和结果列表,并为每个集合调用其对应的重建器,传入其结果图。

优化

optimize 的操作可以分为两个阶段:

  1. 图合并,*无*终结

    persist 相同。

  2. (表达式) 优化

    computepersist 相同。

  3. 具体化和重建

    整个图被具体化(这也执行低级别优化)。类似于 persist__dask_postpersist__ 中的 rebuild 函数和参数用于从优化后的图重建等效集合。

可视化

Visualize 是这 4 个核心函数中最简单的。它只有两个阶段:

  1. 图合并与优化

    compute 相同。

  2. 图绘制

    使用 graphviz 绘制合并后的图,并输出到指定文件。

将 Dask 核心方法添加到你的类

定义上述接口将使你的对象能够被 Dask 核心函数(dask.computedask.persistdask.visualize 等)使用。要添加对应的方法版本,你可以继承 dask.base.DaskMethodsMixin 类,它基于上述接口添加了 computepersistvisualize 的实现。

定义计算的表达式

建议使用 dask.expr.Expr 类来定义 Dask 图。入门时,需要实现最少一组方法。

class dask.Expr(*args, _determ_token=None, **kwargs)[source]
__dask_graph__()[source]

遍历表达式树,收集层

除非需要自定义逻辑来处理(例如忽略)图生成期间的特定操作数,否则子类通常不应覆盖此方法。

另请参阅

Expr._layer
Expr._task
__dask_keys__()[source]

此表达式的键

这用于在计算此表达式时确定输出集合的键。

返回
keys: list

此表达式的键

_layer() dict[source]

此表达式添加的图层。

每分区应用一个任务的简单表达式可以选择只实现 Expr._task

返回
layer: dict

此表达式添加的 Dask 任务图

示例

>>> class Add(Expr):
...     def _layer(self):
...         return {
...            name: Task(
...                name,
...                operator.add,
...                TaskRef((self.left._name, i)),
...                TaskRef((self.right._name, i))
...            )
...            for i, name in enumerate(self.__dask_keys__())
...         }
_task(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], index: int) dask._task_spec.Task[source]

第 i 个分区的任务

参数
index

此 DataFrame 分区的索引

返回
task

计算此分区的 Dask 任务

另请参阅

Expr._layer

示例

>>> class Add(Expr):
...     def _task(self, i):
...         return Task(
...            self.__dask_keys__()[i],
...            operator.add,
...            TaskRef((self.left._name, i)),
...            TaskRef((self.right._name, i))
...        )

Dask 集合示例

这里我们创建一个代表元组的 Dask 集合。元组中的每个元素在图中表示为一个任务。注意,这仅用于说明目的 - 使用包含 dask.delayed 元素的普通元组也可以实现相同的用户体验。

import dask
from dask.base import DaskMethodsMixin, replace_name_in_key
from dask.expr import Expr, LLGExpr
from dask.typing import Key
from dask.task_spec import Task, DataNode, Alias


# We subclass from DaskMethodsMixin to add common dask methods to
# our class (compute, persist, and visualize). This is nice but not
# necessary for creating a Dask collection (you can define them
# yourself).
class Tuple(DaskMethodsMixin):
    def __init__(self, expr):
        self._expr = expr

    def __dask_graph__(self):
        return self._expr.__dask_graph__()

    def __dask_keys__(self):
        return self._expr.__dask_keys__()

    # Use the threaded scheduler by default.
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        # We want to return the results as a tuple, so our finalize
        # function is `tuple`. There are no extra arguments, so we also
        # return an empty tuple.
        return tuple, ()

    def __dask_postpersist__(self):
        return Tuple._rebuild, ("mysuffix",)

    @staticmethod
    def _rebuild(futures: dict, name: str):
        expr = LLGExpr({
            (name, i): DataNode((name, i), val)
            for i, val in  enumerate(futures.values())
        })
        return Tuple(expr)

    def __dask_tokenize__(self):
        # For tokenize to work we want to return a value that fully
        # represents this object. In this case this is done by a type
        identifier plus the (also tokenized) name of the expression
        return (type(self), self._expr._name)

class RemoteTuple(Expr):
    @property
    def npartitions(self):
        return len(self.operands)

    def __dask_keys__(self):
        return [(self._name, i) for i in range(self.npartitions)]

    def _task(self, name: Key, index: int) -> Task:
        return DataNode(name, self.operands[index])

演示此类

>>> from dask_tuple import Tuple

def from_pytuple(pytup: tuple) -> Tuple:
    return Tuple(RemoteTuple(*pytup))

>>> dask_tup = from_pytuple(tuple(range(5)))

>>> dask_tup.__dask_keys__()
[('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 0),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 1),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 2)]

# Compute turns Tuple into a tuple
>>> dask_tup.compute()
(0, 1, 2)

# Persist turns Tuple into a Tuple, with each task already computed
>>> dask_tup2 = dask_tup.persist()
>>> isinstance(dask_tup2, Tuple)
True

>>> dask_tup2.__dask_graph__()
{('newname', 0): DataNode(0),
('newname', 1): DataNode(1),
('newname', 2): DataNode(2)}

>>> x2.compute()
(0, 1, 2)

# Run-time typechecking
>>> from dask.typing import DaskCollection
>>> isinstance(x, DaskCollection)
True

检查对象是否为 Dask 集合

要检查对象是否为 Dask 集合,请使用 dask.base.is_dask_collection

>>> from dask.base import is_dask_collection
>>> from dask import delayed

>>> x = delayed(sum)([1, 2, 3])
>>> is_dask_collection(x)
True
>>> is_dask_collection(1)
False

实现确定性哈希

Dask 实现了自己的确定性哈希函数,根据参数值生成键。此函数可通过 dask.base.tokenize 获得。许多常见类型已经实现了 tokenize,可以在 dask/base.py 中找到。

创建自己的自定义类时,你可能需要注册一个 tokenize 实现。有两种方法可以做到:

  1. __dask_tokenize__ 方法

    在可能的情况下,建议定义 __dask_tokenize__ 方法。此方法不接受参数,应返回一个完整代表对象的值。在返回任何非平凡对象之前,最好从该方法中调用 dask.base.normalize_token

  2. 使用 dask.base.normalize_token 注册一个函数

    如果无法在类上定义方法,或者你需要为已注册父类的类自定义 tokenize 函数(例如,如果你需要子类化内置类型),则可以使用 normalize_token 分发机制注册一个 tokenize 函数。该函数应具有与上述相同的签名。

在两种情况下,实现应该相同,只是定义的位置不同。

注意

Dask 集合和普通 Python 对象都可以使用上述任一方法实现 tokenize

示例

>>> from dask.base import tokenize, normalize_token

# Define a tokenize implementation using a method.
>>> class Point:
...     def __init__(self, x, y):
...         self.x = x
...         self.y = y
...
...     def __dask_tokenize__(self):
...         # This tuple fully represents self
...         # Wrap non-trivial objects with normalize_token before returning them
...         return normalize_token(Point), self.x, self.y

>>> x = Point(1, 2)
>>> tokenize(x)
'5988362b6e07087db2bc8e7c1c8cc560'
>>> tokenize(x) == tokenize(x)  # token is idempotent
True
>>> tokenize(Point(1, 2)) == tokenize(Point(1, 2))  # token is deterministic
True
>>> tokenize(Point(1, 2)) == tokenize(Point(2, 1))  # tokens are unique
False


# Register an implementation with normalize_token
>>> class Point3D:
...     def __init__(self, x, y, z):
...         self.x = x
...         self.y = y
...         self.z = z

>>> @normalize_token.register(Point3D)
... def normalize_point3d(x):
...     return normalize_token(Point3D), x.x, x.y, x.z

>>> y = Point3D(1, 2, 3)
>>> tokenize(y)
'5a7e9c3645aa44cf13d021c14452152e'

更多示例请参阅 dask/base.py 或任何内置的 Dask 集合。