Python API (高级)

在某些罕见情况下,专家可能希望在 Python 中显式创建 SchedulerWorkerNanny 对象。这通常在构建工具以在自定义设置中自动部署 Dask 时是必需的。

更常见的方法是使用 在一台机器上使用 Client() 创建本地集群 或使用 命令行界面 (CLI)。建议新读者从那里开始。

如果您确实想自己启动 Scheduler 和 Worker 对象,您应该对 async/await 风格的 Python 语法有所了解。这些对象是可等待的 (awaitable),并且通常在 async with 上下文管理器中使用。这里有一些示例,展示了几种启动和完成它们的方式。

完整示例

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的 Worker 节点

Client([address, loop, timeout, ...])

连接到 Dask 集群并提交计算

我们首先从一个综合示例开始,演示如何在同一个事件循环中设置一个 Scheduler、两个 Worker 和一个 Client,运行一个简单的计算,然后清理所有内容。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

现在我们来看一些更简单的示例,逐步构建到上面那个案例。

Scheduler

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

我们通过创建一个 Scheduler() 对象来创建调度器,然后 await 该对象以等待其启动。之后,我们可以等待 .finished 方法,直到它关闭。在此期间,调度器将处于活动状态并管理集群。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = Scheduler()        # scheduler created, but not yet running
    s = await s            # the scheduler is running
    await s.finished()     # wait until the scheduler closes

asyncio.get_event_loop().run_until_complete(f())

这个程序将永远运行,除非有外部进程连接到调度器并告诉它停止。如果您想自行关闭,可以通过等待 .close 方法来关闭任何 SchedulerWorkerNannyClient 类。

await s.close()

Worker

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的 Worker 节点

Worker 遵循相同的 API。唯一的区别是 worker 需要知道调度器的地址。

import asyncio
from dask.distributed import Scheduler, Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))

在同一个事件循环中启动多个

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的 Worker 节点

我们可以在同一个事件循环中运行任意数量的这些对象。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.get_event_loop().run_until_complete(f())

使用上下文管理器

我们也可以使用 async with 上下文管理器来确保正确清理。下面是与上面相同的示例:

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.get_event_loop().run_until_complete(f())

或者,在下面的示例中,我们还包含一个 Client,运行一个小型计算,然后在计算完成后允许进行清理。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

这相当于创建并等待每个服务器,然后在退出上下文时对每个服务器调用 .close。在这个示例中,我们不等待 s.finished(),因此它会相对快速地终止。如果您希望它永远运行,则可以调用 await s.finished()

Nanny

Nanny([scheduler_ip, scheduler_port, ...])

用于管理 worker 进程的进程

或者,如果我们希望 worker 在单独的进程中进行管理,则可以将 Worker 替换为 NannyNanny 构造函数遵循相同的 API。这允许 worker 在失败时自行重启。此外,它还提供了一些附加监控,并且在协调许多应该位于不同进程中以避免 GIL 的 worker 时非常有用。

# w = await Worker(s.address)
w = await Nanny(s.address)

API

这些类具有多种关键字参数,可用于控制其行为。有关更多信息,请参阅下面的 API 文档。

Scheduler

class distributed.Scheduler(loop: tornado.ioloop.IOLoop | None = None, services: dict | None = None, service_kwargs: dict | None = None, allowed_failures: int | None = None, extensions: dict | None = None, validate: bool | None = None, scheduler_file: str | None = None, security: dict | distributed.security.Security | None = None, worker_ttl: float | None = None, idle_timeout: float | None = None, interface: str | None = None, host: str | None = None, port: int = 0, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool | None = None, http_prefix: str | None = '/', preload: str | collections.abc.Sequence[str] | None = None, preload_argv: str | collections.abc.Sequence[str] | collections.abc.Sequence[collections.abc.Sequence[str]] = (), plugins: collections.abc.Sequence[distributed.diagnostics.plugin.SchedulerPlugin] = (), contact_address: str | None = None, transition_counter_max: bool | int = False, jupyter: bool = False, **kwargs: Any)[source]

动态分布式任务调度器

