Python API(高级)

在一些罕见的情况下,专家可能希望在 Python 中显式创建 SchedulerWorkerNanny 对象。在创建用于在自定义设置中自动部署 Dask 的工具时,这通常是必需的。

更常见的方法是在单机上使用 Client() 创建本地集群 或使用 命令行界面 (CLI)。建议新读者从这些地方开始学习。

如果您确实想自己启动 Scheduler 和 Worker 对象,您应该对 async/await 风格的 Python 语法有所了解。这些对象是可等待的,并且通常在 async with 上下文管理器中使用。这里有一些示例,展示了几种启动和结束的方法。

完整示例

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的工作节点

Client([address, loop, timeout, ...])

连接到 Dask 集群并提交计算

我们首先从一个全面的示例开始,在同一个事件循环中设置一个 Scheduler、两个 Worker 和一个 Client,运行一个简单的计算,然后清理所有内容。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

现在我们来看一些更简单的示例,它们是构建前面示例的基础。

调度器

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

我们通过创建一个 Scheduler() 对象来创建调度器,然后 await 该对象以等待其启动。然后我们可以等待 .finished 方法直到它关闭。在此期间,调度器将一直活动并管理集群。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = Scheduler()        # scheduler created, but not yet running
    s = await s            # the scheduler is running
    await s.finished()     # wait until the scheduler closes

asyncio.get_event_loop().run_until_complete(f())

该程序将永远运行,或直到某个外部进程连接到调度器并指示其停止。如果您想自己关闭,可以通过 await .close 方法来关闭任何 SchedulerWorkerNannyClient 类。

await s.close()

工作节点

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的工作节点

工作节点遵循相同的 API。唯一的区别是工作节点需要知道调度器的地址。

import asyncio
from dask.distributed import Scheduler, Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))

在同一个事件循环中启动多个

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker(scheduler_ip, scheduler_port, *, ...)

Dask 分布式集群中的工作节点

我们可以在同一个事件循环中运行任意数量的这些对象。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.get_event_loop().run_until_complete(f())

使用上下文管理器

我们还可以使用 async with 上下文管理器来确保我们正确地进行清理。下面是与上面相同的示例

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.get_event_loop().run_until_complete(f())

或者,在下面的示例中,我们还包含一个 Client,运行一个小型计算,然后允许在计算完成后进行清理。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

这相当于创建并 await 每个服务器,然后在离开上下文时调用每个服务器的 .close 方法。在这个示例中,我们没有等待 s.finished(),因此它会相对快速地终止。但如果您希望它永远运行,您可以调用 await s.finished()

Nanny

Nanny([scheduler_ip, scheduler_port, ...])

一个管理工作进程的进程

或者,如果我们希望工作节点在单独的进程中进行管理,我们可以用 Nanny 替换 WorkerNanny 构造函数遵循相同的 API。这允许工作节点在失败时自行重启。此外,它提供了一些额外的监控,并且在协调多个应该存在于不同进程中的工作节点以避免 GIL 时很有用。

# w = await Worker(s.address)
w = await Nanny(s.address)

API

这些类有各种关键字参数,您可以使用它们来控制其行为。有关更多信息,请参阅下面的 API 文档。

调度器

class distributed.Scheduler(loop: tornado.ioloop.IOLoop | None = None, services: dict | None = None, service_kwargs: dict | None = None, allowed_failures: int | None = None, extensions: dict | None = None, validate: bool | None = None, scheduler_file: str | None = None, security: dict | distributed.security.Security | None = None, worker_ttl: float | None = None, idle_timeout: float | None = None, interface: str | None = None, host: str | None = None, port: int = 0, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool | None = None, http_prefix: str | None = '/', preload: str | collections.abc.Sequence[str] | None = None, preload_argv: str | collections.abc.Sequence[str] | collections.abc.Sequence[collections.abc.Sequence[str]] = (), plugins: collections.abc.Sequence[distributed.diagnostics.plugin.SchedulerPlugin] = (), contact_address: str | None = None, transition_counter_max: bool | int = False, jupyter: bool = False, **kwargs: Any)[source]

