自定义集合
目录
自定义集合¶
对于许多问题,内置的 Dask 集合(dask.array
、dask.dataframe
、dask.bag
和 dask.delayed
)已经足够。在不够用的情况下,可以创建自己的 Dask 集合。本文将介绍满足 Dask 集合接口所需的方法。
注意
这被认为是一个高级特性。在大多数情况下,内置集合可能已经足够。
阅读本文之前,你应该阅读并理解
目录
Dask 集合接口¶
要创建自己的 Dask 集合,你需要满足 dask.typing.DaskCollection
协议定义的接口。注意,没有必需的基类。
建议也阅读 Dask 核心方法的内部原理,了解 Dask 内部如何使用此接口。
集合协议¶
- class dask.typing.DaskCollection(*args, **kwargs)[source]¶
定义 Dask 集合接口的协议。
- abstract __dask_graph__() collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any] [source]¶
Dask 任务图。
核心 Dask 集合(Array、DataFrame、Bag 和 Delayed)使用
HighLevelGraph
表示集合任务图。也可以使用 Python 字典将任务图表示为低级别图。- 返回
- 映射
Dask 任务图。如果实例返回
dask.highlevelgraph.HighLevelGraph
,则必须实现__dask_layers__()
方法,如HLGDaskCollection
协议所定义。
- abstract __dask_keys__() list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], ForwardRef('NestedKeys')]] [source]¶
任务图的输出键。
注意,Dask 集合的键有额外的约束,与任务图规范文档中描述的约束不同。这些额外约束如下所述。
所有键必须是非空字符串,或以非空字符串为第一个元素的元组,后面跟着零个或多个任意的 str、bytes、int、float 或它们的元组。非空字符串通常被称为集合名称。dask 包中包含的所有集合都恰好有一个名称,但这并非强制要求。
以下是所有有效输出
[]
["x", "y"]
[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]
- 返回
- 列表
可能是一个嵌套的键列表,表示图的输出。计算后,结果将以相同的布局返回,其中键被相应的输出替换。
- __dask_optimize__: Any¶
给定一个图和键,返回一个新的优化图。
此方法可以是
staticmethod
或classmethod
,但不能是instancemethod
。示例实现请参阅核心 Dask 集合中的__dask_optimize__
定义,例如:dask.array.Array
、dask.dataframe.DataFrame
等。注意,在调用
__dask_optimize__
之前,图和键会被合并;因此,传递给此方法的图和键可能代表多个共享相同优化方法的集合。- 参数
- dsk图
所有共享相同
__dask_optimize__
方法的集合的合并图。- keysSequence[Key]
所有共享相同
__dask_optimize__
方法的集合的__dask_keys__
方法的输出列表。- **kwargsAny
从调用
compute
或persist
转发的额外关键字参数。可以根据需要使用或忽略。
- 返回
- MutableMapping
优化后的 Dask 图。
- abstract __dask_postcompute__() tuple[collections.abc.Callable, tuple] [source]¶
用于构建最终结果的终结函数和可选参数。
计算时,集合中的每个键都会有一个内存中的结果,postcompute 函数将每个键的结果组合成最终的内存表示。例如,dask.array.Array 将每个分块的数组连接成最终的内存数组。
- 返回
- PostComputeCallable
一个可调用对象,接收每个最终键的结果序列以及可选参数。一个示例签名可以是
finalize(results: Sequence[Any], *args)
。- tuple[Any, …]
传递给函数的可选参数,紧跟在键结果之后(
PostComputeCallable
的 *args 部分)。如果不需要传递额外参数,则必须是空元组。
- abstract __dask_postpersist__() tuple[dask.typing.PostPersistCallable, tuple] [source]¶
用于构造持久化集合的重建函数和可选参数。
另请参阅
dask.typing.PostPersistCallable
的文档。- 返回
- PostPersistCallable
重建集合的可调用对象。签名应为
rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)
(由PostPersistCallable
协议定义)。可调用对象应返回一个等效的 Dask 集合,其键与 self 相同,但结果是通过不同的图计算得出的。对于dask.persist()
的情况,新图将只包含输出键和已计算的值。- tuple[Any, …]
传递给重建可调用对象的可选参数。如果不需要传递额外参数,则必须是空元组。
- __dask_scheduler__: staticmethod¶
此对象默认使用的调度器
get
方法。通常作为 staticmethod 附加到类上,例如:
>>> import dask.threaded >>> class MyCollection: ... # Use the threaded scheduler by default ... __dask_scheduler__ = staticmethod(dask.threaded.get)
- abstract __dask_tokenize__() collections.abc.Hashable [source]¶
必须完整表示对象的值。
- abstract compute(**kwargs: Any) Any [source]¶
计算此 Dask 集合。
这将一个惰性 Dask 集合转换为其内存中的等效形式。例如,Dask 数组转换为 NumPy 数组,Dask DataFrame 转换为 Pandas DataFrame。在调用此操作之前,整个数据集必须能放入内存。
- 参数
- scheduler字符串,可选
要使用的调度器,如“threads”、“synchronous”或“processes”。如果未提供,默认优先检查全局设置,然后回退到集合的默认设置。
- optimize_graph布尔值,可选
如果为 True [默认],则在计算前对图进行优化。否则,图将按原样运行。这对于调试很有用。
- kwargs
转发给调度器函数的额外关键字参数。
- 返回
- 集合的计算结果。
另请参阅
- abstract persist(**kwargs: Any) dask.typing.CollType [source]¶
将此 Dask 集合持久化到内存中
这将一个惰性 Dask 集合转换为一个具有相同元数据的 Dask 集合,但现在结果已完全计算或正在后台积极计算。
此函数的操作因活动任务调度器而异。如果任务调度器支持异步计算,例如 dask.distributed 调度器,那么 persist 将立即返回,并且返回值包含 Dask Future 对象。但是,如果任务调度器只支持阻塞计算,那么调用 persist 将会阻塞,并且返回值的任务图将包含具体的 Python 结果。
在使用分布式系统时,此函数特别有用,因为结果将保留在分布式内存中,而不是像 compute 那样返回到本地进程。
- 参数
- scheduler字符串,可选
要使用的调度器,如“threads”、“synchronous”或“processes”。如果未提供,默认优先检查全局设置,然后回退到集合的默认设置。
- optimize_graph布尔值,可选
如果为 True [默认],则在计算前对图进行优化。否则,图将按原样运行。这对于调试很有用。
- **kwargs
转发给调度器函数的额外关键字参数。
- 返回
- 由内存数据支持的新 Dask 集合
另请参阅
- abstract visualize(filename: str = 'mydask', format: str | None = None, optimize_graph: bool = False, **kwargs: Any) DisplayObject | None [source]¶
使用 graphviz 渲染此对象任务图的计算过程。
需要安装
graphviz
。- 参数
- filename字符串或 None,可选
要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用“.png”。如果 filename 为 None,则不会写入文件,并且仅使用管道与 dot 通信。
- format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’},可选
输出文件的写入格式。默认为 ‘png’。
- optimize_graph布尔值,可选
如果为 True,则在渲染前优化图。否则,图将按原样显示。默认为 False。
- color: {None, ‘order’},可选
节点着色选项。提供
cmap=
关键字以获取额外的颜色映射。- **kwargs
转发给
to_graphviz
的额外关键字参数。
- 返回
- resultIPython.display.Image, IPython.display.SVG, 或 None
有关更多信息,请参阅 dask.dot.dot_graph。
另请参阅
dask.visualize
dask.dot.dot_graph
注意
有关优化的更多信息,请参见此处
https://docs.dask.org.cn/en/latest/optimize.html
示例
>>> x.visualize(filename='dask.pdf') >>> x.visualize(filename='dask.pdf', color='order')
HLG 集合协议¶
注意
HighLevelGraph 正在被表达式取代并弃用。鼓励新项目不再实现自己的 HLG 层。
由 Dask 的 高级图 支持的集合必须实现此协议定义的额外方法
- class dask.typing.HLGDaskCollection(*args, **kwargs)[source]¶
定义使用 HighLevelGraph 的 Dask 集合的协议。
此协议与
DaskCollection
几乎相同,额外增加了__dask_layers__
方法(高级图支持的集合必需)。- abstract __dask_layers__() collections.abc.Sequence[str] [source]¶
HighLevelGraph 层的名称。
调度器 get
协议¶
SchedulerGetProtocol
定义了 Dask 集合的 __dask_scheduler__
定义必须遵循的签名。
- class dask.typing.SchedulerGetCallable(*args, **kwargs)[source]¶
定义
__dask_scheduler__
可调用对象签名的协议。- __call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], keys: Union[collections.abc.Sequence[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], **kwargs: Any) Any [source]¶
作为集合的默认调度器调用的方法。
- 参数
- dsk
任务图。
- keys
对应于所需数据的键。
- **kwargs
额外参数。
- 返回
- Any
与 keys 关联的结果。
持久化后可调用协议¶
集合必须定义一个 __dask_postpersist__
方法,该方法返回一个遵循 PostPersistCallable
接口的可调用对象。
- class dask.typing.PostPersistCallable(*args, **kwargs)[source]¶
定义
__dask_postpersist__
可调用对象签名的协议。- __call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], *args: Any, rename: collections.abc.Mapping[str, str] | None = None) dask.typing.CollType_co [source]¶
调用此方法以重建持久化集合。
- 参数
- dsk: Mapping
一个映射,至少包含由 __dask_keys__() 返回的输出键。
- *argsAny
额外可选参数。如果不需要额外参数,则必须是空元组。
- renameMapping[str, str],可选
如果定义,它表明输出键也可能发生变化;例如,如果
__dask_keys__()
之前输出是[("a", 0), ("a", 1)]
,那么调用rebuild(dsk, *extra_args, rename={"a": "b"})
后,它必须变为[("b", 0), ("b", 1)]
。rename
映射可能不包含集合名称;在这种情况下,关联的键不会改变。它可能包含意外名称的替换,这些替换必须被忽略。
- 返回
- Collection
一个等效的 Dask 集合,具有相同的键,但通过不同的图计算得出。
Dask 核心方法的内部原理¶
Dask 有一些实现常见操作的核心函数(及对应方法):
compute
:将一个或多个 Dask 集合转换为其内存中的对应形式persist
:将一个或多个 Dask 集合转换为等效的 Dask 集合,其结果已计算并在内存中缓存optimize
:将一个或多个 Dask 集合转换为等效的 Dask 集合,共享一个大型优化图visualize
:给定一个或多个 Dask 集合,绘制出在调用compute
或persist
期间将传递给调度器的图
在此,我们简要描述这些函数的内部原理,以说明它们与上述接口的关系。
计算¶
compute
的操作可以分为三个阶段:
图合并,终结
首先,将各个集合转换为单个大型表达式和嵌套的键列表。这通过
collections_to_expr()
完成,确保所有集合一起优化以消除公共子表达式。注意
在此阶段,传统 HLG 图被封装到
HLGExpr
中,该表达式将 __dask_postcompute__ 和由 __dask_optimize__ 决定的低级别优化器编码到表达式中。optimize_graph
参数仅与 HLG 图相关,控制是否考虑低级别优化。如果
optimize_graph
为True
(默认),则首先按集合的__dask_optimize__
方法对集合进行分组。所有具有相同__dask_optimize__
方法的集合会将其图合并并将键连接,然后对每个相应的__dask_optimize__
进行一次调用,并传入合并后的图和键。然后将结果图合并。如果
optimize_graph
为False
,则所有图都被合并,所有键都被连接。
合并后的图通过
FinalizeCompute
表达式进行_终结_,该表达式指示表达式/图简化为单个分区,适合在计算后返回给用户。这可以通过实现集合的__dask_postcompute__
方法或实现表达式的优化路径来完成。以 DataFrame 为例,
FinalizeCompute
简化为RepartitionToFewer(..., npartition=1)
,它只是将所有结果连接到一个普通的 DataFrame 中。(表达式) 优化
合并后的表达式被优化。此步骤不应与传统图的 __dask_optimize__ 定义的低级别优化混淆。这是一个总是执行的步骤,也是将表达式简化并降低到最终形式(可用于实际生成可执行任务图)的必需步骤。另请参阅,优化器。
对于传统的 HLG 图,低级别优化步骤嵌入在图具体化过程中,该过程通常仅在图传递给调度器后发生(见下文)。
计算
图合并并执行任何优化后,生成的大型图和嵌套键列表将传递给调度器。调度器的选择如下:
如果直接通过关键字指定了
get
函数,则使用该函数否则,如果设置了全局调度器,则使用该调度器
否则回退到给定集合的默认调度器。注意,如果所有集合没有共享相同的
__dask_scheduler__
,则会引发错误。
一旦确定了合适的调度器
get
函数,就会使用合并后的图、键和额外关键字参数调用它。在此阶段之后,results
是一个嵌套的值列表。此列表的结构与keys
镜像,其中每个键被其相应的结果替换。
持久化¶
Persist 与 compute
非常相似,除了返回值的创建方式不同。它也有三个阶段:
图合并,*无*终结
与
compute
相同,但没有终结。在 persist 的情况下,我们不希望连接所有输出分区,而是希望为每个分区返回一个 future。(表达式) 优化
与
compute
相同。计算
与
compute
相同,区别在于返回的结果是 Future 对象的列表。持久化后处理
调度器返回的 futures 与
__dask_postpersist__
一起使用,以重建指向远程数据的集合。__dask_postpersist__
返回两项内容:一个
rebuild
函数,它接受一个持久化图。此图的键与相应集合的__dask_keys__
相同,值是计算结果(对于单机调度器)或 futures(对于分布式调度器)。一个元组,包含在图之后传递给
rebuild
的额外参数
为了构建
persist
的输出,会遍历集合和结果列表,并为每个集合调用其对应的重建器,传入其结果图。
优化¶
optimize
的操作可以分为两个阶段:
图合并,*无*终结
与
persist
相同。(表达式) 优化
与
compute
和persist
相同。具体化和重建
整个图被具体化(这也执行低级别优化)。类似于
persist
,__dask_postpersist__
中的rebuild
函数和参数用于从优化后的图重建等效集合。
将 Dask 核心方法添加到你的类¶
定义上述接口将使你的对象能够被 Dask 核心函数(dask.compute
、dask.persist
、dask.visualize
等)使用。要添加对应的方法版本,你可以继承 dask.base.DaskMethodsMixin
类,它基于上述接口添加了 compute
、persist
和 visualize
的实现。
定义计算的表达式¶
建议使用 dask.expr.Expr
类来定义 Dask 图。入门时,需要实现最少一组方法。
- class dask.Expr(*args, _determ_token=None, **kwargs)[source]¶
-
- _layer() dict [source]¶
此表达式添加的图层。
每分区应用一个任务的简单表达式可以选择只实现 Expr._task。
- 返回
- layer: dict
此表达式添加的 Dask 任务图
示例
>>> class Add(Expr): ... def _layer(self): ... return { ... name: Task( ... name, ... operator.add, ... TaskRef((self.left._name, i)), ... TaskRef((self.right._name, i)) ... ) ... for i, name in enumerate(self.__dask_keys__()) ... }
- _task(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], index: int) dask._task_spec.Task [source]¶
第 i 个分区的任务
- 参数
- index
此 DataFrame 分区的索引
- 返回
- task
计算此分区的 Dask 任务
另请参阅
示例
>>> class Add(Expr): ... def _task(self, i): ... return Task( ... self.__dask_keys__()[i], ... operator.add, ... TaskRef((self.left._name, i)), ... TaskRef((self.right._name, i)) ... )
Dask 集合示例¶
这里我们创建一个代表元组的 Dask 集合。元组中的每个元素在图中表示为一个任务。注意,这仅用于说明目的 - 使用包含 dask.delayed
元素的普通元组也可以实现相同的用户体验。
import dask
from dask.base import DaskMethodsMixin, replace_name_in_key
from dask.expr import Expr, LLGExpr
from dask.typing import Key
from dask.task_spec import Task, DataNode, Alias
# We subclass from DaskMethodsMixin to add common dask methods to
# our class (compute, persist, and visualize). This is nice but not
# necessary for creating a Dask collection (you can define them
# yourself).
class Tuple(DaskMethodsMixin):
def __init__(self, expr):
self._expr = expr
def __dask_graph__(self):
return self._expr.__dask_graph__()
def __dask_keys__(self):
return self._expr.__dask_keys__()
# Use the threaded scheduler by default.
__dask_scheduler__ = staticmethod(dask.threaded.get)
def __dask_postcompute__(self):
# We want to return the results as a tuple, so our finalize
# function is `tuple`. There are no extra arguments, so we also
# return an empty tuple.
return tuple, ()
def __dask_postpersist__(self):
return Tuple._rebuild, ("mysuffix",)
@staticmethod
def _rebuild(futures: dict, name: str):
expr = LLGExpr({
(name, i): DataNode((name, i), val)
for i, val in enumerate(futures.values())
})
return Tuple(expr)
def __dask_tokenize__(self):
# For tokenize to work we want to return a value that fully
# represents this object. In this case this is done by a type
identifier plus the (also tokenized) name of the expression
return (type(self), self._expr._name)
class RemoteTuple(Expr):
@property
def npartitions(self):
return len(self.operands)
def __dask_keys__(self):
return [(self._name, i) for i in range(self.npartitions)]
def _task(self, name: Key, index: int) -> Task:
return DataNode(name, self.operands[index])
演示此类
>>> from dask_tuple import Tuple
def from_pytuple(pytup: tuple) -> Tuple:
return Tuple(RemoteTuple(*pytup))
>>> dask_tup = from_pytuple(tuple(range(5)))
>>> dask_tup.__dask_keys__()
[('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 0),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 1),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 2)]
# Compute turns Tuple into a tuple
>>> dask_tup.compute()
(0, 1, 2)
# Persist turns Tuple into a Tuple, with each task already computed
>>> dask_tup2 = dask_tup.persist()
>>> isinstance(dask_tup2, Tuple)
True
>>> dask_tup2.__dask_graph__()
{('newname', 0): DataNode(0),
('newname', 1): DataNode(1),
('newname', 2): DataNode(2)}
>>> x2.compute()
(0, 1, 2)
# Run-time typechecking
>>> from dask.typing import DaskCollection
>>> isinstance(x, DaskCollection)
True
检查对象是否为 Dask 集合¶
要检查对象是否为 Dask 集合,请使用 dask.base.is_dask_collection
。
>>> from dask.base import is_dask_collection
>>> from dask import delayed
>>> x = delayed(sum)([1, 2, 3])
>>> is_dask_collection(x)
True
>>> is_dask_collection(1)
False
实现确定性哈希¶
Dask 实现了自己的确定性哈希函数,根据参数值生成键。此函数可通过 dask.base.tokenize
获得。许多常见类型已经实现了 tokenize
,可以在 dask/base.py
中找到。
创建自己的自定义类时,你可能需要注册一个 tokenize
实现。有两种方法可以做到:
__dask_tokenize__
方法在可能的情况下,建议定义
__dask_tokenize__
方法。此方法不接受参数,应返回一个完整代表对象的值。在返回任何非平凡对象之前,最好从该方法中调用dask.base.normalize_token
。使用
dask.base.normalize_token
注册一个函数如果无法在类上定义方法,或者你需要为已注册父类的类自定义 tokenize 函数(例如,如果你需要子类化内置类型),则可以使用
normalize_token
分发机制注册一个 tokenize 函数。该函数应具有与上述相同的签名。
在两种情况下,实现应该相同,只是定义的位置不同。
注意
Dask 集合和普通 Python 对象都可以使用上述任一方法实现 tokenize
。
示例¶
>>> from dask.base import tokenize, normalize_token
# Define a tokenize implementation using a method.
>>> class Point:
... def __init__(self, x, y):
... self.x = x
... self.y = y
...
... def __dask_tokenize__(self):
... # This tuple fully represents self
... # Wrap non-trivial objects with normalize_token before returning them
... return normalize_token(Point), self.x, self.y
>>> x = Point(1, 2)
>>> tokenize(x)
'5988362b6e07087db2bc8e7c1c8cc560'
>>> tokenize(x) == tokenize(x) # token is idempotent
True
>>> tokenize(Point(1, 2)) == tokenize(Point(1, 2)) # token is deterministic
True
>>> tokenize(Point(1, 2)) == tokenize(Point(2, 1)) # tokens are unique
False
# Register an implementation with normalize_token
>>> class Point3D:
... def __init__(self, x, y, z):
... self.x = x
... self.y = y
... self.z = z
>>> @normalize_token.register(Point3D)
... def normalize_point3d(x):
... return normalize_token(Point3D), x.x, x.y, x.z
>>> y = Point3D(1, 2, 3)
>>> tokenize(y)
'5a7e9c3645aa44cf13d021c14452152e'
更多示例请参阅 dask/base.py
或任何内置的 Dask 集合。