API 参考
目录
API 参考¶
Dask API 通常遵循上游 API
数组 (Arrays) 遵循 NumPy
DataFrames 遵循 Pandas
Bag 遵循 Spark 和 Python 迭代器中常见的 map/filter/groupby/reduce
延迟计算 (Delayed) 包装通用 Python 代码
Future 遵循标准库中用于实时计算的 concurrent.futures。
此外,Dask 拥有自己的函数来启动计算、将数据持久化到内存、检查进度等等,这些函数补充了上述 API。下面将介绍这些更通用的 Dask 函数
|
同时计算多个 Dask 集合。 |
如果 |
|
|
同时优化多个 Dask 集合。 |
|
将多个 Dask 集合持久化到内存中 |
|
同时可视化多个 Dask 图。 |
这些函数适用于任何调度器。当使用较新的调度器并启动 dask.distributed.Client
时(尽管名字叫 Client,但它在单机上运行良好),可以使用更高级的操作。该 API 提供了异步提交、取消和跟踪工作的能力,并包含许多用于复杂任务间工作流的函数。这些对于正常操作来说不是必需的,但对于实时或高级操作可能很有用。
这个更高级的 API 可以在Dask 分布式文档中找到
- dask.annotate(**annotations: Any) collections.abc.Iterator[None] [source]¶
用于设置 HighLevelGraph 层注释的上下文管理器。
注释是与任务相关的元数据或软约束,Dask 调度器可以选择遵守它们:它们表示意图,但不强制执行硬约束。因此,它们主要设计用于分布式调度器。
几乎任何对象都可以作为注释,但小型 Python 对象更受欢迎,而像 NumPy 数组这样的大型对象则不建议使用。
作为注释提供的可调用对象应该接受一个 key 参数并生成适当的注释。被注释集合中的单个任务键会传递给可调用对象。
- 参数
- **annotations键值对
另请参阅
示例
数组 A 中的所有任务应具有优先级 100 并在失败时重试 3 次。
>>> import dask >>> import dask.array as da >>> with dask.annotate(priority=100, retries=3): ... A = da.ones((10000, 10000))
基于展平的块 ID 优先处理数组 A 中的任务。
>>> nblocks = (10, 10) >>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]): ... A = da.ones((1000, 1000), chunks=(100, 100))
注释可以嵌套。
>>> with dask.annotate(priority=1): ... with dask.annotate(retries=3): ... A = da.ones((1000, 1000)) ... B = A + 1
- dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[source]¶
同时计算多个 Dask 集合。
- 参数
- args 对象
任意数量的对象。如果是 Dask 对象,则对其进行计算并返回结果。默认情况下,还会遍历 Python 内置集合以查找 Dask 对象(更多信息请参阅
traverse
关键字)。非 Dask 参数会保持不变地通过。- traverse布尔值,可选
默认情况下,Dask 会遍历传递给
compute
的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置traverse=False
以避免执行此遍历。- scheduler字符串,可选
要使用的调度器,例如“threads”、“synchronous”或“processes”。如果未提供,默认行为是先检查全局设置,然后回退到集合默认值。
- optimize_graph布尔值,可选
如果为 True [默认],则在计算之前应用每个集合的优化。否则,图会按原样运行。这对于调试可能很有用。
- get
None
应保持为
None
。get= 关键字已被移除。- kwargs
要转发给调度器函数的额外关键字参数。
示例
>>> import dask >>> import dask.array as da >>> a = da.arange(10, chunks=2).sum() >>> b = da.arange(10, chunks=2).mean() >>> dask.compute(a, b) (np.int64(45), np.float64(4.5))
默认情况下,Python 集合内的 Dask 对象也将被计算
>>> dask.compute({'a': a, 'b': b, 'c': 1}) ({'a': np.int64(45), 'b': np.float64(4.5), 'c': 1},)
- dask.is_dask_collection(x) bool [source]¶
如果
x
是 Dask 集合,则返回True
。- 参数
- xAny
要测试的对象。
- 返回
- result布尔值
如果 x 是 Dask 集合,则为
True
。
注意
DaskCollection typing.Protocol 实现将 Dask 集合定义为一个类,该类从
__dask_graph__
方法返回一个映射 (Mapping)。此辅助函数在协议实现之前就已存在。
- dask.optimize(*args, traverse=True, **kwargs)[source]¶
同时优化多个 Dask 集合。
返回等效的 Dask 集合,它们都共享合并和优化的底层图。这在将多个集合转换为延迟对象或在战略点手动应用优化时很有用。
请注意,在大多数情况下,您无需直接调用此函数。
警告
This function triggers a materialization of the collections and looses any annotations attached to HLG layers.
- 参数
- *args对象
任意数量的对象。如果是 Dask 对象,其图会在返回等效 Dask 集合之前与所有其他 Dask 对象的图一起进行优化和合并。非 Dask 参数会保持不变地通过。
- traverse布尔值,可选
默认情况下,Dask 会遍历传递给
optimize
的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置traverse=False
以避免执行此遍历。- optimizations可调用对象列表,可选
要执行的附加优化过程。
- **kwargs
要转发给优化过程的额外关键字参数。
示例
>>> import dask >>> import dask.array as da >>> a = da.arange(10, chunks=2).sum() >>> b = da.arange(10, chunks=2).mean() >>> a2, b2 = dask.optimize(a, b)
>>> a2.compute() == a.compute() np.True_ >>> b2.compute() == b.compute() np.True_
- dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[source]¶
将多个 Dask 集合持久化到内存中
这将惰性 Dask 集合转换为具有相同元数据的 Dask 集合,但其结果现在已完全计算或正在后台积极计算。
例如,由许多惰性调用构建的惰性 dask.array 现在将是一个具有相同形状、dtype、块等的 dask.array,但所有先前惰性任务要么已在内存中计算为许多小的
numpy.array
(单机情况),要么在集群上异步运行(分布式情况)。如果存在
dask.distributed.Client
并连接到分布式调度器,则此函数的行为会有所不同。在这种情况下,该函数会在任务图提交到集群后立即返回,但不会等到计算完成。计算会在后台异步继续。当使用单机调度器时,此函数会阻塞直到计算完成。在单机上使用 Dask 时,应确保数据集完全适合内存。
- 参数
- *args: Dask 集合
- scheduler字符串,可选
要使用的调度器,例如“threads”、“synchronous”或“processes”。如果未提供,默认行为是先检查全局设置,然后回退到集合默认值。
- traverse布尔值,可选
默认情况下,Dask 会遍历传递给
persist
的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置traverse=False
以避免执行此遍历。- optimize_graph布尔值,可选
如果为 True [默认],则在计算之前优化图。否则,图会按原样运行。这对于调试可能很有用。
- **kwargs
要转发给调度器函数的额外关键字参数。
- 返回
- 由内存数据支持的新 Dask 集合
示例
>>> df = dd.read_csv('/path/to/*.csv') >>> df = df[df.name == 'Alice'] >>> df['in-debt'] = df.balance < 0 >>> df = df.persist() # triggers computation
>>> df.value().min() # future computations are now fast -10 >>> df.value().max() 100
>>> from dask import persist # use persist function on multiple collections >>> a, b = persist(a, b)
- dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, engine: Optional[Literal['cytoscape', 'ipycytoscape','graphviz']] = None, **kwargs)[source]¶
同时可视化多个 Dask 图。
需要安装
graphviz
。所有非 Dask 图的选项都应作为关键字参数传递。- 参数
- args 对象
任意数量的对象。如果是 Dask 集合(例如,Dask DataFrame、Array、Bag 或 Delayed),其关联的图将包含在 visualize 的输出中。默认情况下,还会遍历 Python 内置集合以查找 Dask 对象(更多信息请参阅
traverse
关键字)。缺少关联图的参数将被忽略。- filename字符串或 None,可选
要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用“.png”。如果 filename 为 None,则不写入文件,仅使用管道与 dot 进行通信。
- format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’} 之一,可选
写入输出文件的格式。默认是“png”。
- traverse布尔值,可选
默认情况下,Dask 会遍历传递给
visualize
的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置traverse=False
以避免执行此遍历。- optimize_graph布尔值,可选
如果为 True,则在渲染前优化图。否则,图会按原样显示。默认值为 False。
- color{None, ‘order’, ‘ages’, ‘freed’, ‘memoryincreases’, ‘memorydecreases’, ‘memorypressure’} 之一,可选
节点着色选项。颜色映射
None,默认值,不着色。
‘order’,根据节点在图中出现的顺序为节点边框着色。
‘ages’,节点数据保留时长。
‘freed’,运行节点后释放的依赖项数量。
‘memoryincreases’,节点生命周期结束后保留的更多输出数量。较大的值可能表明节点应该稍后运行。
‘memorydecreases’,节点生命周期结束后保留的较少输出数量。较大的值可能表明节点应该更早运行。
‘memorypressure’,节点运行时保留的数据数量(圆圈),或数据释放时的数量(矩形)。
- maxval{int, float} 之一,可选
颜色映射的最大值,用于归一化到 0 到 1.0。默认值
None
将使其成为最大值数量。- collapse_outputs布尔值,可选
是否折叠输出框,输出框通常带有空标签。默认值为 False。
- verbose布尔值,可选
即使数据未分块,是否也标记输出和输入框。注意:这些标签可能会非常长。默认值为 False。
- engine{“graphviz”, “ipycytoscape”, “cytoscape”} 之一,可选。
要使用的可视化引擎。如果未提供,则检查 Dask 配置值“visualization.engine”。如果未设置,它会尝试导入
graphviz
和ipycytoscape
,并使用第一个成功的。- **kwargs
要转发给可视化引擎的附加关键字参数。
- 返回
- resultIPython.display.Image, IPython.display.SVG, 或 None
更多信息请参阅 dask.dot.dot_graph。
另请参阅
dask.dot.dot_graph
注意
有关优化的更多信息,请参阅此处
https://docs.dask.org.cn/en/latest/optimize.html
示例
>>> x.visualize(filename='dask.pdf') >>> x.visualize(filename='dask.pdf', color='order')
数据集¶
Dask 有一些用于生成演示数据集的辅助函数
- dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[source]¶
生成随机人员数据集
这将生成一个包含随机生成人员字典记录的 Dask Bag。这需要可选库
mimesis
来生成记录。- 参数
- npartitionsint
分区数量
- records_per_partitionint
每个分区的记录数量
- seedint, (可选)
随机种子
- locale字符串
语言区域设置,例如“en”、“fr”、“zh”或“ru”
- 返回
- b: Dask Bag
- dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1D', dtypes=None, seed=None, **kwargs)[source]¶
创建带随机数据的时序 DataFrame
- 参数
- startdatetime(或类似 datetime 的字符串)
时间序列开始时间
- enddatetime(或类似 datetime 的字符串)
时间序列结束时间
- dtypes字典 (可选)
列名到类型的映射。有效类型包括 {float, int, str, ‘category’}
- freq字符串
表示时间序列频率的字符串,例如“2s”、“1H”或“12W”
- partition_freq字符串
将 DataFrame 分割成多个分区的字符串,例如“1M”或“2Y”
- seedint (可选)
随机状态种子
- kwargs
要传递给单个列创建函数的关键字。关键字应以列名开头,后跟下划线。
示例
>>> import dask >>> df = dask.datasets.timeseries() >>> df.head() timestamp id name x y 2000-01-01 00:00:00 967 Jerry -0.031348 -0.040633 2000-01-01 00:00:01 1066 Michael -0.262136 0.307107 2000-01-01 00:00:02 988 Wendy -0.526331 0.128641 2000-01-01 00:00:03 1016 Yvonne 0.620456 0.767270 2000-01-01 00:00:04 998 Ursula 0.684902 -0.463278 >>> df = dask.datasets.timeseries( ... '2000', '2010', ... freq='2h', partition_freq='1D', seed=1, # data frequency ... dtypes={'value': float, 'name': str, 'id': int}, # data types ... id_lam=1000 # control number of items in id column ... )
具有定义规范的数据集¶
以下辅助函数仍处于实验阶段
- dask.dataframe.io.demo.with_spec(spec: dask.dataframe.io.demo.DatasetSpec, seed: int | None = None)[source]¶
根据提供的规范生成随机数据集
- 参数
- specDatasetSpec
指定数据集的所有参数
- seed: int (可选)
随机状态种子
注意
此 API 仍在实验阶段,未来可能会发生变化
示例
>>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec >>> ddf = with_spec( ... DatasetSpec( ... npartitions=10, ... nrecords=10_000, ... column_specs=[ ... ColumnSpec(dtype=int, number=2, prefix="p"), ... ColumnSpec(dtype=int, number=2, prefix="n", method="normal"), ... ColumnSpec(dtype=float, number=2, prefix="f"), ... ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10), ... ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]), ... ], ... ), seed=42) >>> ddf.head(10) p1 p2 n1 n2 f1 f2 s1 s2 c1 0 1002 972 -811 20 0.640846 -0.176875 L#h98#}J`? _8C607/:6e N 1 985 982 -1663 -777 0.790257 0.792796 u:XI3,omoZ w~@ /d)'-@ N 2 947 970 799 -269 0.740869 -0.118413 O$dnwCuq\ !WtSe+(;#9 Y 3 1003 983 1133 521 -0.987459 0.278154 j+Qr_2{XG& &XV7cy$y1T Y 4 1017 1049 826 5 -0.875667 -0.744359 bJ3E-{:o {+jC).?vK+ Y 5 984 1017 -492 -399 0.748181 0.293761 ~zUNHNgD"! yuEkXeVot| Y 6 992 1027 -856 67 -0.125132 -0.234529 j.7z;o]Gc9 g|Fi5*}Y92 Y 7 1011 974 762 -1223 0.471696 0.937935 yT?j~N/-u] JhEB[W-}^$ N 8 984 974 856 74 0.109963 0.367864 _j"&@ i&;/ OYXQ)w{hoH N 9 1030 1001 -792 -262 0.435587 -0.647970 Pmrwl{{|.K 3UTqM$86Sg N
ColumnSpec 类¶
- class dask.dataframe.io.demo.ColumnSpec(prefix: str | None = None, dtype: str | type | None = None, number: int = 1, nunique: int | None = None, choices: list = <factory>, low: int | None = None, high: int | None = None, length: int | None = None, random: bool = False, method: str | None = None, args: tuple[typing.Any, ...] = <factory>, kwargs: dict[str, typing.Any] = <factory>)[source]¶
基类:
object
封装具有相同 dtype 的一系列列的属性。可以为整数 dtype 指定不同的方法(“poisson”、“uniform”、“binomial”等)
注意
此 API 仍在实验阶段,未来可能会发生变化
RangeIndexSpec 类¶
DatetimeIndexSpec 类¶
DatasetSpec 类¶
- class dask.dataframe.io.demo.DatasetSpec(npartitions: int = 1, nrecords: int = 1000, index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec = <factory>, column_specs: list[dask.dataframe.io.demo.ColumnSpec] = <factory>)[source]¶
基类:
object
定义一个带有随机数据的数据集,例如要生成哪些列和数据类型
注意
此 API 仍在实验阶段,未来可能会发生变化
- column_specs: list[dask.dataframe.io.demo.ColumnSpec]¶
列定义列表
- index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec¶
索引的属性
工具类¶
Dask 有一些公共工具方法。这些方法主要用于解析配置值。
- dask.utils.apply(func, args, kwargs=None)[source]¶
给定位置参数和关键字参数应用函数。
等价于
func(*args, **kwargs)
。大多数 Dask 用户永远不需要使用apply
函数。它通常仅供需要将关键字参数值注入到低级 Dask 任务图中的人使用。- 参数
- func可调用对象
要应用的函数。
- args元组
一个包含 func 所需所有位置参数的元组(例如:
(arg_1, arg_2, arg_3)
)- kwargs字典,可选
一个映射关键字参数的字典(例如:
{"kwarg_1": value, "kwarg_2": value}
示例
>>> from dask.utils import apply >>> def add(number, second_number=5): ... return number + second_number ... >>> apply(add, (10,), {"second_number": 2}) # equivalent to add(*args, **kwargs) 12
>>> task = apply(add, (10,), {"second_number": 2}) >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
- dask.utils.format_bytes(n: int) str [source]¶
将字节格式化为文本
>>> from dask.utils import format_bytes >>> format_bytes(1) '1 B' >>> format_bytes(1234) '1.21 kiB' >>> format_bytes(12345678) '11.77 MiB' >>> format_bytes(1234567890) '1.15 GiB' >>> format_bytes(1234567890000) '1.12 TiB' >>> format_bytes(1234567890000000) '1.10 PiB'
对于所有 < 2**60 的值,输出始终 <= 10 个字符。
- dask.utils.format_time(n: float) str [source]¶
将整数格式化为时间
>>> from dask.utils import format_time >>> format_time(1) '1.00 s' >>> format_time(0.001234) '1.23 ms' >>> format_time(0.00012345) '123.45 us' >>> format_time(123.456) '123.46 s' >>> format_time(1234.567) '20m 34s' >>> format_time(12345.67) '3hr 25m' >>> format_time(123456.78) '34hr 17m' >>> format_time(1234567.89) '14d 6hr'
- dask.utils.parse_bytes(s: float | str) int [source]¶
将字节字符串解析为数字
>>> from dask.utils import parse_bytes >>> parse_bytes('100') 100 >>> parse_bytes('100 MB') 100000000 >>> parse_bytes('100M') 100000000 >>> parse_bytes('5kB') 5000 >>> parse_bytes('5.4 kB') 5400 >>> parse_bytes('1kiB') 1024 >>> parse_bytes('1e6') 1000000 >>> parse_bytes('1e6 kB') 1000000000 >>> parse_bytes('MB') 1000000 >>> parse_bytes(123) 123 >>> parse_bytes('5 foos') Traceback (most recent call last): ... ValueError: Could not interpret 'foos' as a byte unit
- dask.utils.parse_timedelta(s: None, default: str | Literal[False] = 'seconds') None [source]¶
- dask.utils.parse_timedelta(s: str | float | timedelta, default: str | Literal[False] = 'seconds') float
将 timedelta 字符串解析为秒数
- 参数
- s字符串、浮点数、timedelta 或 None
- default: 字符串或 False,可选
如果 s 未指定单位,则使用的测量单位。默认为秒。设置为 False 要求 s 显式指定其自己的单位。
示例
>>> from datetime import timedelta >>> from dask.utils import parse_timedelta >>> parse_timedelta('3s') 3 >>> parse_timedelta('3.5 seconds') 3.5 >>> parse_timedelta('300ms') 0.3 >>> parse_timedelta(timedelta(seconds=3)) # also supports timedeltas 3