自定义集合
目录
自定义集合¶
对于许多问题,内置的 Dask 集合(dask.array
、dask.dataframe
、dask.bag
和 dask.delayed
)就已足够。在它们不足够的情况下,可以创建自己的 Dask 集合。本文档描述了满足 Dask 集合接口所需的必要方法。
注意
这被认为是一项高级特性。对于大多数情况,内置集合可能已足够。
在阅读本文之前,您应该阅读并理解
目录
Dask 集合接口¶
要创建自己的 Dask 集合,您需要实现由 dask.typing.DaskCollection
协议定义的接口。请注意,没有必需的基类。
建议也阅读 核心 Dask 方法的内部结构 以了解 Dask 内部如何使用此接口。
集合协议¶
- class dask.typing.DaskCollection(*args, **kwargs)[source]¶
定义 Dask 集合接口的协议。
- abstract __dask_graph__() collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any] [source]¶
Dask 任务图。
核心 Dask 集合(Array、DataFrame、Bag 和 Delayed)使用
HighLevelGraph
来表示集合任务图。也可以使用 Python 字典将任务图表示为低层图。- 返回
- Mapping
Dask 任务图。如果实例返回
dask.highlevelgraph.HighLevelGraph
,则必须实现__dask_layers__()
方法,其定义遵循HLGDaskCollection
协议。
- abstract __dask_keys__() list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], ForwardRef('NestedKeys')]] [source]¶
任务图的输出键。
请注意,Dask 集合的键除了 任务图规范文档 中描述的约束外,还有额外的约束。这些额外约束描述如下。
所有键必须是非空字符串或元组,其中第一个元素是非空字符串,后面跟着零个或多个任意的 str、bytes、int、float 或它们的元组。非空字符串通常称为*集合名称*。dask 包中包含的所有集合都只有一个名称,但这不是强制要求。
这些都是有效的输出
[]
["x", "y"]
[[("y", "a", 0), ("y", "a", 1)], [("y", "b", 0), ("y", "b", 1)]
- 返回
- list
一个可能是嵌套的键列表,表示图的输出。计算后,结果将以相同的布局返回,键被其相应的输出替换。
- __dask_optimize__: Any¶
给定一个图和键,返回一个新的优化后的图。
此方法可以是
staticmethod
或classmethod
,但不能是instancemethod
。有关示例实现,请参阅核心 Dask 集合中__dask_optimize__
的定义:dask.array.Array
、dask.dataframe.DataFrame
等。请注意,在调用
__dask_optimize__
之前,图和键会合并;因此,传递给此方法的图和键可能代表多个共享相同优化方法的集合。- 参数
- dskGraph
共享相同
__dask_optimize__
方法的所有集合的合并图。- keysSequence[Key]
共享相同
__dask_optimize__
方法的所有集合的__dask_keys__
输出列表。- **kwargsAny
从
compute
或persist
调用转发的额外关键字参数。可根据需要使用或忽略。
- 返回
- MutableMapping
优化后的 Dask 图。
- abstract __dask_postcompute__() tuple[collections.abc.Callable, tuple] [source]¶
用于构建最终结果的终结器函数和可选参数。
计算后,集合中的每个键都将有一个内存中的结果,postcompute 函数将每个键的结果组合成最终的内存中表示。例如,dask.array.Array 将每个分块处的数组连接成最终的内存数组。
- 返回
- PostComputeCallable
可调用对象,接收每个最终键的结果序列以及可选参数。一个示例签名为
finalize(results: Sequence[Any], *args)
。- tuple[Any, …]
在键结果之后传递给函数的可选参数(
PostComputeCallable
的 *args 部分)。如果不传递额外参数,则必须是一个空元组。
- abstract __dask_postpersist__() tuple[dask.typing.PostPersistCallable, tuple] [source]¶
用于构建持久化集合的重构函数和可选参数。
另请参阅
dask.typing.PostPersistCallable
的文档。- 返回
- PostPersistCallable
重构集合的可调用对象。签名应为
rebuild(dsk: Mapping, *args: Any, rename: Mapping[str, str] | None)
(由PostPersistCallable
协议定义)。该可调用对象应返回一个等效的 Dask 集合,其键与 self 相同,但结果是通过不同的图计算得出的。对于dask.persist()
的情况,新图将只有输出键和已经计算好的值。- tuple[Any, …]
传递给重构可调用对象的可选参数。如果不传递额外参数,则必须是一个空元组。
- __dask_scheduler__: staticmethod¶
用于此对象的默认调度器
get
。通常作为 staticmethod 附加到类上,例如
>>> import dask.threaded >>> class MyCollection: ... # Use the threaded scheduler by default ... __dask_scheduler__ = staticmethod(dask.threaded.get)
- abstract __dask_tokenize__() collections.abc.Hashable [source]¶
必须完全代表对象的值。
- abstract compute(**kwargs: Any) Any [source]¶
计算此 dask 集合。
这将一个延迟执行的 Dask 集合转换为其内存中的等效形式。例如,Dask 数组转换为 NumPy 数组,Dask DataFrame 转换为 Pandas DataFrame。在调用此操作之前,整个数据集必须能够放入内存。
- 参数
- schedulerstring, optional
要使用的调度器,如 "threads"、"synchronous" 或 "processes"。如果未提供,默认情况下首先检查全局设置,然后回退到集合的默认设置。
- optimize_graphbool, optional
如果为 True [默认],图在计算前进行优化。否则,图按原样运行。这对于调试很有用。
- kwargs
转发给调度器函数的额外关键字参数。
- 返回
- 集合的计算结果。
另请参阅
- abstract persist(**kwargs: Any) dask.typing.CollType [source]¶
将此 dask 集合持久化到内存中
这将一个延迟执行的 Dask 集合转换为一个元数据相同但结果已完全计算或正在后台积极计算的 Dask 集合。
此函数的操作根据活动的任务调度器有很大差异。如果任务调度器支持异步计算,例如 dask.distributed 调度器,则 persist 将*立即*返回,并且返回值任务图将包含 Dask Future 对象。然而,如果任务调度器仅支持阻塞计算,则对 persist 的调用将*阻塞*,并且返回值任务图将包含具体的 Python 结果。
此函数在使用分布式系统时特别有用,因为结果将保留在分布式内存中,而不是像 compute 那样返回到本地进程。
- 参数
- schedulerstring, optional
要使用的调度器,如 "threads"、"synchronous" 或 "processes"。如果未提供,默认情况下首先检查全局设置,然后回退到集合的默认设置。
- optimize_graphbool, optional
如果为 True [默认],图在计算前进行优化。否则,图按原样运行。这对于调试很有用。
- **kwargs
转发给调度器函数的额外关键字参数。
- 返回
- 由内存中数据支持的新 dask 集合
另请参阅
- abstract visualize(filename: str = 'mydask', format: str | None = None, optimize_graph: bool = False, **kwargs: Any) DisplayObject | None [source]¶
使用 graphviz 渲染此对象的任务图计算过程。
需要安装
graphviz
。- 参数
- filenamestr 或 None, optional
写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用 '.png'。如果 filename 为 None,则不写入文件,仅使用管道与 dot 通信。
- format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional
输出文件的写入格式。默认为 'png'。
- optimize_graphbool, optional
如果为 True,图在渲染前进行优化。否则,图按原样显示。默认为 False。
- color: {None, ‘order’}, optional
节点着色选项。提供
cmap=
关键字参数可指定额外的色图。- **kwargs
转发给
to_graphviz
的额外关键字参数。
- 返回
- resultIPython.display.Image, IPython.display.SVG, or None
更多信息请参见 dask.dot.dot_graph。
另请参阅
dask.visualize
dask.dot.dot_graph
注意
有关优化的更多信息请参见此处
https://docs.dask.org.cn/en/latest/optimize.html
示例
>>> x.visualize(filename='dask.pdf') >>> x.visualize(filename='dask.pdf', color='order')
HLG 集合协议¶
注意
HighLevelGraphs 正在被弃用,取而代之的是表达式。新的项目鼓励不要实现自己的 HLG 层。
由 Dask 的 高层图 支持的集合必须实现一个额外的方法,由本协议定义
- class dask.typing.HLGDaskCollection(*args, **kwargs)[source]¶
定义使用 HighLevelGraphs 的 Dask 集合的协议。
此协议几乎与
DaskCollection
完全相同,仅添加了__dask_layers__
方法(由高层图支持的集合所必需)。- abstract __dask_layers__() collections.abc.Sequence[str] [source]¶
HighLevelGraph 层的名称。
调度器 get
协议¶
SchedulerGetProtocol
定义了 Dask 集合的 __dask_scheduler__
定义必须遵循的签名。
- class dask.typing.SchedulerGetCallable(*args, **kwargs)[source]¶
定义
__dask_scheduler__
可调用对象签名的协议。- __call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], keys: Union[collections.abc.Sequence[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], **kwargs: Any) Any [source]¶
作为集合默认调度器调用的方法。
- 参数
- dsk
任务图。
- keys
对应所需数据的键。
- **kwargs
额外参数。
- 返回
- Any
与 keys 关联的结果。
Post-persist 可调用协议¶
集合必须定义一个 __dask_postpersist__
方法,该方法返回一个符合 PostPersistCallable
接口的可调用对象。
- class dask.typing.PostPersistCallable(*args, **kwargs)[source]¶
定义
__dask_postpersist__
可调用对象签名的协议。- __call__(dsk: collections.abc.Mapping[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Any], *args: Any, rename: collections.abc.Mapping[str, str] | None = None) dask.typing.CollType_co [source]¶
用于重构持久化集合的方法。
- 参数
- dsk: Mapping
一个包含至少由 __dask_keys__() 返回的输出键的映射。
- *argsAny
额外的可选参数。如果不需要额外参数,则必须是一个空元组。
- renameMapping[str, str], optional
如果定义,则表明输出键也可能正在更改;例如,如果
__dask_keys__()
的先前输出是[("a", 0), ("a", 1)]
,则在调用rebuild(dsk, *extra_args, rename={"a": "b"})
后必须变为[("b", 0), ("b", 1)]
。rename
映射可能不包含集合名称;在这种情况下,关联的键不会改变。它可能包含对意外名称的替换,这些替换必须被忽略。
- 返回
- Collection
一个等效的 Dask 集合,其键与通过不同图计算出的键相同。
核心 Dask 方法的内部结构¶
Dask 有几个*核心*函数(以及相应的方法),它们实现了常见的操作
compute
:将一个或多个 Dask 集合转换为其内存中的对应形式persist
:将一个或多个 Dask 集合转换为等效的 Dask 集合,其结果已经计算并在内存中缓存optimize
:将一个或多个 Dask 集合转换为共享一个大型优化图的等效 Dask 集合visualize
:给定一个或多个 Dask 集合,绘制出在调用compute
或persist
期间将传递给调度器的图
这里我们简要描述这些函数的内部结构,以说明它们与上述接口的关系。
Compute (计算)¶
compute
的操作可以分解为三个阶段
图合并,终结
首先,单个集合被转换为单个大型表达式和嵌套的键列表。这是通过
collections_to_expr()
完成的,它确保所有集合一起优化以消除常见的子表达式。注意
在此阶段,遗留的 HLG 图被包装到
HLGExpr
中,该表达式根据 __dask_optimize__ 将 __dask_postcompute__ 和低级优化器编码到表达式中。optimize_graph
参数仅与 HLG 图相关,并控制是否考虑低级优化。如果
optimize_graph
为True
(默认),则首先按其__dask_optimize__
方法对集合进行分组。所有具有相同__dask_optimize__
方法的集合都会合并其图并连接其键,然后对各自的__dask_optimize__
进行单次调用,并传递合并后的图和键。然后合并生成的结果图。如果
optimize_graph
为False
,则合并所有图并连接所有键。
组合后的图使用
FinalizeCompute
表达式进行_终结_,该表达式指示表达式/图缩减为单个分区,适合在计算后返回给用户。这可以通过实现集合的__dask_postcompute__
方法或实现表达式的优化路径来完成。以 DataFrame 为例,
FinalizeCompute
简化为RepartitionToFewer(..., npartition=1)
,它只是将所有结果连接成一个普通 DataFrame。(表达式) 优化
合并后的表达式被优化。此步骤不应与为遗留图定义的 __dask_optimize__ 低级优化相混淆。这是一个总是执行的步骤,是将表达式简化和降低到最终形式的必需步骤,该形式可用于实际生成可执行的任务图。另请参阅 Optimizer (优化器)。
对于遗留的 HLG 图,低级优化步骤嵌入在图物化中,这通常仅在图传递给调度器后发生(见下文)。
计算
图合并并执行任何优化后,生成的大型图和嵌套的键列表将传递给调度器。调度器的选择如下:
如果直接以关键字指定了
get
函数,则使用该函数否则,如果设置了全局调度器,则使用该调度器
否则回退到给定集合的默认调度器。请注意,如果所有集合不共享相同的
__dask_scheduler__
,则会引发错误。
确定适当的调度器
get
函数后,将使用合并后的图、键和额外关键字参数调用它。在此阶段之后,results
是一个嵌套的值列表。此列表的结构与keys
的结构相同,每个键都替换为其相应的结果。
Persist (持久化)¶
Persist 与 compute
非常相似,除了返回值的创建方式。它也包含三个阶段
图合并,*不*终结
与
compute
中相同,但不进行终结。在 persist 的情况下,我们不希望连接所有输出分区,而是希望为每个分区返回一个 Future。(表达式) 优化
与
compute
中相同。计算
与
compute
中相同,不同之处在于返回的结果是 Futures 列表。Postpersist (持久化后处理)
调度器返回的 Futures 与
__dask_postpersist__
一起用于重构指向远程数据的集合。__dask_postpersist__
返回两项内容一个
rebuild
函数,该函数接受一个持久化的图作为输入。该图的键与相应集合的__dask_keys__
相同,值是计算结果(对于单机调度器)或 Futures(对于分布式调度器)。一个元组,包含在图之后传递给
rebuild
的额外参数
为了构建
persist
的输出,遍历集合和结果列表,并对每个集合的重构器在其相应结果的图上进行调用。
Optimize (优化)¶
optimize
的操作可以分解为两个阶段
图合并,*不*终结
与
persist
中相同。(表达式) 优化
与
compute
和persist
中相同。物化和重构
整个图被物化(这也执行了低级优化)。类似于
persist
,使用__dask_postpersist__
返回的rebuild
函数和参数从优化后的图中重构等效的集合。
Visualize (可视化)¶
Visualize 是四个核心函数中最简单的一个。它只有两个阶段
图合并和优化
与
compute
中相同。图绘制
使用
graphviz
绘制生成的合并图,并输出到指定文件。
向您的类添加核心 Dask 方法¶
定义上述接口将允许您的对象被核心 Dask 函数(dask.compute
、dask.persist
、dask.visualize
等)使用。要添加相应的方法版本,您可以继承 dask.base.DaskMethodsMixin
,它根据上述接口添加了 compute
、persist
和 visualize
的实现。
使用表达式定义计算¶
建议使用 dask.expr.Expr
类定义 dask 图。要开始,必须实现最少的方法集。
- class dask.Expr(*args, _determ_token=None, **kwargs)[source]¶
-
- _layer() dict [source]¶
此表达式添加的图层。
对每个分区应用一个任务的简单表达式可以选择仅实现 Expr._task。
- 返回
- layer: dict
此表达式添加的 Dask 任务图
示例
>>> class Add(Expr): ... def _layer(self): ... return { ... name: Task( ... name, ... operator.add, ... TaskRef((self.left._name, i)), ... TaskRef((self.right._name, i)) ... ) ... for i, name in enumerate(self.__dask_keys__()) ... }
- _task(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], index: int) dask._task_spec.Task [source]¶
第 i 个分区的任务
- 参数
- index
此 DataFrame 分区的索引
- 返回
- task
用于计算此分区的 Dask 任务
另请参阅
示例
>>> class Add(Expr): ... def _task(self, i): ... return Task( ... self.__dask_keys__()[i], ... operator.add, ... TaskRef((self.left._name, i)), ... TaskRef((self.right._name, i)) ... )
Dask 集合示例¶
这里我们创建一个表示元组的 Dask 集合。元组中的每个元素在图中都表示为一个任务。请注意,这仅用于说明目的 - 同样的用户体验可以使用带有 dask.delayed
元素的普通元组实现
import dask
from dask.base import DaskMethodsMixin, replace_name_in_key
from dask.expr import Expr, LLGExpr
from dask.typing import Key
from dask.task_spec import Task, DataNode, Alias
# We subclass from DaskMethodsMixin to add common dask methods to
# our class (compute, persist, and visualize). This is nice but not
# necessary for creating a Dask collection (you can define them
# yourself).
class Tuple(DaskMethodsMixin):
def __init__(self, expr):
self._expr = expr
def __dask_graph__(self):
return self._expr.__dask_graph__()
def __dask_keys__(self):
return self._expr.__dask_keys__()
# Use the threaded scheduler by default.
__dask_scheduler__ = staticmethod(dask.threaded.get)
def __dask_postcompute__(self):
# We want to return the results as a tuple, so our finalize
# function is `tuple`. There are no extra arguments, so we also
# return an empty tuple.
return tuple, ()
def __dask_postpersist__(self):
return Tuple._rebuild, ("mysuffix",)
@staticmethod
def _rebuild(futures: dict, name: str):
expr = LLGExpr({
(name, i): DataNode((name, i), val)
for i, val in enumerate(futures.values())
})
return Tuple(expr)
def __dask_tokenize__(self):
# For tokenize to work we want to return a value that fully
# represents this object. In this case this is done by a type
identifier plus the (also tokenized) name of the expression
return (type(self), self._expr._name)
class RemoteTuple(Expr):
@property
def npartitions(self):
return len(self.operands)
def __dask_keys__(self):
return [(self._name, i) for i in range(self.npartitions)]
def _task(self, name: Key, index: int) -> Task:
return DataNode(name, self.operands[index])
演示这个类
>>> from dask_tuple import Tuple
def from_pytuple(pytup: tuple) -> Tuple:
return Tuple(RemoteTuple(*pytup))
>>> dask_tup = from_pytuple(tuple(range(5)))
>>> dask_tup.__dask_keys__()
[('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 0),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 1),
('remotetuple-b7ea9a26c3ab8287c78d11fd45f26793', 2)]
# Compute turns Tuple into a tuple
>>> dask_tup.compute()
(0, 1, 2)
# Persist turns Tuple into a Tuple, with each task already computed
>>> dask_tup2 = dask_tup.persist()
>>> isinstance(dask_tup2, Tuple)
True
>>> dask_tup2.__dask_graph__()
{('newname', 0): DataNode(0),
('newname', 1): DataNode(1),
('newname', 2): DataNode(2)}
>>> x2.compute()
(0, 1, 2)
# Run-time typechecking
>>> from dask.typing import DaskCollection
>>> isinstance(x, DaskCollection)
True
检查对象是否为 Dask 集合¶
要检查对象是否为 Dask 集合,请使用 dask.base.is_dask_collection
>>> from dask.base import is_dask_collection
>>> from dask import delayed
>>> x = delayed(sum)([1, 2, 3])
>>> is_dask_collection(x)
True
>>> is_dask_collection(1)
False
实现确定性哈希¶
Dask 实现了自己的确定性哈希函数,根据参数的值生成键。此函数可作为 dask.base.tokenize
使用。许多常见类型已经实现了 tokenize
,可以在 dask/base.py
中找到。
创建自己的自定义类时,可能需要注册 tokenize
实现。有两种方法可以做到这一点
__dask_tokenize__
方法在可能的情况下,建议定义
__dask_tokenize__
方法。此方法不接受参数,应返回一个完全代表对象的值。在返回任何非平凡对象之前,最好从其中调用dask.base.normalize_token
。使用
dask.base.normalize_token
注册函数如果在类上定义方法不可行,或者您需要为已注册了父类的类自定义 tokenize 函数(例如,如果需要继承内置类),则可以使用
normalize_token
分派注册 tokenize 函数。该函数的签名应与上述描述相同。
在两种情况下,实现应该相同,只是定义的位置不同。
注意
Dask 集合和普通 Python 对象都可以使用上述任一方法实现 tokenize
。
示例¶
>>> from dask.base import tokenize, normalize_token
# Define a tokenize implementation using a method.
>>> class Point:
... def __init__(self, x, y):
... self.x = x
... self.y = y
...
... def __dask_tokenize__(self):
... # This tuple fully represents self
... # Wrap non-trivial objects with normalize_token before returning them
... return normalize_token(Point), self.x, self.y
>>> x = Point(1, 2)
>>> tokenize(x)
'5988362b6e07087db2bc8e7c1c8cc560'
>>> tokenize(x) == tokenize(x) # token is idempotent
True
>>> tokenize(Point(1, 2)) == tokenize(Point(1, 2)) # token is deterministic
True
>>> tokenize(Point(1, 2)) == tokenize(Point(2, 1)) # tokens are unique
False
# Register an implementation with normalize_token
>>> class Point3D:
... def __init__(self, x, y, z):
... self.x = x
... self.y = y
... self.z = z
>>> @normalize_token.register(Point3D)
... def normalize_point3d(x):
... return normalize_token(Point3D), x.x, x.y, x.z
>>> y = Point3D(1, 2, 3)
>>> tokenize(y)
'5a7e9c3645aa44cf13d021c14452152e'
更多示例请参见 dask/base.py
或任何内置 Dask 集合。