调度器跟踪 worker、数据和计算的当前状态。调度器监听事件并适当控制 worker 进行响应。它不断尝试使用 worker 来执行不断增长的 dask 图。

所有事件都得到快速处理,处理时间与其输入(通常是常量大小)呈线性关系,通常在毫秒内完成。为了实现这一点,调度器跟踪了大量状态。每个操作都维护了此状态的一致性。

调度器通过 Comm 对象与外部世界通信。即使同时监听多个客户端,它也能维护一致且有效的数据视图。

Scheduler 通常通过 dask scheduler 可执行文件启动

$ dask scheduler
Scheduler started at 127.0.0.1:8786

或者在 LocalCluster 中,Client 无需连接信息即可启动

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

用户通常不直接与调度器交互,而是与客户端对象 Client 交互。

参数 contact_address 允许为 worker 通信向调度器发布一个特定的地址,该地址与调度器绑定的地址不同。这在调度器监听一个私有地址(worker 因此无法联系到它)时非常有用。

状态

调度器包含以下状态变量。每个变量与其存储的内容和简要描述一起列出。

  • tasks: {task key: TaskState}

    调度器当前已知的任务

  • unrunnable: {TaskState}

    处于“无 worker”状态的任务

  • workers: {worker key: WorkerState}

    当前连接到调度器的 Worker

  • idle: {WorkerState}

    未充分利用的 worker 集合

  • saturated: {WorkerState}

    未过载的 worker 集合

  • host_info: {hostname: dict}

    关于每个 worker 主机的信息

  • clients: {client key: ClientState}

    根据当前工作负载所需的 worker 数量

  • 这查看当前正在运行的任务和内存使用情况,并返回所需的 worker 数量。这通常由自适应调度使用。

    参数

  • target_durationfloat | None

    计算所需的目标持续时间。这会影响调度器请求扩缩的速度。

  • 另请参阅

    distributed.deploy.Adaptive

  • async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[source]

    将 client 添加到网络

  • 我们监听来自此 Comm 的所有未来消息。

    add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'][source]

得知某个 worker 持有特定的键

这在实践中不应使用,主要是出于历史原因。然而,worker 会不时发送它。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[source]

向调度器添加外部插件。
参见 https://distributed.dask.org.cn/en/latest/plugins.html

pluginSchedulerPlugin

要添加的 SchedulerPlugin 实例

idempotentbool
如果为 True,则假定插件已存在,不执行任何操作。

namestr

插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,如果未找到则生成一个。

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[source]

了解工作节点拥有某些键

这在实践中不应使用,主要出于历史原因保留。然而,工作节点有时会发送此消息。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[source]

向调度器添加外部插件。

请参阅 https://distributed.dask.org.cn/en/latest/plugins.html

向调度器添加外部插件。
pluginSchedulerPlugin

要添加的 SchedulerPlugin 实例

idempotentbool

如果为 True,则假定插件已存在,不执行任何操作。

namestr

插件的名称;如果为 None,则检查 Plugin 实例上的 name 属性,如果未发现则生成一个名称。

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[source]

向集群添加一个新的 worker

async benchmark_hardware() dict[str, dict[str, float]][source]

在 worker 上运行基准测试,获取内存、磁盘和网络带宽

返回
result: dict

一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是在许多 worker 在集群中运行计算时平均得出的。

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return','return_pickle', 'ignore'] = 'raise') dict[str, Any][source]

向 worker 广播消息,返回所有结果

client_heartbeat(client: str) None[source]

处理来自 Client 的心跳

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None[source]

从 client 的期望列表中移除键

client_send(client: str, msg: dict) None[source]

向 client 发送消息

async close(timeout: float | None = None, reason: str = 'unknown') None[source]

向所有协程发送清理信号,然后等待直到完成

要添加的 SchedulerPlugin 实例

Scheduler.cleanup
close_worker(worker: str) None[source]

要求 worker 自行关闭。不等待其生效。请注意,不能保证 worker 实际上会接受该命令。

请注意,remove_worker() 在 close=True 时会在内部发送相同的命令。

要添加的 SchedulerPlugin 实例