动态分布式任务调度器

调度器跟踪工作节点、数据和计算的当前状态。调度器监听事件并通过适当控制工作节点来响应。它不断尝试使用工作节点来执行不断增长的 Dask 图。

所有事件都被快速处理,处理时间与其输入(通常是常量大小)呈线性关系,并且通常在毫秒内完成。为了实现这一点,调度器跟踪了大量状态。每个操作都维护着此状态的一致性。

调度器通过 Comm 对象与外部世界通信。即使同时监听多个客户端,它也能维护一个一致且有效的全局视图。

通常通过 dask scheduler 可执行文件来启动调度器

$ dask scheduler
Scheduler started at 127.0.0.1:8786

或者在 LocalCluster 中,Client 在没有连接信息的情况下启动

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

用户通常不直接与调度器交互,而是与客户端对象 Client 交互。

contact_address 参数允许向工作节点发布一个特定地址用于与调度器通信,此地址与调度器绑定的地址不同。当调度器监听私有地址,而该地址不能被工作节点用来联系它时,这很有用。

状态

调度器包含以下状态变量。每个变量及其存储内容和简要描述如下。

  • 任务 (tasks): {task key: TaskState}

    调度器当前知道的任务

  • 不可运行任务 (unrunnable): {TaskState}

    处于“无工作节点”状态的任务

  • 工作节点 (workers): {worker key: WorkerState}

    当前连接到调度器的工作节点

  • 空闲工作节点 (idle): {WorkerState}

    未完全利用的工作节点集合

  • 饱和工作节点 (saturated): {WorkerState}

    未超负荷的工作节点集合

  • 主机信息 (host_info): {hostname: dict}

    每个工作节点主机的信息

  • 客户端 (clients): {client key: ClientState}

    当前连接到调度器的客户端

  • 服务 (services): {str: port}

    在此调度器上运行的其他服务,例如 Bokeh

  • 事件循环 (loop): IOLoop

    正在运行的 Tornado IOLoop

  • 客户端通信 (client_comms): {client key: Comm}

    对于每个客户端,一个用于接收任务请求和报告任务状态更新的 Comm 对象。

  • 流通信 (stream_comms): {worker key: Comm}

    对于每个工作节点,一个用于接受刺激和报告结果的 Comm 对象

  • 任务持续时间 (task_duration): {key-prefix: time}

    我们预期某些函数将花费的时间,例如 {'sum': 0.25}

adaptive_target(target_duration: float | None = None) int[source]

基于当前工作负载所需的工人数量

这会查看当前正在运行的任务和内存使用情况,并返回所需的工人数量。自适应调度通常会使用此信息。

参数
目标持续时间 (target_duration)str

计算所需的期望持续时间。这会影响调度器请求伸缩的速度。

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[source]

将客户端添加到网络

我们监听来自此 Comm 的所有未来消息。

add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'][source]

得知某个工作节点拥有某些键

这在实践中不应使用,主要是出于遗留原因而存在。但它会不时地由工作节点发送。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[source]

将外部插件添加到调度器。

请参阅 https://distributed.dask.org.cn/en/latest/plugins.html

参数
插件 (plugin)SchedulerPlugin

要添加的 SchedulerPlugin 实例

幂等 (idempotent)bool

如果为 true,则假定插件已存在,不执行任何操作。

名称 (name)str

插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,如果未发现则生成。

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[source]

向集群添加新的工作节点

async benchmark_hardware() dict[str, dict[str, float]][source]

在工作节点上运行内存、磁盘和网络带宽基准测试

返回
结果 (result): dict

一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是在跨集群运行计算的许多工作节点上平均得出的。

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle','ignore'] = 'raise') dict[str, Any][source]

向工作节点广播消息,返回所有结果

client_heartbeat(client: str) None[source]

处理来自客户端的心跳

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None[source]

从客户端期望列表中移除键

client_send(client: str, msg: dict) None[source]

