API 参考

Dask API 通常遵循上游 API

此外,Dask 拥有自己的函数来启动计算、将数据持久化到内存、检查进度等等,这些函数补充了上述 API。下面将介绍这些更通用的 Dask 函数

compute(*args[, traverse, optimize_graph, ...])

同时计算多个 Dask 集合。

is_dask_collection(x)

如果 x 是 Dask 集合,则返回 True

optimize(*args[, traverse])

同时优化多个 Dask 集合。

persist(*args[, traverse, optimize_graph, ...])

将多个 Dask 集合持久化到内存中

visualize(*args[, filename, traverse, ...])

同时可视化多个 Dask 图。

这些函数适用于任何调度器。当使用较新的调度器并启动 dask.distributed.Client 时(尽管名字叫 Client,但它在单机上运行良好),可以使用更高级的操作。该 API 提供了异步提交、取消和跟踪工作的能力,并包含许多用于复杂任务间工作流的函数。这些对于正常操作来说不是必需的,但对于实时或高级操作可能很有用。

这个更高级的 API 可以在Dask 分布式文档中找到

dask.annotate(**annotations: Any) collections.abc.Iterator[None][source]

用于设置 HighLevelGraph 层注释的上下文管理器。

注释是与任务相关的元数据或软约束,Dask 调度器可以选择遵守它们:它们表示意图,但不强制执行硬约束。因此,它们主要设计用于分布式调度器。

几乎任何对象都可以作为注释,但小型 Python 对象更受欢迎,而像 NumPy 数组这样的大型对象则不建议使用。

作为注释提供的可调用对象应该接受一个 key 参数并生成适当的注释。被注释集合中的单个任务键会传递给可调用对象。

参数
**annotations键值对

另请参阅

get_annotations

示例

数组 A 中的所有任务应具有优先级 100 并在失败时重试 3 次。

>>> import dask
>>> import dask.array as da
>>> with dask.annotate(priority=100, retries=3):
...     A = da.ones((10000, 10000))

基于展平的块 ID 优先处理数组 A 中的任务。

>>> nblocks = (10, 10)
>>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]):
...     A = da.ones((1000, 1000), chunks=(100, 100))

注释可以嵌套。

>>> with dask.annotate(priority=1):
...     with dask.annotate(retries=3):
...         A = da.ones((1000, 1000))
...     B = A + 1
dask.get_annotations() dict[str, Any][source]

获取当前注释。

返回
所有当前注释的字典

另请参阅

annotate
dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[source]

同时计算多个 Dask 集合。

参数
args 对象

任意数量的对象。如果是 Dask 对象,则对其进行计算并返回结果。默认情况下,还会遍历 Python 内置集合以查找 Dask 对象(更多信息请参阅 traverse 关键字)。非 Dask 参数会保持不变地通过。

traverse布尔值,可选

默认情况下,Dask 会遍历传递给 compute 的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置 traverse=False 以避免执行此遍历。

scheduler字符串,可选

要使用的调度器,例如“threads”、“synchronous”或“processes”。如果未提供,默认行为是先检查全局设置,然后回退到集合默认值。

optimize_graph布尔值,可选

如果为 True [默认],则在计算之前应用每个集合的优化。否则,图会按原样运行。这对于调试可能很有用。

getNone

应保持为 None。get= 关键字已被移除。

kwargs

要转发给调度器函数的额外关键字参数。

示例

>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> dask.compute(a, b)
(np.int64(45), np.float64(4.5))

默认情况下,Python 集合内的 Dask 对象也将被计算

>>> dask.compute({'a': a, 'b': b, 'c': 1})
({'a': np.int64(45), 'b': np.float64(4.5), 'c': 1},)
dask.is_dask_collection(x) bool[source]

如果 x 是 Dask 集合,则返回 True

参数
xAny

要测试的对象。

返回
result布尔值

如果 x 是 Dask 集合,则为 True

注意

DaskCollection typing.Protocol 实现将 Dask 集合定义为一个类,该类从 __dask_graph__ 方法返回一个映射 (Mapping)。此辅助函数在协议实现之前就已存在。

dask.optimize(*args, traverse=True, **kwargs)[source]

同时优化多个 Dask 集合。