retire_workers
remove_worker
coerce_address(addr: str | tuple, resolve: bool = True) str[source]

强制将可能的输入地址转换为标准格式。*resolve* 可以禁用,用于使用虚假主机名进行测试。

处理字符串、元组或别名。

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None[source]

从 worker 中删除数据并更新相应的 worker/task 状态

向调度器添加外部插件。
worker_address: str

要从中删除键的 Worker 地址

keys: list[Key]

要在指定 worker 上删除的键列表

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[source]

将集群状态转储写入 fsspec 兼容的 URL。

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None[source]

向外部请求者提供数据 Comm

注意:这将在调度器上运行任意 Python 代码。最终应逐步淘汰此功能。它主要用于诊断。

async gather(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

从 worker 向调度器收集数据

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set[source]

将键从多个 worker 对等复制到单个 worker

向调度器添加外部插件。
worker_address: str

接收 worker 地址,将键复制到此处

who_has: dict[Key, list[str]]

{key: [发送方地址, 发送方地址, …], key: …}

返回
返回

未能复制的键集合

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[source]

生成集群状态转储中使用的状态字典

async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition][source]

用于 SchedulerState.story() 的 RPC Hook。

请注意,msgpack 序列化/反序列化往返过程会将 Transition 命名元组转换为普通元组。

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[source]

获取 worker 上指定服务的 (host, port) 地址。如果服务不存在则返回 None。

向调度器添加外部插件。
worker地址
service_namestr

常见服务包括 ‘bokeh’ 和 ‘nanny’

protocol布尔值

是否包含带协议的完整地址 (True) 或仅包含 (host, port) 对

handle_long_running(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, run_id: int, compute_duration: float | None = None, stimulus_id: str) None[source]

一个任务已从线程池中分离

我们阻止该任务将来被盗取,并更改任务持续时间计算方式,就好像任务已停止一样。

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None[source]

来自工作节点的请求,要求刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是来自客户端的专用通信 RPC 请求。

async handle_worker(comm: distributed.comm.core.Comm, worker: str) None[source]

监听来自单个工作节点的响应

这是调度器与工作节点交互的主循环

要添加的 SchedulerPlugin 实例

Scheduler.handle_client

客户端的等效协程

identity(n_workers: int = - 1) dict[str, Any][source]

关于自身和集群的基本信息

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录事件

向调度器添加外部插件。
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

要添加的 SchedulerPlugin 实例

Client.log_event
async proxy(msg: dict, worker: str, serializers: Any = None) Any[source]

通过调度器代理与其他工作节点的通信

async rebalance(keys: collections.abc.Iterable[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict[source]

重新平衡键,使每个工作节点的进程内存(托管+非托管)大致相同。

警告

此操作通常未针对调度器的正常运行进行充分测试。不建议在等待计算时使用此操作。

算法

  1. 找到集群的平均占用率,定义为 dask 托管的数据 + 在工作节点上存在至少 30 秒(distributed.worker.memory.recent-to-old-time)的非托管进程内存。这使我们可以忽略由任务堆使用引起的临时峰值。

    另外,您可以通过 distributed.worker.memory.rebalance.measure 更改测量单个工作节点内存和计算平均值的方式。特别是,这对于忽略不准确的操作系统内存测量可能很有用。

  2. 丢弃占用率在集群平均占用率的 5% 以内(distributed.worker.memory.rebalance.sender-recipient-gap / 2)的工作节点。这有助于避免数据在集群中重复跳动。

  3. 高于平均值的工作节点是发送者;低于平均值的是接收者。

  4. 丢弃绝对占用率低于 30%(distributed.worker.memory.rebalance.sender-min)的发送者。换句话说,只要所有工作节点的占用率都低于 30%,无论是否存在不平衡,都不会移动数据。

  5. 丢弃绝对占用率高于 60%(distributed.worker.memory.rebalance.recipient-max)的接收者。请注意,此阈值默认与 distributed.worker.memory.target 相同,以防止工作节点接受数据后立即溢出到磁盘。

  6. 迭代地选择离平均值最远的发送者和接收者,并在两者之间移动最近插入时间最靠前的键,直到所有发送者或所有接收者都落入平均值的 5% 以内。

    如果接收者已经拥有数据的副本,则会跳过该接收者。换句话说,此方法不会降低复制等级。如果没有足够的内存接受该键且尚未持有该键副本的接收者,则会跳过该键。

最近插入时间最靠前 (LRI) 策略是一种贪婪选择,优点是时间复杂度为 O(1),实现简单(它依赖于 python 字典的插入排序),并且在大多数情况下表现良好。被放弃的替代策略包括

  • 最大优先。时间复杂度为 O(n*log(n)),需要额外的复杂数据结构,并且存在导致最大的数据块像弹珠一样在集群中反复移动的风险。

  • 最近最少使用 (LRU)。此信息目前仅在工作节点上可用,并且在调度器上复制并不简单;通过网络传输它会非常昂贵。此外,请注意 dask 会尽力最大限度地减少中间键在内存中保留的时间,因此在这种情况下,LRI 是 LRU 的近似值。

向调度器添加外部插件。
keys: 可选

允许移动的 dask 键列表。所有其他键将被忽略。请注意,这不能保证键实际会被移动(例如,因为没有必要,或者没有可接受该键的接收工作节点)。

workers: 可选

允许被视为发送者或接收者工作节点的地址列表。所有其他工作节点将被忽略。集群的平均占用率将仅使用允许的工作节点进行计算。

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][source]

在所有当前运行和未来的 nanny 上注册 nanny 插件

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None[source]

在调度器上注册插件。

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][source]

