Python API (高级)
目录
Python API (高级)¶
在某些罕见情况下,专家可能希望在 Python 中显式创建 Scheduler
、Worker
和 Nanny
对象。这通常在构建工具以在自定义设置中自动部署 Dask 时是必需的。
更常见的方法是使用 在一台机器上使用 Client() 创建本地集群 或使用 命令行界面 (CLI)。建议新读者从那里开始。
如果您确实想自己启动 Scheduler 和 Worker 对象,您应该对 async
/await
风格的 Python 语法有所了解。这些对象是可等待的 (awaitable),并且通常在 async with
上下文管理器中使用。这里有一些示例,展示了几种启动和完成它们的方式。
完整示例¶
|
动态分布式任务调度器 |
|
Dask 分布式集群中的 Worker 节点 |
|
连接到 Dask 集群并提交计算 |
我们首先从一个综合示例开始,演示如何在同一个事件循环中设置一个 Scheduler、两个 Worker 和一个 Client,运行一个简单的计算,然后清理所有内容。
import asyncio
from dask.distributed import Scheduler, Worker, Client
async def f():
async with Scheduler() as s:
async with Worker(s.address) as w1, Worker(s.address) as w2:
async with Client(s.address, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
现在我们来看一些更简单的示例,逐步构建到上面那个案例。
Scheduler¶
|
动态分布式任务调度器 |
我们通过创建一个 Scheduler()
对象来创建调度器,然后 await
该对象以等待其启动。之后,我们可以等待 .finished
方法,直到它关闭。在此期间,调度器将处于活动状态并管理集群。
import asyncio
from dask.distributed import Scheduler, Worker
async def f():
s = Scheduler() # scheduler created, but not yet running
s = await s # the scheduler is running
await s.finished() # wait until the scheduler closes
asyncio.get_event_loop().run_until_complete(f())
这个程序将永远运行,除非有外部进程连接到调度器并告诉它停止。如果您想自行关闭,可以通过等待 .close
方法来关闭任何 Scheduler
、Worker
、Nanny
或 Client
类。
await s.close()
Worker¶
|
Dask 分布式集群中的 Worker 节点 |
Worker 遵循相同的 API。唯一的区别是 worker 需要知道调度器的地址。
import asyncio
from dask.distributed import Scheduler, Worker
async def f(scheduler_address):
w = await Worker(scheduler_address)
await w.finished()
asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))
在同一个事件循环中启动多个¶
|
动态分布式任务调度器 |
|
Dask 分布式集群中的 Worker 节点 |
我们可以在同一个事件循环中运行任意数量的这些对象。
import asyncio
from dask.distributed import Scheduler, Worker
async def f():
s = await Scheduler()
w = await Worker(s.address)
await w.finished()
await s.finished()
asyncio.get_event_loop().run_until_complete(f())
使用上下文管理器¶
我们也可以使用 async with
上下文管理器来确保正确清理。下面是与上面相同的示例:
import asyncio
from dask.distributed import Scheduler, Worker
async def f():
async with Scheduler() as s:
async with Worker(s.address) as w:
await w.finished()
await s.finished()
asyncio.get_event_loop().run_until_complete(f())
或者,在下面的示例中,我们还包含一个 Client
,运行一个小型计算,然后在计算完成后允许进行清理。
import asyncio
from dask.distributed import Scheduler, Worker, Client
async def f():
async with Scheduler() as s:
async with Worker(s.address) as w1, Worker(s.address) as w2:
async with Client(s.address, asynchronous=True) as client:
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)
asyncio.get_event_loop().run_until_complete(f())
这相当于创建并等待每个服务器,然后在退出上下文时对每个服务器调用 .close
。在这个示例中,我们不等待 s.finished()
,因此它会相对快速地终止。如果您希望它永远运行,则可以调用 await s.finished()
。
Nanny¶
|
用于管理 worker 进程的进程 |
或者,如果我们希望 worker 在单独的进程中进行管理,则可以将 Worker
替换为 Nanny
。Nanny
构造函数遵循相同的 API。这允许 worker 在失败时自行重启。此外,它还提供了一些附加监控,并且在协调许多应该位于不同进程中以避免 GIL 的 worker 时非常有用。
# w = await Worker(s.address)
w = await Nanny(s.address)
API¶
这些类具有多种关键字参数,可用于控制其行为。有关更多信息,请参阅下面的 API 文档。
Scheduler¶
- class distributed.Scheduler(loop: tornado.ioloop.IOLoop | None = None, services: dict | None = None, service_kwargs: dict | None = None, allowed_failures: int | None = None, extensions: dict | None = None, validate: bool | None = None, scheduler_file: str | None = None, security: dict | distributed.security.Security | None = None, worker_ttl: float | None = None, idle_timeout: float | None = None, interface: str | None = None, host: str | None = None, port: int = 0, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool | None = None, http_prefix: str | None = '/', preload: str | collections.abc.Sequence[str] | None = None, preload_argv: str | collections.abc.Sequence[str] | collections.abc.Sequence[collections.abc.Sequence[str]] = (), plugins: collections.abc.Sequence[distributed.diagnostics.plugin.SchedulerPlugin] = (), contact_address: str | None = None, transition_counter_max: bool | int = False, jupyter: bool = False, **kwargs: Any)[source]¶
动态分布式任务调度器
调度器跟踪 worker、数据和计算的当前状态。调度器监听事件并适当控制 worker 进行响应。它不断尝试使用 worker 来执行不断增长的 dask 图。
所有事件都得到快速处理,处理时间与其输入(通常是常量大小)呈线性关系,通常在毫秒内完成。为了实现这一点,调度器跟踪了大量状态。每个操作都维护了此状态的一致性。
调度器通过 Comm 对象与外部世界通信。即使同时监听多个客户端,它也能维护一致且有效的数据视图。
Scheduler 通常通过
dask scheduler
可执行文件启动$ dask scheduler Scheduler started at 127.0.0.1:8786
或者在 LocalCluster 中,Client 无需连接信息即可启动
>>> c = Client() >>> c.cluster.scheduler Scheduler(...)
用户通常不直接与调度器交互,而是与客户端对象
Client
交互。参数
contact_address
允许为 worker 通信向调度器发布一个特定的地址,该地址与调度器绑定的地址不同。这在调度器监听一个私有地址(worker 因此无法联系到它)时非常有用。状态
调度器包含以下状态变量。每个变量与其存储的内容和简要描述一起列出。
- tasks:
{task key: TaskState}
调度器当前已知的任务
- tasks:
- unrunnable:
{TaskState}
处于“无 worker”状态的任务
- unrunnable:
- workers:
{worker key: WorkerState}
当前连接到调度器的 Worker
- workers:
- idle:
{WorkerState}
未充分利用的 worker 集合
- idle:
- saturated:
{WorkerState}
未过载的 worker 集合
- saturated:
- host_info:
{hostname: dict}
关于每个 worker 主机的信息
- host_info:
- clients:
{client key: ClientState}
根据当前工作负载所需的 worker 数量
- clients:
- 这查看当前正在运行的任务和内存使用情况,并返回所需的 worker 数量。这通常由自适应调度使用。
参数
- target_durationfloat | None
计算所需的目标持续时间。这会影响调度器请求扩缩的速度。
- 得知某个 worker 持有特定的键
这在实践中不应使用,主要是出于历史原因。然而,worker 会不时发送它。
add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None [source]¶
- 向调度器添加外部插件。
- 参见 https://distributed.dask.org.cn/en/latest/plugins.html
pluginSchedulerPlugin
要添加的 SchedulerPlugin 实例
- 如果为 True,则假定插件已存在,不执行任何操作。
namestr
插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,如果未找到则生成一个。
- async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None [source]¶
了解工作节点拥有某些键
这在实践中不应使用,主要出于历史原因保留。然而,工作节点有时会发送此消息。
- add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None [source]¶
向调度器添加外部插件。
请参阅 https://distributed.dask.org.cn/en/latest/plugins.html
- 向调度器添加外部插件。
- pluginSchedulerPlugin
要添加的 SchedulerPlugin 实例
- idempotentbool
如果为 True,则假定插件已存在,不执行任何操作。
- namestr
插件的名称;如果为 None,则检查 Plugin 实例上的 name 属性,如果未发现则生成一个名称。
- async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None [source]¶
向集群添加一个新的 worker
- async benchmark_hardware() dict[str, dict[str, float]] [source]¶
在 worker 上运行基准测试,获取内存、磁盘和网络带宽
- 返回
- result: dict
一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是在许多 worker 在集群中运行计算时平均得出的。
- async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return','return_pickle', 'ignore'] = 'raise') dict[str, Any] [source]¶
向 worker 广播消息,返回所有结果
- client_releases_keys(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None [source]¶
从 client 的期望列表中移除键
- async close(timeout: float | None = None, reason: str = 'unknown') None [source]¶
向所有协程发送清理信号,然后等待直到完成
要添加的 SchedulerPlugin 实例
Scheduler.cleanup
- close_worker(worker: str) None [source]¶
要求 worker 自行关闭。不等待其生效。请注意,不能保证 worker 实际上会接受该命令。
请注意,
remove_worker()
在 close=True 时会在内部发送相同的命令。要添加的 SchedulerPlugin 实例
- coerce_address(addr: str | tuple, resolve: bool = True) str [source]¶
强制将可能的输入地址转换为标准格式。*resolve* 可以禁用,用于使用虚假主机名进行测试。
处理字符串、元组或别名。
- async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None [source]¶
从 worker 中删除数据并更新相应的 worker/task 状态
- 向调度器添加外部插件。
- worker_address: str
要从中删除键的 Worker 地址
- keys: list[Key]
要在指定 worker 上删除的键列表
- async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None [source]¶
将集群状态转储写入 fsspec 兼容的 URL。
- async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None [source]¶
向外部请求者提供数据 Comm
注意:这将在调度器上运行任意 Python 代码。最终应逐步淘汰此功能。它主要用于诊断。
- async gather(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
从 worker 向调度器收集数据
- async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set [source]¶
将键从多个 worker 对等复制到单个 worker
- 向调度器添加外部插件。
- worker_address: str
接收 worker 地址,将键复制到此处
- who_has: dict[Key, list[str]]
{key: [发送方地址, 发送方地址, …], key: …}
- 返回
- 返回
未能复制的键集合
- async get_cluster_state(exclude: collections.abc.Collection[str]) dict [source]¶
生成集群状态转储中使用的状态字典
- async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition] [source]¶
用于
SchedulerState.story()
的 RPC Hook。请注意,msgpack 序列化/反序列化往返过程会将
Transition
命名元组转换为普通元组。
- get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None [source]¶
获取 worker 上指定服务的 (host, port) 地址。如果服务不存在则返回 None。
- 向调度器添加外部插件。
- worker地址
- service_namestr
常见服务包括 ‘bokeh’ 和 ‘nanny’
- protocol布尔值
是否包含带协议的完整地址 (True) 或仅包含 (host, port) 对
- handle_long_running(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, run_id: int, compute_duration: float | None = None, stimulus_id: str) None [source]¶
一个任务已从线程池中分离
我们阻止该任务将来被盗取,并更改任务持续时间计算方式,就好像任务已停止一样。
- handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None [source]¶
来自工作节点的请求,要求刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是来自客户端的专用通信 RPC 请求。
- async handle_worker(comm: distributed.comm.core.Comm, worker: str) None [source]¶
监听来自单个工作节点的响应
这是调度器与工作节点交互的主循环
要添加的 SchedulerPlugin 实例
Scheduler.handle_client
客户端的等效协程
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
在给定主题下记录事件
- 向调度器添加外部插件。
- topicstr, list[str]
记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。
- msg
要记录的事件消息。注意,这必须是 msgpack 可序列化的。
要添加的 SchedulerPlugin 实例
- async rebalance(keys: collections.abc.Iterable[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict [source]¶
重新平衡键,使每个工作节点的进程内存(托管+非托管)大致相同。
警告
此操作通常未针对调度器的正常运行进行充分测试。不建议在等待计算时使用此操作。
算法
找到集群的平均占用率,定义为 dask 托管的数据 + 在工作节点上存在至少 30 秒(
distributed.worker.memory.recent-to-old-time
)的非托管进程内存。这使我们可以忽略由任务堆使用引起的临时峰值。另外,您可以通过
distributed.worker.memory.rebalance.measure
更改测量单个工作节点内存和计算平均值的方式。特别是,这对于忽略不准确的操作系统内存测量可能很有用。丢弃占用率在集群平均占用率的 5% 以内(
distributed.worker.memory.rebalance.sender-recipient-gap
/ 2)的工作节点。这有助于避免数据在集群中重复跳动。高于平均值的工作节点是发送者;低于平均值的是接收者。
丢弃绝对占用率低于 30%(
distributed.worker.memory.rebalance.sender-min
)的发送者。换句话说,只要所有工作节点的占用率都低于 30%,无论是否存在不平衡,都不会移动数据。丢弃绝对占用率高于 60%(
distributed.worker.memory.rebalance.recipient-max
)的接收者。请注意,此阈值默认与distributed.worker.memory.target
相同,以防止工作节点接受数据后立即溢出到磁盘。迭代地选择离平均值最远的发送者和接收者,并在两者之间移动最近插入时间最靠前的键,直到所有发送者或所有接收者都落入平均值的 5% 以内。
如果接收者已经拥有数据的副本,则会跳过该接收者。换句话说,此方法不会降低复制等级。如果没有足够的内存接受该键且尚未持有该键副本的接收者,则会跳过该键。
最近插入时间最靠前 (LRI) 策略是一种贪婪选择,优点是时间复杂度为 O(1),实现简单(它依赖于 python 字典的插入排序),并且在大多数情况下表现良好。被放弃的替代策略包括
最大优先。时间复杂度为 O(n*log(n)),需要额外的复杂数据结构,并且存在导致最大的数据块像弹珠一样在集群中反复移动的风险。
最近最少使用 (LRU)。此信息目前仅在工作节点上可用,并且在调度器上复制并不简单;通过网络传输它会非常昂贵。此外,请注意 dask 会尽力最大限度地减少中间键在内存中保留的时间,因此在这种情况下,LRI 是 LRU 的近似值。
- 向调度器添加外部插件。
- keys: 可选
允许移动的 dask 键列表。所有其他键将被忽略。请注意,这不能保证键实际会被移动(例如,因为没有必要,或者没有可接受该键的接收工作节点)。
- workers: 可选
允许被视为发送者或接收者工作节点的地址列表。所有其他工作节点将被忽略。集群的平均占用率将仅使用允许的工作节点进行计算。
- async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
在所有当前运行和未来的 nanny 上注册 nanny 插件
- async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None [source]¶
在调度器上注册插件。
- async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
在所有当前运行和未来的工作节点上注册工作节点插件
- remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'] [source]¶
从集群中移除工作节点。
当工作节点报告计划离开或似乎无响应时,我们会执行此操作。这可能会将其任务发送回释放状态。
要添加的 SchedulerPlugin 实例
- async replicate(keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], n: int | None = None, workers: collections.abc.Iterable | None = None, branching_factor: int = 2, delete: bool = True, stimulus_id: str | None = None) dict | None [source]¶
在整个集群中复制数据
这将对每份数据单独执行树状复制,遍布整个网络。
- 向调度器添加外部插件。
- keys: Iterable
要复制的键列表
- n: int
我们期望在集群中看到的复制次数
- branching_factor: int, optional
每一代可以复制数据的工作节点数量。分支因子越大,单步复制的数据越多,但给定工作节点被数据请求淹没的风险也越大。
要添加的 SchedulerPlugin 实例
- report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None [source]¶
向所有监听的队列 (Queues) 和通信 (Comms) 发布更新
如果消息包含键,则我们只将消息发送给那些关心该键的通信通道。
- request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
异步请求工作节点从其他工作节点获取列出的键的副本。这是一个即发即弃操作,不提供成功或失败的反馈,主要用于维护而非计算。
- request_remove_replicas(addr: str, keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
异步请求工作节点丢弃列出的键的副本。绝不能用于销毁某个键的最后一个副本。这是一个即发即弃操作,主要用于维护而非计算。
副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除,例如因为该任务是其上运行的另一个任务的依赖项,它将(同样异步地)通知调度器将其重新添加到 who_has 中。如果工作节点同意丢弃任务,则没有反馈。
- async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None [source]¶
忘记所有任务,并在所有工作节点上调用 restart_workers。
- 向调度器添加外部插件。
- timeout
请参阅 restart_workers
- wait_for_workers
请参阅 restart_workers
要添加的 SchedulerPlugin 实例
- async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']] [source]¶
重启选定的工作节点。可以选择等待工作节点返回。
没有 nanny 的工作节点会被关闭,期望外部部署系统能重启它们。因此,如果不使用 nanny 且您的部署系统不自动重启工作节点,则
restart
只会关闭所有工作节点,然后超时!在
restart
之后,所有连接的工作节点都是新的,无论是否引发TimeoutError
。未能及时关闭的任何工作节点将被移除,并且将来可能或可能不会自行关闭。- 向调度器添加外部插件。
- workers
要重启的工作节点地址列表。如果省略,则重启所有工作节点。
- timeout
如果
wait_for_workers
为 True,则等待工作节点关闭并返回的时长,否则仅等待工作节点关闭的时长。如果超出此时间,则引发asyncio.TimeoutError
。- wait_for_workers
是否等待所有工作节点重新连接,或者只等待它们关闭(默认为 True)。结合使用
restart(wait_for_workers=False)
和Client.wait_for_workers()
可以对等待多少工作节点进行细粒度控制。- on_error
如果为 ‘raise’(默认),则在重启工作节点时任何 nanny 超时则引发异常。如果为 ‘return’,则返回错误消息。
- 返回
- {工作节点地址: “OK”, “no nanny”, “timed out” 或错误消息}
要添加的 SchedulerPlugin 实例
- async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
- async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, Any]
- async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, Any]
从集群中优雅地退役工作节点。只在退役工作节点内存中的任何键都会被复制到别处。
- 向调度器添加外部插件。
- workers: list[str] (可选)
要退役的工作节点地址列表。
- names: list (可选)
要退役的工作节点名称列表。与
workers
互斥。如果既未提供workers
也未提供names
,我们将调用workers_to_close
来找到一个合适的工作节点集。- close_workers: bool (默认为 False)
是否实际从此处显式关闭工作节点。否则,我们期望外部作业调度器来终止工作节点。
- remove: bool (默认为 True)
是否立即移除 worker 元数据,还是等待 worker 联系我们后再移除。
如果 close_workers=False 且 remove=False,此方法仅将 worker 内存中的任务刷新出去,然后返回。如果 close_workers=True 且 remove=False,此方法将在 worker 仍留在集群中时返回,尽管它们不再接受新任务。如果 close_workers=False 或由于任何原因 worker 未接受关闭命令,它将永久无法接受新任务,并且预期会通过其他方式关闭。
- **kwargs: dict
传递给 workers_to_close 的额外选项,用于确定应删除哪些 worker。仅在省略
workers
和names
时接受。
- 返回
- 字典,将 worker ID/地址映射到有关该 worker 的信息字典,针对每个已退役 worker。
- 该 worker 的信息。
- 如果存在仅在正在退役的 worker 内存中存在的键,并且
- 无法将它们复制到其他地方(例如,因为没有
- 其他正在运行的 worker),则持有这些键的 worker 将不会被退役,并且
- 不会出现在返回的字典中。
要添加的 SchedulerPlugin 实例
- run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any [source]¶
在此进程中运行一个函数
要添加的 SchedulerPlugin 实例
- async scatter(data: dict, workers: collections.abc.Iterable | None, client: str, broadcast: bool = False, timeout: float = 2) list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] [source]¶
将数据发送到 worker
要添加的 SchedulerPlugin 实例
- send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None [source]¶
向客户端和 worker 发送消息
- send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState) None [source]¶
向 worker 发送单个计算任务
- stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool, reason: str, msg: str) None [source]¶
停止执行键列表上的任务
- stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None [source]¶
响应可能在 worker 线程池中打开了槽位的事件
根据 worker 上可用的任务槽总数(可能为 0),从队列前面选择适当数量的任务,并将其转换为
processing
状态。注意
与此事件相关的其他转换应事先完全处理,以便任何变得可运行的任务都已处于
processing
状态。否则,如果队列中的任务在下游任务之前被调度,可能会发生过度生产。必须在 check_idle_saturated 之后调用;即 idle_task_count 必须是最新的。
- stimulus_task_erred(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, exception: Any, stimulus_id: str, traceback: Any, run_id: str, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]] [source]¶
标记某个任务在特定 worker 上出错
- stimulus_task_finished(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]] [source]¶
标记某个任务在特定 worker 上执行完成
- transition(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred','forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']] [source]¶
将键从当前状态转换为完成状态
- 返回
- 关于未来转换的建议字典
要添加的 SchedulerPlugin 实例
Scheduler.transitions
此函数的传递版本
示例
>>> self.transition('x', 'waiting') {'x': 'processing'}
- transitions(recommendations: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None [source]¶
处理转换直到没有剩余
这包括来自先前转换的反馈,并持续到达到稳定状态
- async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
取消注册 worker 插件
- async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
取消注册 worker 插件
- update_data(*, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None [source]¶
得知新数据已从外部源进入网络
- worker_send(worker: str, msg: dict[str, Any]) None [source]¶
向 worker 发送消息
这也通过添加回调在下一个周期移除 worker 来处理连接失败。
- workers_list(workers: collections.abc.Iterable[str] | None) list[str] [source]¶
符合条件的 worker 列表
接受 worker 地址或主机名列表。返回所有匹配的 worker 地址列表。
- workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str] [source]¶
查找可以低成本关闭的 worker
这返回一个适合退役的 worker 列表。这些 worker 没有运行任何任务,并且相对于其对等节点存储的数据相对较少。如果所有 worker 都处于空闲状态,我们仍会保留足够的 worker 以拥有足够的内存来存储我们的数据,并留有充足的缓冲空间。
这适用于诸如
distributed.deploy.adaptive
之类的系统。- 向调度器添加外部插件。
- memory_ratio数字
希望为存储数据保留的额外空间量。默认为 2,即我们希望拥有当前数据量两倍的内存。
- n整数
要关闭的 worker 数量
- minimum整数
至少保留的 worker 数量
- key可调用对象(WorkerState)
一个可选的可调用对象,用于将 WorkerState 对象映射到组关联。组将一起关闭。这在必须集体关闭 worker 时很有用,例如按主机名关闭。
- target整数
关闭后 worker 的目标数量
- attribute字符串
要返回的 WorkerState 对象的属性,例如“address”或“name”。默认为“address”。
- 返回
- to_close:可以关闭的 worker 地址列表
要添加的 SchedulerPlugin 实例
示例
>>> scheduler.workers_to_close() ['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']
在关闭之前按主机名对 worker 进行分组
>>> scheduler.workers_to_close(key=lambda ws: ws.host) ['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']
移除两个 worker
>>> scheduler.workers_to_close(n=2)
保留足够的 worker,使其内存是我们所需内存的两倍。
>>> scheduler.workers_to_close(memory_ratio=2)
Worker¶
- class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, WorkerStateClass: type = <class 'distributed.worker_state_machine.WorkerState'>, **kwargs)[source]¶
Dask 分布式集群中的 Worker 节点
Worker 执行两项功能
服务数据,来自本地字典
执行计算,在这些数据以及来自对等节点的数据上执行
Worker 会将它们的数据告知调度器,并在需要执行计算时使用该调度器从其他 worker 收集数据。
您可以使用
dask worker
命令行应用程序启动 worker$ dask worker scheduler-ip:port
使用
--help
标志查看更多选项$ dask worker --help
此文档字符串的其余部分是关于 worker 用于管理和跟踪内部计算的内部状态。
状态
信息状态
这些属性在执行过程中不会发生显著变化。
- nthreads:
int
此 worker 进程使用的 nthreads 数量
- nthreads:
- executors:
dict[str, concurrent.futures.Executor]
用于执行计算的执行器。始终包含默认执行器。
- executors:
- local_directory:
path
本地机器上存储临时文件的路径
- local_directory:
- scheduler:
PooledRPCCall
调度器的位置。参见
.ip/.port
属性。
- scheduler:
- name:
string
别名
- name:
- services:
{str: Server}
在此 worker 上运行的辅助 Web 服务器
- services:
service_ports:
{str: port}
- transfer_outgoing_count_limit:
int
并发传出数据传输的最大数量。另请参见
distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit
。
- transfer_outgoing_count_limit:
- batched_stream:
BatchedSend
用于与调度器通信的批处理流
- batched_stream:
- log:
[(message)]
结构化且可查询的日志。参见
Worker.story
- log:
易变状态
这些属性跟踪此 worker 正在尝试完成的任务的进度。在下面的描述中,
key
是我们想要计算的任务名称,而dep
是我们想要从其他 worker 收集的依赖数据块的名称。- threads:
{key: int}
任务运行所在的线程 ID
- threads:
- active_threads:
{int: key}
当前在活动线程上运行的键
- active_threads:
- state:
WorkerState
封装的状态机。参见
BaseWorker
和WorkerState
- state:
- 向调度器添加外部插件。
- scheduler_ip: str, 可选
- scheduler_port: int, 可选
- scheduler_file: str, 可选
- host: str, 可选
- data: MutableMapping, type, None
用于存储的对象,默认为构建一个磁盘支持的 LRU 字典。
如果提供了用于构建存储对象的可调用对象,并且调用签名中包含名为
worker_local_directory
的参数,则该对象将接收 worker 的属性:local_directory
作为参数。- nthreads: int, 可选
- local_directory: str, 可选
放置本地资源的目录
- name: str, 可选
- memory_limit: int, float, string
此 worker 应使用的内存字节数。设置为零表示无限制。设置为“auto”则计算为 system.MEMORY_LIMIT * min(1, nthreads / total_cores)。使用字符串或数字,例如 5GB 或 5e9。
- memory_target_fraction: float 或 False
尝试保持低于此内存比例(默认:从配置键 distributed.worker.memory.target 读取)
- memory_spill_fraction: float 或 False
开始溢出到磁盘时的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)
- memory_pause_fraction: float 或 False
停止运行新任务时的内存比例(默认:从配置键 distributed.worker.memory.pause 读取)
- max_spill: int, string 或 False
溢出到磁盘的字节数限制。(默认:从配置键 distributed.worker.memory.max-spill 读取)
- executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
- 要使用的执行器。根据类型不同,其含义如下:
Executor 实例:默认执行器。
Dict[str, Executor]:将名称映射到 Executor 实例的字典。如果字典中没有“default”键,则将使用
ThreadPoolExecutor(nthreads)
创建一个“default”执行器。字符串:字符串“offload”,指用于卸载通信的同一线程池。这导致同一线程用于反序列化和计算。
- resources: dict
此 worker 拥有的资源,例如
{'GPU': 2}
- nanny: str
如果存在,则联系 nanny 的地址
- lifetime: str
优雅关闭 worker 的时间量,例如“1 hour”。默认为 None,表示没有明确的关闭时间。
- lifetime_stagger: str
用于错开生命周期值的时间量,例如“5 minutes”。实际生命周期将在 lifetime +/- lifetime_stagger 之间均匀随机选择。
- lifetime_restart: bool
worker 达到其生命周期后是否重启。默认为 False。
- kwargs: 可选
ServerNode 构造函数的附加参数
要添加的 SchedulerPlugin 实例
示例
使用命令行启动 worker
$ dask scheduler Start scheduler at 127.0.0.1:8786 $ dask worker 127.0.0.1:8786 Start worker at: 127.0.0.1:1234 Registered with scheduler at: 127.0.0.1:8786
- batched_send(msg: dict[str, Any]) None [source]¶
实现 BaseWorker 抽象方法。
通过批量通信向调度器发送即发即弃的消息。
如果当前未连接到调度器,消息将被静默丢弃!
要添加的 SchedulerPlugin 实例
- async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None [source]¶
关闭 worker
关闭 worker 上运行的异步操作,停止所有执行器和通信。如果请求,这也关闭 nanny。
- 向调度器添加外部插件。
- timeout
关闭单个指令的超时时间(秒)
- executor_wait
如果为 True,则同步关闭执行器,否则异步关闭
- nanny
如果为 True,则关闭 nanny
- reason
关闭 worker 的原因
- 返回
- str | None
如果 worker 已处于关闭状态或失败,则为 None,否则为“OK”
- async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]¶
优雅地关闭 worker
这首先告知调度器我们正在关闭,并请求其将我们的数据移动到其他地方。之后,我们照常关闭。
- property data: collections.abc.MutableMapping[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]¶
所有已完成任务的 {任务键: 任务载荷},无论它们是在此 Worker 上计算的,还是在其他地方计算然后通过网络传输到这里的。
使用默认配置时,这是一个 zict 缓冲区,当超出目标阈值时会自动溢出到磁盘。如果禁用溢出,它就是一个普通的 dict。它也可以是初始化 Worker 或 Nanny 时传递的用户定义的任意类字典对象。Worker 逻辑应将其视为不透明对象,并遵循 MutableMapping API。
注意
这个集合也可以通过
self.state.data
和self.memory_manager.data
访问。
- digest_metric(name: collections.abc.Hashable, value: float) None [source]¶
通过调用 Server.digest_metric 实现 BaseWorker.digest_metric
- async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
执行一个任务。实现 BaseWorker 抽象方法。
要添加的 SchedulerPlugin 实例
- async gather(who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
Scheduler.rebalance() 和 Scheduler.replicate() 使用的端点
- async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
实现 BaseWorker 抽象方法
要添加的 SchedulerPlugin 实例
- get_current_task() Union[str, int, float, tuple[ForwardRef('Key'), ...]] [source]¶
获取当前正在运行的任务的键
这仅在任务内部运行时有意义
要添加的 SchedulerPlugin 实例
get_worker
示例
>>> from dask.distributed import get_worker >>> def f(): ... return get_worker().get_current_task()
>>> future = client.submit(f) >>> future.result() 'f-1234'
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
覆盖 BaseWorker 方法以进行额外验证
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
在给定主题下记录事件
- 向调度器添加外部插件。
- topicstr, list[str]
记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。
- msg
要记录的事件消息。注意,这必须是 msgpack 可序列化的。
要添加的 SchedulerPlugin 实例
- async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [source]¶
等待一段时间,然后将一个对等 worker 从忙碌状态中取出。实现 BaseWorker 抽象方法。
要添加的 SchedulerPlugin 实例
- async start_unsafe()[source]¶
尝试启动服务器。这不是幂等的,也不受并发启动尝试的保护。
此方法旨在由子类覆盖或调用。为了安全启动,请使用
Server.start
。如果配置了
death_timeout
,我们将要求此协程在此超时到达之前完成。如果达到超时,我们将关闭实例并引发asyncio.TimeoutError
。
- property worker_address¶
用于与 Nanny 的 API 兼容性
Nanny¶
- class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]¶
用于管理 worker 进程的进程
Nanny 启动 Worker 进程,监视它们,并在必要时杀死或重启它们。如果你想使用
Client.restart
方法,或者在工作节点达到其内存限制的终止阈值时自动重启工作节点,则 Nanny 是必要的。Nanny 的参数大部分与 Worker 的参数相同,但有一些例外如下所列。
- 向调度器添加外部插件。
- env: dict, optional
在 Nanny 初始化时设置的环境变量也将确保在 Worker 进程中设置。此参数允许覆盖或以其他方式为 Worker 设置环境变量。也可以使用选项
distributed.nanny.environ
设置环境变量。优先顺序如下:Nanny 参数
现有环境变量
Dask 配置
注意
某些环境变量,例如
OMP_NUM_THREADS
,必须在导入 numpy 之前设置才能生效。其他变量,例如MALLOC_TRIM_THRESHOLD_
(参阅 内存未释放回操作系统),必须在启动 Linux 进程之前设置。如果在 Nanny 参数或distributed.nanny.environ
中设置此类变量将无效;它们必须在distributed.nanny.pre-spawn-environ
中设置,以便在派生子进程之前设置它们,即使这意味着“污染”运行 Nanny 的进程。出于同样的原因,请注意将
distributed.worker.multiprocessing-method
从spawn
更改为fork
或forkserver
可能会抑制某些环境变量;如果这样做,您应该在启动dask-worker
之前在 shell 中自行设置这些变量。
要添加的 SchedulerPlugin 实例
- async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'] [source]¶
关闭 worker 进程,停止所有通信。
- close_gracefully(reason: str = 'nanny-close-gracefully') None [source]¶
一个信号,表示如果 worker 消失,我们不应尝试重启它们。
这用于集群关闭过程的一部分。
- async kill(timeout: float = 5, reason: str = 'nanny-kill') None [source]¶
杀死本地 worker 进程
阻塞直到进程关闭并且调度器被正确通知。