返回等效的 Dask 集合,它们都共享合并和优化的底层图。这在将多个集合转换为延迟对象或在战略点手动应用优化时很有用。

请注意,在大多数情况下,您无需直接调用此函数。

警告

This function triggers a materialization of the collections and looses
any annotations attached to HLG layers.
参数
*args对象

任意数量的对象。如果是 Dask 对象,其图会在返回等效 Dask 集合之前与所有其他 Dask 对象的图一起进行优化和合并。非 Dask 参数会保持不变地通过。

traverse布尔值,可选

默认情况下,Dask 会遍历传递给 optimize 的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置 traverse=False 以避免执行此遍历。

optimizations可调用对象列表,可选

要执行的附加优化过程。

**kwargs

要转发给优化过程的额外关键字参数。

示例

>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> a2, b2 = dask.optimize(a, b)
>>> a2.compute() == a.compute()
np.True_
>>> b2.compute() == b.compute()
np.True_
dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[source]

将多个 Dask 集合持久化到内存中

这将惰性 Dask 集合转换为具有相同元数据的 Dask 集合,但其结果现在已完全计算或正在后台积极计算。

例如,由许多惰性调用构建的惰性 dask.array 现在将是一个具有相同形状、dtype、块等的 dask.array,但所有先前惰性任务要么已在内存中计算为许多小的 numpy.array(单机情况),要么在集群上异步运行(分布式情况)。

如果存在 dask.distributed.Client 并连接到分布式调度器,则此函数的行为会有所不同。在这种情况下,该函数会在任务图提交到集群后立即返回,但不会等到计算完成。计算会在后台异步继续。当使用单机调度器时,此函数会阻塞直到计算完成。

在单机上使用 Dask 时,应确保数据集完全适合内存。

参数
*args: Dask 集合
scheduler字符串,可选

要使用的调度器,例如“threads”、“synchronous”或“processes”。如果未提供,默认行为是先检查全局设置,然后回退到集合默认值。

traverse布尔值,可选

默认情况下,Dask 会遍历传递给 persist 的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置 traverse=False 以避免执行此遍历。

optimize_graph布尔值,可选

如果为 True [默认],则在计算之前优化图。否则,图会按原样运行。这对于调试可能很有用。

**kwargs

要转发给调度器函数的额外关键字参数。

返回
由内存数据支持的新 Dask 集合

示例

>>> df = dd.read_csv('/path/to/*.csv')  
>>> df = df[df.name == 'Alice']  
>>> df['in-debt'] = df.balance < 0  
>>> df = df.persist()  # triggers computation  
>>> df.value().min()  # future computations are now fast  
-10
>>> df.value().max()  
100
>>> from dask import persist  # use persist function on multiple collections
>>> a, b = persist(a, b)  
dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, engine: Optional[Literal['cytoscape', 'ipycytoscape','graphviz']] = None, **kwargs)[source]

同时可视化多个 Dask 图。

需要安装 graphviz。所有非 Dask 图的选项都应作为关键字参数传递。

参数
args 对象

任意数量的对象。如果是 Dask 集合(例如,Dask DataFrame、Array、Bag 或 Delayed),其关联的图将包含在 visualize 的输出中。默认情况下,还会遍历 Python 内置集合以查找 Dask 对象(更多信息请参阅 traverse 关键字)。缺少关联图的参数将被忽略。

filename字符串或 None,可选

要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用“.png”。如果 filename 为 None,则不写入文件,仅使用管道与 dot 进行通信。

format{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’} 之一,可选

写入输出文件的格式。默认是“png”。

traverse布尔值,可选

默认情况下,Dask 会遍历传递给 visualize 的 Python 内置集合以查找 Dask 对象。对于大型集合,这可能会非常耗时。如果参数中不包含任何 Dask 对象,请设置 traverse=False 以避免执行此遍历。

optimize_graph布尔值,可选

如果为 True,则在渲染前优化图。否则,图会按原样显示。默认值为 False。

color{None, ‘order’, ‘ages’, ‘freed’, ‘memoryincreases’, ‘memorydecreases’, ‘memorypressure’} 之一,可选

