自适应部署

动机

大多数 Dask 部署是静态的,只有一个调度器和固定数量的工作进程。这导致了可预测的行为,但在以下两种情况下会浪费资源:

  1. 用户可能没有使用集群,或者可能正在忙于解释最近的结果或图表,因此工作进程空闲,占用了其他潜在用户的宝贵共享资源。

  2. 用户可能非常活跃,但受到其原始分配资源的限制。

特别是效率高的用户可能会在会话期间学习手动添加和移除工作进程,但这很少见。相反,我们希望 Dask 集群的大小能够随时匹配计算需求。这就是本文档中讨论的自适应部署的目标。


Dask adaptive scaling

这些对于交互式工作负载特别有用,其特点是长时间不活动后伴随短暂的重度活动爆发。自适应部署可以实现更快的分析,赋予用户更多能力,同时显著减轻对计算资源的压力。

自适应

为了便于设置自适应部署,一些 Dask 部署解决方案提供了 .adapt() 方法。这里是使用 dask_kubernetes.KubeCluster 的一个示例。

from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100)  # scale between 0 and 100 workers

有关更多关键字选项,请参阅下面的 Adaptive 类

Adaptive(cluster[, interval, minimum, ...])

根据调度器负载自适应地分配工作进程。

对资源管理器的依赖

Dask 调度器本身不知道如何启动工作进程。相反,它依赖于外部资源调度器,例如上面提到的 Kubernetes,或者 Yarn、SGE、SLURM、Mesos 或其他内部系统(有关选项,请参阅如何部署 Dask 集群)。为了使用自适应部署,您必须提供一种机制,让调度器能够启动新的工作进程。通常,这是通过使用如何部署 Dask 集群中列出的某个解决方案,或者通过从 Cluster 超类继承并实现其 API 来完成的。

Cluster([asynchronous, loop, quiet, name, ...])

集群对象的超类

扩缩容启发式算法

Dask 调度器会跟踪多种信息,这些信息有助于正确分配工作进程数量:

  1. 它已见过的每个函数和任务的历史运行时,以及当前能够为用户运行的所有函数

  2. 每个工作进程使用的内存量和可用内存量

  3. 由于各种原因(例如存在专用硬件)而空闲或饱和的工作进程

基于这些信息,它能够通过将所有待处理任务的累积预期运行时除以 target_duration 参数(默认为五秒)来确定目标工作进程数量。这个工作进程数量作为对资源管理器的基本请求。这个数量可以因多种原因而改变:

  1. 如果集群需要更多内存,它将选择目标工作进程数量或当前工作进程数量的两倍(以较大者为准)

  2. 如果目标数量超出最小值和最大值的范围,则会截断以适应该范围

此外,在缩减规模时,Dask 优先选择那些空闲且内存中数据最少的工作进程。在退役工作进程之前,它会将数据转移到其他机器。为避免集群规模快速上下波动,我们只有在该工作进程在几个周期内持续被认为是适合退役时,才会将其退役(由 wait_countinterval 参数控制)。

API

class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None =None, minimum: int | None =None, maximum: int | float | None =None, wait_count: int | None =None, target_duration: str | float | timedelta | None =None, worker_key: Callable[[distributed.scheduler.WorkerState], Hashable] | None =None, **kwargs: Any)[source]

根据调度器负载自适应地分配工作进程。一个超类。

包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与一个能够使用集群资源管理器创建和销毁 Dask 工作进程的系统配合使用。它通常内置于现有解决方案中,而不是由用户直接使用。它最常从各种 Dask 集群类的 .adapt(...) 方法中使用。

参数
cluster: 对象

必须具有 scale 和 scale_down 方法/协程

intervaltimedelta 或 str,默认为 “1000 ms”

检查间隔(毫秒)

wait_count: int,默认为 3

在我们移除工作进程之前,建议移除该工作进程的连续次数。

target_duration: timedelta 或 str,默认为 “5s”

我们希望计算花费的时间量。这会影响我们扩大规模的积极程度。

worker_key: Callable[WorkerState]

缩减规模时用于将工作进程分组的函数。有关更多信息,请参阅 Scheduler.workers_to_close。

minimum: int

要保留的最小工作进程数量

maximum: int

要保留的最大工作进程数量

**kwargs

传递给 Scheduler.workers_to_close 的额外参数

注意

子类可以重写 Adaptive.target()Adaptive.workers_to_close() 来控制何时调整集群大小。默认实现会检查每个工作进程的任务是否过多或可用内存是否过少(参见 distributed.Scheduler.adaptive_target())。interval、min、max、wait_count 和 target_duration 的值可以在 dask 配置文件的 distributed.adaptive 键下指定。

示例

这通常从现有的 Dask 类中使用,例如 KubeCluster

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)

或者,您可以通过继承 Dask 的 Cluster 超类来在您自己的 Cluster 类中使用它

>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...        """ Remove worker addresses from cluster """
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
class distributed.deploy.Cluster(asynchronous=False, loop=None, quiet=False, name=None, scheduler_sync_interval=1)[source]

集群对象的超类

此类包含 Dask 集群管理器类的通用功能。

要实现此类,您必须提供:

  1. 一个 scheduler_comm 属性,它是遵循 distributed.core.rpc API 的调度器连接。

  2. 实现接受整数并根据该整数扩缩容集群到相应工作进程数量的 scale 方法,否则将 _supports_scaling 设置为 False。

为此,您应该获得以下功能:

  1. 一个标准的 __repr__

  2. 一个实时的 IPython 小部件

  3. 自适应扩缩容

  4. 与 dask-labextension 集成

  5. 一个 scheduler_info 属性,其中包含 Scheduler.identity() 的最新副本,上述许多功能都使用了此属性

  6. 收集日志的方法