在所有当前运行和未来的工作节点上注册工作节点插件

remove_client(client: str, stimulus_id: str | None = None) None[source]

从网络中移除客户端

remove_plugin(name: str | None = None) None[source]

从调度器中移除外部插件

向调度器添加外部插件。
namestr

要移除的插件名称

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'][source]

从集群中移除工作节点。

当工作节点报告计划离开或似乎无响应时,我们会执行此操作。这可能会将其任务发送回释放状态。

要添加的 SchedulerPlugin 实例

retire_workers
close_worker
async replicate(keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], n: int | None = None, workers: collections.abc.Iterable | None = None, branching_factor: int = 2, delete: bool = True, stimulus_id: str | None = None) dict | None[source]

在整个集群中复制数据

这将对每份数据单独执行树状复制,遍布整个网络。

向调度器添加外部插件。
keys: Iterable

要复制的键列表

n: int

我们期望在集群中看到的复制次数

branching_factor: int, optional

每一代可以复制数据的工作节点数量。分支因子越大,单步复制的数据越多,但给定工作节点被数据请求淹没的风险也越大。

要添加的 SchedulerPlugin 实例

Scheduler.rebalance
report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None[source]

向所有监听的队列 (Queues) 和通信 (Comms) 发布更新

如果消息包含键,则我们只将消息发送给那些关心该键的通信通道。

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[source]

异步请求工作节点从其他工作节点获取列出的键的副本。这是一个即发即弃操作,不提供成功或失败的反馈,主要用于维护而非计算。

request_remove_replicas(addr: str, keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[source]

异步请求工作节点丢弃列出的键的副本。绝不能用于销毁某个键的最后一个副本。这是一个即发即弃操作,主要用于维护而非计算。

副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除,例如因为该任务是其上运行的另一个任务的依赖项,它将(同样异步地)通知调度器将其重新添加到 who_has 中。如果工作节点同意丢弃任务,则没有反馈。

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None[source]

忘记所有任务,并在所有工作节点上调用 restart_workers。

向调度器添加外部插件。
timeout

请参阅 restart_workers

wait_for_workers

请参阅 restart_workers

async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']][source]

重启选定的工作节点。可以选择等待工作节点返回。

没有 nanny 的工作节点会被关闭,期望外部部署系统能重启它们。因此,如果不使用 nanny 且您的部署系统不自动重启工作节点,则 restart 只会关闭所有工作节点,然后超时!

restart 之后,所有连接的工作节点都是新的,无论是否引发 TimeoutError。未能及时关闭的任何工作节点将被移除,并且将来可能或可能不会自行关闭。

向调度器添加外部插件。
workers

要重启的工作节点地址列表。如果省略,则重启所有工作节点。

timeout

如果 wait_for_workers 为 True,则等待工作节点关闭并返回的时长,否则仅等待工作节点关闭的时长。如果超出此时间,则引发 asyncio.TimeoutError

wait_for_workers

是否等待所有工作节点重新连接,或者只等待它们关闭(默认为 True)。结合使用 restart(wait_for_workers=False)Client.wait_for_workers() 可以对等待多少工作节点进行细粒度控制。

on_error

如果为 ‘raise’(默认),则在重启工作节点时任何 nanny 超时则引发异常。如果为 ‘return’,则返回错误消息。

返回
{工作节点地址: “OK”, “no nanny”, “timed out” 或错误消息}

要添加的 SchedulerPlugin 实例

Client.restart
Client.restart_workers
Scheduler.restart
async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, Any]