向客户端发送消息

async close(timeout: float | None = None, reason: str = 'unknown') None[source]

向所有协程发送清理信号,然后等待完成

另请参阅

Scheduler.cleanup
close_worker(worker: str) None[source]

要求工作节点自行关闭。不等待其生效。请注意,无法保证工作节点一定会接受该命令。

请注意,如果 close=True,则 remove_worker() 在内部发送相同的命令。

coerce_address(addr: str | tuple, resolve: bool = True) str[source]

将可能的输入地址强制转换为规范形式。resolve 可以禁用以使用虚假主机名进行测试。

处理字符串、元组或别名。

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None[source]

从工作节点删除数据并更新相应的工作节点/任务状态

参数
工作节点地址 (worker_address): str

要从中删除键的工作节点地址

键 (keys): list[Key]

要在指定工作节点上删除的键列表

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[source]

将集群状态转储写入 fsspec 兼容的 URL。

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None[source]

向外部请求者提供一个数据 Comm

注意:这会在调度器上运行任意 Python 代码。此功能最终应被淘汰。它主要用于诊断目的。

async gather(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

从工作节点收集数据到调度器

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set[source]

将键从多个工作节点点对点复制到单个工作节点

参数
工作节点地址 (worker_address): str

接收键的工作节点地址。

who_has: dict[键, list[str]]

{键: [发送者地址, 发送者地址, …], 键: …}

返回
返回

未能复制的键的集合。

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[source]

生成用于集群状态转储的状态字典。

async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition][source]

SchedulerState.story() 的 RPC 钩子。

注意,msgpack 的序列化/反序列化往返过程会将 Transition 命名元组转换为普通元组。

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[source]

获取工作节点上指定服务的 (主机, 端口) 地址。如果服务不存在,则返回 None。

参数
worker地址
service_namestr

常见的服务包括 'bokeh' 和 'nanny'。

protocol布尔值

是否包含带有协议的完整地址 (True) 或仅包含 (主机, 端口) 对。

handle_long_running(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, run_id: int, compute_duration: float | None, stimulus_id: str) None[source]

任务已从线程池中分离。

我们阻止任务将来被窃取,并更改任务持续时间统计,就像任务已停止一样。

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None[source]

从工作节点发出的请求,用于刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是客户端发出的专用通信 RPC 请求。

async handle_worker(comm: distributed.comm.core.Comm, worker: str) None[source]

监听单个工作节点的响应。

这是调度器与工作节点交互的主循环。

另请参阅

Scheduler.handle_client

客户端的等效协程。

identity(n_workers: int = - 1) dict[str, Any][source]

关于自身和集群的基本信息。

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录一个事件。

参数
topicstr, list[str]

记录事件所在主题的名称。要在多个主题下记录同一事件,请传入主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

另请参阅

Client.log_event
async proxy(msg: dict, worker: str, serializers: Any = None) Any[source]

通过调度器将通信代理到其他工作节点。