节点着色选项。颜色映射

  • None,默认值,不着色。

  • ‘order’,根据节点在图中出现的顺序为节点边框着色。

  • ‘ages’,节点数据保留时长。

  • ‘freed’,运行节点后释放的依赖项数量。

  • ‘memoryincreases’,节点生命周期结束后保留的更多输出数量。较大的值可能表明节点应该稍后运行。

  • ‘memorydecreases’,节点生命周期结束后保留的较少输出数量。较大的值可能表明节点应该更早运行。

  • ‘memorypressure’,节点运行时保留的数据数量(圆圈),或数据释放时的数量(矩形)。

maxval{int, float} 之一,可选

颜色映射的最大值,用于归一化到 0 到 1.0。默认值 None 将使其成为最大值数量。

collapse_outputs布尔值,可选

是否折叠输出框,输出框通常带有空标签。默认值为 False。

verbose布尔值,可选

即使数据未分块,是否也标记输出和输入框。注意:这些标签可能会非常长。默认值为 False。

engine{“graphviz”, “ipycytoscape”, “cytoscape”} 之一,可选。

要使用的可视化引擎。如果未提供,则检查 Dask 配置值“visualization.engine”。如果未设置,它会尝试导入 graphvizipycytoscape,并使用第一个成功的。

**kwargs

要转发给可视化引擎的附加关键字参数。

返回
resultIPython.display.Image, IPython.display.SVG, 或 None

更多信息请参阅 dask.dot.dot_graph。

另请参阅

dask.dot.dot_graph

注意

有关优化的更多信息,请参阅此处

https://docs.dask.org.cn/en/latest/optimize.html

示例

>>> x.visualize(filename='dask.pdf')  
>>> x.visualize(filename='dask.pdf', color='order')  

数据集

Dask 有一些用于生成演示数据集的辅助函数

dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[source]

生成随机人员数据集

这将生成一个包含随机生成人员字典记录的 Dask Bag。这需要可选库 mimesis 来生成记录。

参数
npartitionsint

分区数量

records_per_partitionint

每个分区的记录数量

seedint, (可选)

随机种子

locale字符串

语言区域设置,例如“en”、“fr”、“zh”或“ru”

返回
b: Dask Bag
dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1D', dtypes=None, seed=None, **kwargs)[source]

创建带随机数据的时序 DataFrame

参数
startdatetime(或类似 datetime 的字符串)

时间序列开始时间

enddatetime(或类似 datetime 的字符串)

时间序列结束时间

dtypes字典 (可选)

列名到类型的映射。有效类型包括 {float, int, str, ‘category’}

freq字符串

表示时间序列频率的字符串,例如“2s”、“1H”或“12W”

partition_freq字符串

将 DataFrame 分割成多个分区的字符串,例如“1M”或“2Y”

seedint (可选)

随机状态种子

kwargs

要传递给单个列创建函数的关键字。关键字应以列名开头,后跟下划线。

示例

>>> import dask
>>> df = dask.datasets.timeseries()
>>> df.head()  
          timestamp    id     name         x         y
2000-01-01 00:00:00   967    Jerry -0.031348 -0.040633
2000-01-01 00:00:01  1066  Michael -0.262136  0.307107
2000-01-01 00:00:02   988    Wendy -0.526331  0.128641
2000-01-01 00:00:03  1016   Yvonne  0.620456  0.767270
2000-01-01 00:00:04   998   Ursula  0.684902 -0.463278
>>> df = dask.datasets.timeseries(
...     '2000', '2010',
...     freq='2h', partition_freq='1D', seed=1,  # data frequency
...     dtypes={'value': float, 'name': str, 'id': int},  # data types
...     id_lam=1000  # control number of items in id column
... )

具有定义规范的数据集

以下辅助函数仍处于实验阶段

dask.dataframe.io.demo.with_spec(spec: dask.dataframe.io.demo.DatasetSpec, seed: int | None = None)[source]

根据提供的规范生成随机数据集

参数
specDatasetSpec

指定数据集的所有参数

seed: int (可选)

随机状态种子

注意

此 API 仍在实验阶段,未来可能会发生变化

示例