从集群中优雅地退役工作节点。只在退役工作节点内存中的任何键都会被复制到别处。

向调度器添加外部插件。
workers: list[str] (可选)

要退役的工作节点地址列表。

names: list (可选)

要退役的工作节点名称列表。与 workers 互斥。如果既未提供 workers 也未提供 names,我们将调用 workers_to_close 来找到一个合适的工作节点集。

close_workers: bool (默认为 False)

是否实际从此处显式关闭工作节点。否则,我们期望外部作业调度器来终止工作节点。

remove: bool (默认为 True)

是否立即移除 worker 元数据,还是等待 worker 联系我们后再移除。

如果 close_workers=False 且 remove=False,此方法仅将 worker 内存中的任务刷新出去,然后返回。如果 close_workers=True 且 remove=False,此方法将在 worker 仍留在集群中时返回,尽管它们不再接受新任务。如果 close_workers=False 或由于任何原因 worker 未接受关闭命令,它将永久无法接受新任务,并且预期会通过其他方式关闭。

**kwargs: dict

传递给 workers_to_close 的额外选项,用于确定应删除哪些 worker。仅在省略 workersnames 时接受。

返回
字典,将 worker ID/地址映射到有关该 worker 的信息字典,针对每个已退役 worker。
该 worker 的信息。
如果存在仅在正在退役的 worker 内存中存在的键,并且
无法将它们复制到其他地方(例如,因为没有
其他正在运行的 worker),则持有这些键的 worker 将不会被退役,并且
不会出现在返回的字典中。

要添加的 SchedulerPlugin 实例

Scheduler.workers_to_close
run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any[source]

在此进程中运行一个函数

要添加的 SchedulerPlugin 实例

Client.run_on_scheduler
async scatter(data: dict, workers: collections.abc.Iterable | None, client: str, broadcast: bool = False, timeout: float = 2) list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]][source]

将数据发送到 worker

要添加的 SchedulerPlugin 实例

Scheduler.broadcast
send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None[source]

向客户端和 worker 发送消息

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState) None[source]

向 worker 发送单个计算任务

async start_unsafe() Self[source]

清除旧状态并重启所有正在运行的协程

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool, reason: str, msg: str) None[source]

停止执行键列表上的任务

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None[source]

响应可能在 worker 线程池中打开了槽位的事件

根据 worker 上可用的任务槽总数(可能为 0),从队列前面选择适当数量的任务,并将其转换为 processing 状态。

注意

与此事件相关的其他转换应事先完全处理,以便任何变得可运行的任务都已处于 processing 状态。否则,如果队列中的任务在下游任务之前被调度,可能会发生过度生产。

必须在 check_idle_saturated 之后调用;即 idle_task_count 必须是最新的。