async rebalance(keys: collections.abc.Iterable[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict

重新平衡键,以便每个工作节点最终拥有大致相同的进程内存(托管内存 + 非托管内存)。

警告

此操作通常未针对调度器的正常操作进行充分测试。不建议在等待计算时使用此操作。

算法

  1. 计算集群的平均占用率,定义为 dask 管理的数据 + 存在至少 30 秒的非托管进程内存 (distributed.worker.memory.recent-to-old-time)。这使我们可以忽略由任务堆使用引起的临时峰值。

    另外,您可以通过 distributed.worker.memory.rebalance.measure 更改单个工作节点和计算平均值时的内存测量方式。具体来说,这对于忽略不准确的操作系统内存测量值很有用。

  2. 丢弃其占用率在集群平均占用率的 5% 以内的工作节点 (distributed.worker.memory.rebalance.sender-recipient-gap / 2)。这有助于避免数据在集群中反复弹跳。

  3. 高于平均值的工作节点是发送者;低于平均值的是接收者。

  4. 丢弃绝对占用率低于 30% 的发送者 (distributed.worker.memory.rebalance.sender-min)。换句话说,只要所有工作节点的占用率都低于 30%,无论是否失衡,都不会移动数据。

  5. 丢弃绝对占用率高于 60% 的接收者 (distributed.worker.memory.rebalance.recipient-max)。请注意,此阈值默认与 distributed.worker.memory.target 相同,以防止工作节点接收数据后立即将其溢出到磁盘。

  6. 迭代地选择离平均值最远的发送者和接收者,并在两者之间移动最近最少插入的键,直到所有发送者或所有接收者的占用率都在平均值的 5% 以内。

    如果接收者已经拥有数据的副本,则会跳过该接收者。换句话说,此方法不会降低复制级别。如果没有可用的接收者具有足够的内存来接受该键并且尚未持有副本,则会跳过该键。

最近最少插入 (LRI) 策略是一种贪婪选择,其优点是时间复杂度为 O(1),易于实现(它依赖于 Python 字典的插入排序),并且在大多数情况下可能足够好。被放弃的替代策略有:

  • 最大优先。除了非平凡的额外数据结构外,其时间复杂度为 O(n*log(n)),并且存在导致最大的数据块像弹珠一样在集群中反复移动的风险。

  • 最近最少使用 (LRU)。此信息目前仅在工作节点上可用,在调度器上复制并不容易;通过网络传输此信息将非常昂贵。另外请注意,dask 会竭尽全力最大限度地减少中间键在内存中占用的时间,因此在这种情况下,LRI 是 LRU 的一个良好近似。

参数
keys: 可选

允许移动的 dask 键列表。所有其他键将被忽略。请注意,这不保证键实际会被移动(例如,因为不需要移动,或者没有可行的接收工作节点)。

workers: 可选

允许作为发送者或接收者考虑的工作节点地址列表。所有其他工作节点将被忽略。集群平均占用率仅使用允许的工作节点计算。

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][source]

在所有运行中和将来的 nanny 上注册一个 nanny 插件。

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None

在调度器上注册一个插件。

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage]

在所有运行中和将来的工作节点上注册一个工作节点插件。

remove_client(client: str, stimulus_id: str | None = None) None

从网络中移除客户端。

remove_plugin(name: str | None = None) None

从调度器中移除外部插件。

参数
名称 (name)str

要移除的插件的名称。

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed']

从集群中移除工作节点。

当工作节点报告计划离开或似乎无响应时,我们会执行此操作。这可能会将其任务送回释放状态。

async replicate(keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], n: int | None = None, workers: collections.abc.Iterable | None = None, branching_factor: int = 2, delete: bool = True, stimulus_id: str | None = None) dict | None

在整个集群中复制数据。

这会在网络中对每块数据单独执行树形复制。

参数
keys: Iterable

要复制的键列表。

n: int

我们期望在集群中看到的复制份数。

branching_factor: int, 可选

每代可以复制数据的工作节点数量。分支因子越大,单步复制的数据越多,但给定工作节点被数据请求淹没的风险也越高。

另请参阅

Scheduler.rebalance
report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None

将更新发布到所有监听中的队列和通信通道。

如果消息包含一个键,则我们只将消息发送给关心该键的通信通道。

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None

异步请求工作节点从其他工作节点获取列出的键的副本。这是一项“即发即忘”的操作,不提供成功或失败的反馈,旨在用于内部维护,而非计算。