>>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
>>> ddf = with_spec(
...     DatasetSpec(
...         npartitions=10,
...         nrecords=10_000,
...         column_specs=[
...             ColumnSpec(dtype=int, number=2, prefix="p"),
...             ColumnSpec(dtype=int, number=2, prefix="n", method="normal"),
...             ColumnSpec(dtype=float, number=2, prefix="f"),
...             ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10),
...             ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]),
...         ],
...     ), seed=42)
>>> ddf.head(10)  
     p1    p2    n1    n2        f1        f2          s1          s2 c1
0  1002   972  -811    20  0.640846 -0.176875  L#h98#}J`?  _8C607/:6e  N
1   985   982 -1663  -777  0.790257  0.792796  u:XI3,omoZ  w~@ /d)'-@  N
2   947   970   799  -269  0.740869 -0.118413  O$dnwCuq\  !WtSe+(;#9  Y
3  1003   983  1133   521 -0.987459  0.278154  j+Qr_2{XG&  &XV7cy$y1T  Y
4  1017  1049   826     5 -0.875667 -0.744359  bJ3E-{:o  {+jC).?vK+  Y
5   984  1017  -492  -399  0.748181  0.293761  ~zUNHNgD"!  yuEkXeVot|  Y
6   992  1027  -856    67 -0.125132 -0.234529  j.7z;o]Gc9  g|Fi5*}Y92  Y
7  1011   974   762 -1223  0.471696  0.937935  yT?j~N/-u]  JhEB[W-}^$  N
8   984   974   856    74  0.109963  0.367864  _j"&@ i&;/  OYXQ)w{hoH  N
9  1030  1001  -792  -262  0.435587 -0.647970  Pmrwl{{|.K  3UTqM$86Sg  N

ColumnSpec 类

class dask.dataframe.io.demo.ColumnSpec(prefix: str | None = None, dtype: str | type | None = None, number: int = 1, nunique: int | None = None, choices: list = <factory>, low: int | None = None, high: int | None = None, length: int | None = None, random: bool = False, method: str | None = None, args: tuple[typing.Any, ...] = <factory>, kwargs: dict[str, typing.Any] = <factory>)[source]

基类: object

封装具有相同 dtype 的一系列列的属性。可以为整数 dtype 指定不同的方法(“poisson”、“uniform”、“binomial”等)

注意

此 API 仍在实验阶段,未来可能会发生变化

args: tuple[Any, ...]

要传递给方法的参数

choices: list

对于“category”或 str 列,可能值的列表

dtype: str | type | None = None

列数据类型。仅支持 numpy dtypes

high: int | None = None

对于 int 列,范围上限

kwargs: dict[str, Any]

要传递给方法的其他关键字参数

length: int | None = None

对于 random=True 的 str 或“category”列,要生成的字符串大小

low: int | None = None

int 列的起始值。如果 random=True 则可选,因为 randint 不接受 high 和 low。

method: str | None = None

对于 int 列,生成值时使用的方法,例如“poisson”、“uniform”、“binomial”。默认为“poisson”。委托给 RandomState 的同名方法。

number: int = 1

创建具有这些属性的列数。默认为 1。如果指定多个列,它们将被编号:“int1”、“int2”等。

nunique: int | None = None

对于“category”列,要生成多少个唯一类别

prefix: str | None = None

列前缀。如果未指定,将默认为 str(dtype)

random: bool = False

对于 int 列,是否使用 randint。对于字符串列,生成指定 length 的随机字符串。

RangeIndexSpec 类

class dask.dataframe.io.demo.RangeIndexSpec(dtype: str | type = <class 'int'>, step: int = 1)[source]

基类: object

DataFrame RangeIndex 的属性

注意

此 API 仍在实验阶段,未来可能会发生变化

dtype¶

索引 dtype

int 的别名

step: int = 1

RangeIndex 的步长

DatetimeIndexSpec 类

class dask.dataframe.io.demo.DatetimeIndexSpec(dtype: str | type = <class 'int'>, start: str | None = None, freq: str = '1H', partition_freq: str | None = None)[source]

基类: object

DataFrame DatetimeIndex 的属性

注意

此 API 仍在实验阶段,未来可能会发生变化

dtype¶

索引 dtype

int 的别名

freq: str = '1H'

索引的频率(“1H”、“1D”等)

partition_freq: str | None = None

分区频率(“1D”、“1M”等)

start: str | None = None

索引的第一个值

DatasetSpec 类

class dask.dataframe.io.demo.DatasetSpec(npartitions: int = 1, nrecords: int = 1000, index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec = <factory>, column_specs: list[dask.dataframe.io.demo.ColumnSpec] = <factory>)[source]

基类: object

定义一个带有随机数据的数据集,例如要生成哪些列和数据类型

注意

此 API 仍在实验阶段,未来可能会发生变化

column_specs: list[dask.dataframe.io.demo.ColumnSpec]

列定义列表

index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec

索引的属性

npartitions: int = 1

DataFrame 中生成的分区数量。如果 DataFrame 有 DatetimeIndex,请改为指定其 partition_freq

nrecords: int = 1000

要生成的总记录数

工具类

Dask 有一些公共工具方法。这些方法主要用于解析配置值。

dask.utils.apply(func, args, kwargs=None)[source]

给定位置参数和关键字参数应用函数。

等价于 func(*args, **kwargs)。大多数 Dask 用户永远不需要使用 apply 函数。它通常仅供需要将关键字参数值注入到低级 Dask 任务图中的人使用。

参数
func可调用对象

要应用的函数。

args元组

一个包含 func 所需所有位置参数的元组(例如:(arg_1, arg_2, arg_3)

kwargs字典,可选

一个映射关键字参数的字典(例如:{"kwarg_1": value, "kwarg_2": value}

示例

>>> from dask.utils import apply
>>> def add(number, second_number=5):
...     return number + second_number
...
>>> apply(add, (10,), {"second_number": 2})  # equivalent to add(*args, **kwargs)
12
>>> task = apply(add, (10,), {"second_number": 2})
>>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
dask.utils.format_bytes(n: int) str[source]

将字节格式化为文本

>>> from dask.utils import format_bytes
>>> format_bytes(1)
'1 B'
>>> format_bytes(1234)
'1.21 kiB'
>>> format_bytes(12345678)
'11.77 MiB'
>>> format_bytes(1234567890)
'1.15 GiB'
>>> format_bytes(1234567890000)
'1.12 TiB'
>>> format_bytes(1234567890000000)
'1.10 PiB'

对于所有 < 2**60 的值,输出始终 <= 10 个字符。

dask.utils.format_time(n: float) str[source]

将整数格式化为时间

>>> from dask.utils import format_time
>>> format_time(1)
'1.00 s'
>>> format_time(0.001234)
'1.23 ms'
>>> format_time(0.00012345)
'123.45 us'
>>> format_time(123.456)
'123.46 s'
>>> format_time(1234.567)
'20m 34s'
>>> format_time(12345.67)
'3hr 25m'
>>> format_time(123456.78)
'34hr 17m'
>>> format_time(1234567.89)
'14d 6hr'
dask.utils.parse_bytes(s: float | str) int[source]

将字节字符串解析为数字

>>> from dask.utils import parse_bytes
>>> parse_bytes('100')
100
>>> parse_bytes('100 MB')
100000000
>>> parse_bytes('100M')
100000000
>>> parse_bytes('5kB')
5000
>>> parse_bytes('5.4 kB')
5400
>>> parse_bytes('1kiB')
1024
>>> parse_bytes('1e6')
1000000
>>> parse_bytes('1e6 kB')
1000000000
>>> parse_bytes('MB')
1000000
>>> parse_bytes(123)
123
>>> parse_bytes('5 foos')
Traceback (most recent call last):
    ...
ValueError: Could not interpret 'foos' as a byte unit
dask.utils.parse_timedelta(s: None, default: str | Literal[False] = 'seconds') None[source]
dask.utils.parse_timedelta(s: str | float | timedelta, default: str | Literal[False] = 'seconds') float

将 timedelta 字符串解析为秒数

参数
s字符串、浮点数、timedelta 或 None
default: 字符串或 False,可选

如果 s 未指定单位,则使用的测量单位。默认为秒。设置为 False 要求 s 显式指定其自己的单位。

示例

>>> from datetime import timedelta
>>> from dask.utils import parse_timedelta
>>> parse_timedelta('3s')
3
>>> parse_timedelta('3.5 seconds')
3.5
>>> parse_timedelta('300ms')
0.3
>>> parse_timedelta(timedelta(seconds=3))  # also supports timedeltas
3