stimulus_task_erred(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, exception: Any, stimulus_id: str, traceback: Any, run_id: str, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

标记某个任务在特定 worker 上出错

stimulus_task_finished(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

标记某个任务在特定 worker 上执行完成

transition(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred','forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][source]

将键从当前状态转换为完成状态

返回
关于未来转换的建议字典

要添加的 SchedulerPlugin 实例

Scheduler.transitions

此函数的传递版本

示例

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None[source]

处理转换直到没有剩余

这包括来自先前转换的反馈,并持续到达到稳定状态

async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

取消注册 worker 插件

async unregister_scheduler_plugin(name: str) None[source]

在调度器上取消注册插件。

async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

取消注册 worker 插件

update_data(*, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None[source]

得知新数据已从外部源进入网络

worker_send(worker: str, msg: dict[str, Any]) None[source]

向 worker 发送消息

这也通过添加回调在下一个周期移除 worker 来处理连接失败。

workers_list(workers: collections.abc.Iterable[str] | None) list[str][source]

符合条件的 worker 列表

接受 worker 地址或主机名列表。返回所有匹配的 worker 地址列表。

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][source]

查找可以低成本关闭的 worker

这返回一个适合退役的 worker 列表。这些 worker 没有运行任何任务,并且相对于其对等节点存储的数据相对较少。如果所有 worker 都处于空闲状态,我们仍会保留足够的 worker 以拥有足够的内存来存储我们的数据,并留有充足的缓冲空间。

这适用于诸如 distributed.deploy.adaptive 之类的系统。

向调度器添加外部插件。
memory_ratio数字

希望为存储数据保留的额外空间量。默认为 2,即我们希望拥有当前数据量两倍的内存。

n整数

要关闭的 worker 数量

minimum整数

至少保留的 worker 数量

key可调用对象(WorkerState)

一个可选的可调用对象,用于将 WorkerState 对象映射到组关联。组将一起关闭。这在必须集体关闭 worker 时很有用,例如按主机名关闭。

target整数

关闭后 worker 的目标数量

attribute字符串

要返回的 WorkerState 对象的属性,例如“address”或“name”。默认为“address”。

返回
to_close:可以关闭的 worker 地址列表

要添加的 SchedulerPlugin 实例

Scheduler.retire_workers

示例

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

在关闭之前按主机名对 worker 进行分组

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

移除两个 worker

>>> scheduler.workers_to_close(n=2)

保留足够的 worker,使其内存是我们所需内存的两倍。

>>> scheduler.workers_to_close(memory_ratio=2)

Worker

class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, WorkerStateClass: type = <class 'distributed.worker_state_machine.WorkerState'>, **kwargs)[source]

Dask 分布式集群中的 Worker 节点

Worker 执行两项功能

  1. 服务数据,来自本地字典

  2. 执行计算,在这些数据以及来自对等节点的数据上执行

Worker 会将它们的数据告知调度器,并在需要执行计算时使用该调度器从其他 worker 收集数据。

您可以使用 dask worker 命令行应用程序启动 worker

$ dask worker scheduler-ip:port

使用 --help 标志查看更多选项

$ dask worker --help

此文档字符串的其余部分是关于 worker 用于管理和跟踪内部计算的内部状态。

状态

信息状态

这些属性在执行过程中不会发生显著变化。

  • nthreads: int

    此 worker 进程使用的 nthreads 数量

  • executors: dict[str, concurrent.futures.Executor]

    用于执行计算的执行器。始终包含默认执行器。

  • local_directory: path

    本地机器上存储临时文件的路径

  • scheduler: PooledRPCCall

    调度器的位置。参见 .ip/.port 属性。

  • name: string

    别名

  • services: {str: Server}

    在此 worker 上运行的辅助 Web 服务器

  • service_ports: {str: port}

  • transfer_outgoing_count_limit: int

    并发传出数据传输的最大数量。另请参见 distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit

  • batched_stream: BatchedSend

    用于与调度器通信的批处理流

  • log: [(message)]

    结构化且可查询的日志。参见 Worker.story

易变状态

这些属性跟踪此 worker 正在尝试完成的任务的进度。在下面的描述中,key 是我们想要计算的任务名称,而 dep 是我们想要从其他 worker 收集的依赖数据块的名称。

  • threads: {key: int}

    任务运行所在的线程 ID

  • active_threads: {int: key}

    当前在活动线程上运行的键

  • state: WorkerState

    封装的状态机。参见 BaseWorkerWorkerState

向调度器添加外部插件。
scheduler_ip: str, 可选
scheduler_port: int, 可选
scheduler_file: str, 可选
host: str, 可选
data: MutableMapping, type, None

用于存储的对象,默认为构建一个磁盘支持的 LRU 字典。

如果提供了用于构建存储对象的可调用对象,并且调用签名中包含名为 worker_local_directory 的参数,则该对象将接收 worker 的属性:local_directory 作为参数。

nthreads: int, 可选
local_directory: str, 可选

放置本地资源的目录

name: str, 可选
memory_limit: int, float, string

此 worker 应使用的内存字节数。设置为零表示无限制。设置为“auto”则计算为 system.MEMORY_LIMIT * min(1, nthreads / total_cores)。使用字符串或数字,例如 5GB 或 5e9。

memory_target_fraction: float 或 False

尝试保持低于此内存比例(默认:从配置键 distributed.worker.memory.target 读取)

memory_spill_fraction: float 或 False

开始溢出到磁盘时的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)