request_remove_replicas(addr: str, keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None

异步请求工作节点丢弃列出的键的副本。这绝不能用于销毁键的最后一个副本。这是一项“即发即忘”的操作,旨在用于内部维护,而非计算。

该副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除(例如,因为该任务是其上运行的另一个任务的依赖项),它将(同样异步地)通知调度器将其重新添加到 who_has 中。如果工作节点同意丢弃任务,则没有反馈。

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None

忘记所有任务并在所有工作节点上调用 restart_workers。

参数
timeout

参见 restart_workers

wait_for_workers

参见 restart_workers

async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed','timed out']]

重启选定的工作节点。可选择等待工作节点返回。

没有 nanny 的工作节点将被关闭,希望外部部署系统会重启它们。因此,如果不使用 nanny 且您的部署系统不自动重启工作节点,restart 将只会关闭所有工作节点,然后超时!

restart 后,所有连接的工作节点都是新的,无论是否抛出了 TimeoutError。未能及时关闭的任何工作节点都将被移除,并且将来可能会或可能不会自行关闭。

参数
workers

要重启的工作节点地址列表。如果省略,则重启所有工作节点。

timeout

如果 wait_for_workers 为 True,则表示等待工作节点关闭并重新上线的时间,否则仅表示等待工作节点关闭的时间。如果超过此时间,将抛出 asyncio.TimeoutError

wait_for_workers

是否等待所有工作节点重新连接,或者仅等待它们关闭(默认为 True)。结合 restart(wait_for_workers=False)Client.wait_for_workers() 使用,可对要等待的工作节点数量进行精细控制。

on_error

如果为 'raise'(默认),则在重启工作节点时任何 nanny 超时都会抛出异常。如果为 'return',则返回错误消息。

返回
{工作节点地址:“正常”,“无 nanny”,或“超时”,或错误消息}
async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, Any]

优雅地将工作节点从集群中退役。任何仅在退役工作节点内存中的键都会被复制到其他地方。

参数
workers: list[str] (可选)

要退役的工作节点地址列表。

names: list (可选)

要退役的工作节点名称列表。与 workers 互斥。如果既未提供 workers 也未提供 names,我们将调用 workers_to_close 来查找一组合适的工作节点。

close_workers: bool (默认为 False)

是否在此处明确关闭工作节点。否则,我们期望外部作业调度器来结束工作节点。

remove: bool (默认为 True)

是立即移除工作节点元数据,还是等待工作节点联系我们。

如果 close_workers=False 且 remove=False,此方法仅将内存中的任务从工作节点刷新出来然后返回。如果 close_workers=True 且 remove=False,此方法将在工作节点仍在集群中时返回,尽管它们不再接受新任务。如果 close_workers=False 或由于任何原因工作节点未接受关闭命令,它将永久无法接受新任务,并期望以其他方式关闭。

**kwargs: dict

传递给 workers_to_close 的额外选项,用于确定应丢弃哪些工作节点。仅在省略 workersnames 时接受。

返回
字典,将工作节点 ID/地址映射到包含有关
该工作节点的信息的字典,对于每个退役的工作节点。
如果存在仅在正在退役的工作节点内存中的键,并且
无法将它们复制到其他地方(例如,因为没有
其他运行中的工作节点),则持有这些键的工作节点不会被退役,并且
不会出现在返回的字典中。
run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any

在此进程内运行一个函数。

async scatter(data: dict, workers: collections.abc.Iterable | None, client: str, broadcast: bool = False, timeout: float = 2) list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]

将数据发送到工作节点。

另请参阅

Scheduler.broadcast
send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None[source]

向客户端和工作节点发送消息

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState) None[source]

向工作节点发送单个计算任务

async start_unsafe() Self[source]

清除旧状态并重启所有正在运行的协程

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool, reason: str, msg: str) None[source]

停止执行列表中的键对应的任务

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None[source]

响应可能已在工作节点线程池上打开空位的事件

根据工作节点上可用的任务槽总数(可能为 0),从队列头部选择适当数量的任务,并将它们转换到 processing 状态。

注意

与此刺激相关的其他转换应在此之前完全处理完毕,以便任何变为可运行的任务都已处于 processing 状态。否则,如果排队任务在下游任务之前被调度,可能会发生过度生产。

必须在 check_idle_saturated 之后调用;即 idle_task_count 必须是最新的。

stimulus_task_erred(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, exception: Any, stimulus_id: str, traceback: Any, run_id: str, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

标记特定工作节点上的任务出错

stimulus_task_finished(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

标记特定工作节点上的任务已完成执行

transition(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][source]

将键从当前状态转换为完成状态

返回
后续转换建议字典

另请参阅

Scheduler.transitions

此函数的传递版本

示例

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None[source]

处理转换直到没有剩余

这包括来自先前转换的反馈,并持续进行直到达到稳定状态

async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

注销工作节点插件

async unregister_scheduler_plugin(name: str) None[source]

在调度器上注销插件。

async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

注销工作节点插件

update_data(*, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None[source]

得知新数据已从外部源进入网络

worker_send(worker: str, msg: dict[str, Any]) None[source]

向工作节点发送消息

这也通过添加一个回调函数来处理连接失败,以便在下一个周期移除工作节点。

workers_list(workers: collections.abc.Iterable[str] | None) list[str][source]

符合条件的工作节点列表

接受工作节点地址或主机名列表。返回所有匹配的工作节点地址列表

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][source]

查找可以低成本关闭的工作节点

这会返回一组适合退役的工作节点列表。这些工作节点没有运行任何任务,并且相对于其对等节点存储的数据相对较少。如果所有工作节点都空闲,我们仍会保留足够的工作节点,以拥有足够的内存来存储我们的数据,并留有舒适的缓冲区。

这适用于 distributed.deploy.adaptive 等系统。

参数
memory_ratio数字

我们希望为存储的数据预留的额外空间量。默认为 2,即我们希望拥有的内存是当前数据量的两倍。

nint

要关闭的工作节点数量

minimumint

要保留的最小工作节点数量

keyCallable(WorkerState)

一个可选的可调用对象,用于将 WorkerState 对象映射到组关联。同组的工作节点将一起关闭。当必须集体关闭工作节点时,这很有用,例如按主机名关闭。

targetint

关闭后目标保留的工作节点数量

attributestr

要返回的 WorkerState 对象的属性,例如“address”或“name”。默认为“address”。

返回
to_close: 可以安全关闭的工作节点地址列表

示例

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

关闭前按主机名分组工作节点

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

移除两个工作节点

>>> scheduler.workers_to_close(n=2)

保留足够的工作节点,使其拥有两倍于所需内存的容量。

>>> scheduler.workers_to_close(memory_ratio=2)

Worker

class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, WorkerStateClass: type = <class 'distributed.worker_state_machine.WorkerState'>, **kwargs)[source]

Dask 分布式集群中的工作节点

工作节点执行两项功能

  1. 从本地字典提供数据

  2. 对这些数据和对等节点的数据执行计算

工作节点会向调度器报告其数据,并在需要执行计算时,利用该调度器从其他工作节点收集数据。

您可以使用 dask worker 命令行应用程序启动工作节点

$ dask worker scheduler-ip:port

使用 --help 标志查看更多选项

$ dask worker --help

此文档字符串的其余部分描述了工作节点用于管理和跟踪内部计算的内部状态。

状态

信息状态

这些属性在执行过程中不会发生显著变化。

  • nthreads: int

    此工作节点进程使用的线程数 (nthreads)

  • executors: dict[str, concurrent.futures.Executor]

    用于执行计算的执行器。始终包含默认执行器。

  • local_directory: path

    本地机器上用于存储临时文件的路径

  • scheduler: PooledRPCCall

    调度器的位置。请参阅 .ip/.port 属性。

  • name: string

    别名

  • services: {str: Server}

    在此工作节点上运行的辅助 Web 服务器

  • service_ports: {str: port}

  • transfer_outgoing_count_limit: int

    最大并发传出数据传输数量。另请参阅 distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit

  • batched_stream: BatchedSend

    一个批量流,我们通过它与调度器通信

  • log: [(message)]

    一个结构化且可查询的日志。请参阅 Worker.story

易变状态

这些属性跟踪此工作节点正在尝试完成的任务的进度。在下面的描述中,key 是我们要计算的任务名称,dep 是我们要从其他节点收集的依赖数据名称。

  • threads: {key: int}

    任务运行所在的线程 ID

  • active_threads: {int: key}

    当前在活动线程上运行的键

  • state: WorkerState

    封装的状态机。请参阅 BaseWorkerWorkerState

参数
scheduler_ip: str, 可选
scheduler_port: int, 可选
scheduler_file: str, 可选
host: str, 可选
data: MutableMapping, type, None

用于存储的对象,默认为构建一个基于磁盘的 LRU 字典。

如果提供一个可调用对象来构建存储对象,如果其调用签名包含名为 worker_local_directory 的参数,则它将接收工作节点的属性 local_directory 作为参数。

nthreads: int, 可选
local_directory: str, 可选

用于存放本地资源的目录

name: str, 可选
memory_limit: int, float, string

此工作节点应使用的内存字节数。设置为零表示无限制。设置为 'auto' 则计算为 system.MEMORY_LIMIT * min(1, nthreads / total_cores)。使用字符串或数字,如 5GB 或 5e9

memory_target_fraction: float 或 False

尝试保持在此分数以下的内存比例(默认:从配置键 distributed.worker.memory.target 读取)

memory_spill_fraction: float 或 False

开始溢出到磁盘的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)