memory_pause_fraction: float 或 False

停止运行新任务时的内存比例(默认:从配置键 distributed.worker.memory.pause 读取)

max_spill: int, string 或 False

溢出到磁盘的字节数限制。(默认:从配置键 distributed.worker.memory.max-spill 读取)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
要使用的执行器。根据类型不同,其含义如下:
  • Executor 实例:默认执行器。

  • Dict[str, Executor]:将名称映射到 Executor 实例的字典。如果字典中没有“default”键,则将使用 ThreadPoolExecutor(nthreads) 创建一个“default”执行器。

  • 字符串:字符串“offload”,指用于卸载通信的同一线程池。这导致同一线程用于反序列化和计算。

resources: dict

此 worker 拥有的资源,例如 {'GPU': 2}

nanny: str

如果存在,则联系 nanny 的地址

lifetime: str

优雅关闭 worker 的时间量,例如“1 hour”。默认为 None,表示没有明确的关闭时间。

lifetime_stagger: str

用于错开生命周期值的时间量,例如“5 minutes”。实际生命周期将在 lifetime +/- lifetime_stagger 之间均匀随机选择。

lifetime_restart: bool

worker 达到其生命周期后是否重启。默认为 False。

kwargs: 可选

ServerNode 构造函数的附加参数

要添加的 SchedulerPlugin 实例

distributed.scheduler.Scheduler
distributed.nanny.Nanny

示例

使用命令行启动 worker

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
batched_send(msg: dict[str, Any]) None[source]

实现 BaseWorker 抽象方法。

通过批量通信向调度器发送即发即弃的消息。

如果当前未连接到调度器,消息将被静默丢弃!

要添加的 SchedulerPlugin 实例

distributed.worker_state_machine.BaseWorker.batched_send
async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[source]

关闭 worker

关闭 worker 上运行的异步操作,停止所有执行器和通信。如果请求,这也关闭 nanny。

向调度器添加外部插件。
timeout

关闭单个指令的超时时间(秒)

executor_wait

如果为 True,则同步关闭执行器,否则异步关闭

nanny

如果为 True,则关闭 nanny

reason

关闭 worker 的原因

返回
str | None

如果 worker 已处于关闭状态或失败,则为 None,否则为“OK”

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]

优雅地关闭 worker

这首先告知调度器我们正在关闭,并请求其将我们的数据移动到其他地方。之后,我们照常关闭。