memory_pause_fraction: float 或 False

停止运行新任务的内存比例(默认:从配置键 distributed.worker.memory.pause 读取)

max_spill: int, string 或 False

溢出到磁盘的字节数限制(默认:从配置键 distributed.worker.memory.max-spill 读取)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
要使用的执行器。根据类型,它具有以下含义
  • Executor instance: 默认执行器。

  • Dict[str, Executor]:将名称映射到 Executor 实例的字典。如果字典中没有“default”键,将使用 ThreadPoolExecutor(nthreads) 创建一个“default”执行器。

  • Str:字符串“offload”,指用于卸载通信的同一线程池。这导致用于反序列化和计算的是同一个线程。

resources: dict

此工作节点拥有的资源,例如 {'GPU': 2}

nanny: str

用于联系 nanny 的地址,如果它存在

lifetime: str

经过一段时间(如“1 hour”)后优雅地关闭工作节点。默认为 None,表示没有明确的关闭时间。

lifetime_stagger: str

用于错开生命周期值的时间(如“5 minutes”)。实际生命周期将在生命周期 +/- lifetime_stagger 之间均匀随机选择。

lifetime_restart: bool

工作节点达到其生命周期后是否重启。默认为 False

kwargs: 可选

ServerNode 构造函数的额外参数

示例

使用命令行启动工作节点

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
batched_send(msg: dict[str, Any]) None[source]

实现了 BaseWorker 抽象方法。

通过批量通信向调度器发送一个即发即弃的消息。

如果我们当前未连接到调度器,消息将被悄无声息地丢弃!

async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[source]

关闭工作节点

关闭工作节点上运行的异步操作,停止所有执行器和通信。如果请求,这也会关闭 nanny。

参数
timeout

关闭单个指令的超时时间(秒)

executor_wait

如果为 True,则同步关闭执行器,否则异步关闭

nanny

如果为 True,则关闭 nanny

reason

关闭工作节点的原因

返回
str | None

如果工作节点已处于关闭状态或失败,则为 None;否则为“OK”

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]

优雅地关闭工作节点

这首先会通知调度器我们要关闭,并要求它将我们的数据移动到其他地方。之后,我们像往常一样关闭。