property data: collections.abc.MutableMapping[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

所有已完成任务的 {任务键: 任务载荷},无论它们是在此 Worker 上计算的,还是在其他地方计算然后通过网络传输到这里的。

使用默认配置时,这是一个 zict 缓冲区,当超出目标阈值时会自动溢出到磁盘。如果禁用溢出,它就是一个普通的 dict。它也可以是初始化 Worker 或 Nanny 时传递的用户定义的任意类字典对象。Worker 逻辑应将其视为不透明对象,并遵循 MutableMapping API。

注意

这个集合也可以通过 self.state.dataself.memory_manager.data 访问。

digest_metric(name: collections.abc.Hashable, value: float) None[source]

通过调用 Server.digest_metric 实现 BaseWorker.digest_metric

async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

执行一个任务。实现 BaseWorker 抽象方法。

要添加的 SchedulerPlugin 实例

distributed.worker_state_machine.BaseWorker.execute
async gather(who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

Scheduler.rebalance() 和 Scheduler.replicate() 使用的端点

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

实现 BaseWorker 抽象方法

要添加的 SchedulerPlugin 实例

distributed.worker_state_machine.BaseWorker.gather_dep
get_current_task() Union[str, int, float, tuple[ForwardRef('Key'), ...]][source]

获取当前正在运行的任务的键

这仅在任务内部运行时有意义

要添加的 SchedulerPlugin 实例

get_worker

示例

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[source]

覆盖 BaseWorker 方法以进行额外验证

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录事件

向调度器添加外部插件。
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

要添加的 SchedulerPlugin 实例

Client.log_event
async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[source]

等待一段时间,然后将一个对等 worker 从忙碌状态中取出。实现 BaseWorker 抽象方法。

async start_unsafe()[source]

尝试启动服务器。这不是幂等的,也不受并发启动尝试的保护。

此方法旨在由子类覆盖或调用。为了安全启动,请使用 Server.start

如果配置了 death_timeout,我们将要求此协程在此超时到达之前完成。如果达到超时,我们将关闭实例并引发 asyncio.TimeoutError

transfer_outgoing_bytes: int

当前向其他 worker 进行的开放数据传输的总大小

transfer_outgoing_bytes_total: int

向其他 worker 进行的数据传输总大小(包括进行中和失败的传输)

transfer_outgoing_count: int

当前向其他 worker 进行的开放数据传输数量

transfer_outgoing_count_total: int

从工作节点启动以来向其他工作节点传输数据的总次数

trigger_profile() None[source]

从所有活跃的计算线程获取一个帧

将这些帧合并到现有的性能分析计数中

property worker_address

用于与 Nanny 的 API 兼容性

Nanny

class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]

用于管理 worker 进程的进程

Nanny 启动 Worker 进程,监视它们,并在必要时杀死或重启它们。如果你想使用 Client.restart 方法,或者在工作节点达到其内存限制的终止阈值时自动重启工作节点,则 Nanny 是必要的。

Nanny 的参数大部分与 Worker 的参数相同,但有一些例外如下所列。

向调度器添加外部插件。
env: dict, optional

在 Nanny 初始化时设置的环境变量也将确保在 Worker 进程中设置。此参数允许覆盖或以其他方式为 Worker 设置环境变量。也可以使用选项 distributed.nanny.environ 设置环境变量。优先顺序如下:

  1. Nanny 参数

  2. 现有环境变量

  3. Dask 配置

注意

某些环境变量,例如 OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,例如 MALLOC_TRIM_THRESHOLD_(参阅 内存未释放回操作系统),必须在启动 Linux 进程之前设置。如果在 Nanny 参数或 distributed.nanny.environ 中设置此类变量将无效;它们必须在 distributed.nanny.pre-spawn-environ 中设置,以便在派生子进程之前设置它们,即使这意味着“污染”运行 Nanny 的进程。

出于同样的原因,请注意将 distributed.worker.multiprocessing-methodspawn 更改为 forkforkserver 可能会抑制某些环境变量;如果这样做,您应该在启动 dask-worker 之前在 shell 中自行设置这些变量。

要添加的 SchedulerPlugin 实例

Worker
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][source]

关闭 worker 进程,停止所有通信。

close_gracefully(reason: str = 'nanny-close-gracefully') None[source]

一个信号,表示如果 worker 消失,我们不应尝试重启它们。

这用于集群关闭过程的一部分。

async instantiate() distributed.core.Status[source]

启动本地 worker 进程

阻塞直到进程启动并且调度器被正确通知。

async kill(timeout: float = 5, reason: str = 'nanny-kill') None[source]

杀死本地 worker 进程

阻塞直到进程关闭并且调度器被正确通知。

log_event(topic, msg)[source]

在给定主题下记录事件

向调度器添加外部插件。
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

要添加的 SchedulerPlugin 实例

Client.log_event
async start_unsafe()[source]

启动 nanny,启动本地进程,开始监视