property data: collections.abc.MutableMapping[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

所有已完成任务的 {任务键: 任务负载},无论它们是在此工作节点上计算的,还是在其他地方计算然后通过网络传输到此处的。

使用默认配置时,这是一个 zict 缓冲区,当超过目标阈值时会自动溢出到磁盘。如果禁用溢出,则是一个普通字典。它也可以是在初始化 Worker 或 Nanny 时传递的用户定义的任意类似字典的对象。工作节点逻辑应将其视为不透明,并遵循 MutableMapping API。

注意

此相同的集合也可通过 self.state.dataself.memory_manager.data 访问。

digest_metric(name: collections.abc.Hashable, value: float) None[source]

通过调用 Server.digest_metric 实现 BaseWorker.digest_metric

async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

执行任务。实现了 BaseWorker 抽象方法。

async gather(who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

Scheduler.rebalance() 和 Scheduler.replicate() 使用的端点

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

实现了 BaseWorker 抽象方法

get_current_task() Union[str, int, float, tuple[ForwardRef('Key'), ...]][source]

获取当前正在运行的任务的键

这只在任务内部运行才有意义

另请参阅

get_worker

示例

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[source]

覆盖 BaseWorker 方法以添加验证

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录一个事件。

参数
topicstr, list[str]

记录事件所在主题的名称。要在多个主题下记录同一事件,请传入主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

另请参阅

Client.log_event
async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[source]

等待一段时间,然后将对等工作节点从繁忙状态中移除。实现了 BaseWorker 抽象方法。

async start_unsafe()[source]

尝试启动服务器。这不是幂等的,也不受并发启动尝试的保护。

这旨在由子类覆盖或调用。为了安全启动,请使用 Server.start

如果配置了 death_timeout,我们将要求此协程在此超时到达之前完成。如果达到超时,我们将关闭实例并引发 asyncio.TimeoutError

transfer_outgoing_bytes: int

当前开放的向其他工作节点传输数据的总大小

transfer_outgoing_bytes_total: int

向其他工作节点传输数据的总大小(包括进行中和失败的传输)

transfer_outgoing_count: int

当前开放的向其他工作节点传输数据的数量

transfer_outgoing_count_total: int

自工作节点启动以来向其他工作节点传输数据的总次数

trigger_profile() None[source]

从所有正在活动的计算线程获取一个帧

将这些帧合并到现有的性能分析计数中

property worker_address

为了与 Nanny 的 API 兼容

Nanny

class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]

一个管理工作进程的进程

Nanny 会启动 Worker 进程,监控它们,并在必要时杀死或重启它们。如果您想使用 Client.restart 方法,或者在 Worker 达到内存限制的终止比例时自动重启 Worker,Nanny 是必需的。

Nanny 的参数大多数与 Worker 的参数相同,但也有一些例外,如下所列。

参数
env: dict,可选

Nanny 初始化时设置的环境变量也将确保在 Worker 进程中设置。此参数允许覆盖或以其他方式为 Worker 设置环境变量。也可以使用选项 distributed.nanny.environ 设置环境变量。优先级如下:

  1. Nanny 参数

  2. 现有环境变量

  3. Dask 配置

注意

一些环境变量,例如 OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,例如 MALLOC_TRIM_THRESHOLD_(参见未释放回操作系统的内存),必须在启动 Linux 进程之前设置。如果在 Nanny 参数中或 distributed.nanny.environ 中设置这些变量将无效;它们必须在 distributed.nanny.pre-spawn-environ 中设置,以便在生成子进程之前设置它们,即使这意味着会污染运行 Nanny 的进程。

出于同样的原因,请注意将 distributed.worker.multiprocessing-methodspawn 更改为 forkforkserver 可能会阻止某些环境变量生效;如果您这样做,则应在启动 dask-worker 之前在 shell 中自行设置变量。

另请参阅

工作节点
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][source]

关闭 worker 进程,停止所有通信。

close_gracefully(reason: str = 'nanny-close-gracefully') None[source]

一个信号,表示如果 worker 进程终止,我们不应尝试重启它们

这在集群关闭过程中使用。

async instantiate() distributed.core.Status[source]

启动本地 worker 进程

阻塞直到进程启动且 scheduler 正确获知信息

async kill(timeout: float = 5, reason: str = 'nanny-kill') None[source]

终止本地 worker 进程

阻塞直到进程终止且 scheduler 正确获知信息

log_event(topic, msg)[source]

在给定主题下记录一个事件。

参数
topicstr, list[str]

记录事件所在主题的名称。要在多个主题下记录同一事件,请传入主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

另请参阅

Client.log_event
async start_unsafe()[source]

启动 nanny,启动本地进程,开始监控