自适应部署

动机

大多数 Dask 部署是静态的,只有一个调度器和固定数量的工作进程。这会带来可预测的行为,但在以下两种情况下会浪费资源:

  1. 用户可能未使用集群,或者他们可能正忙于解释最近的结果或图表,因此工作进程处于空闲状态,占用了其他潜在用户的宝贵共享资源

  2. 用户可能非常活跃,但受限于最初分配的资源。

特别高效的用户可能会学会在会话期间手动添加和移除工作进程,但这很少见。相反,我们希望 Dask 集群的大小能够随时匹配计算需求。这就是本文档讨论的自适应部署的目标。


Dask adaptive scaling

这对于交互式工作负载特别有帮助,交互式工作负载的特点是长时间不活动,期间穿插着短暂的密集活动。自适应部署可以带来更快的分析速度,赋予用户更大的能力,同时大大减轻计算资源的压力。

自适应

为了简化自适应部署的设置,一些 Dask 部署解决方案提供了 .adapt() 方法。以下是使用 dask_kubernetes.KubeCluster 的示例。

from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100)  # scale between 0 and 100 workers

更多关键字选项,请参见下方的 Adaptive 类

Adaptive(cluster[, interval, minimum, ...])

根据调度器负载自适应分配工作进程。

对资源管理器的依赖

Dask 调度器本身不知道如何启动工作进程。相反,它依赖于外部资源调度器,如上面提到的 Kubernetes,或者 Yarn, SGE, SLURM, Mesos,或一些内部系统(参阅 如何部署 Dask 集群 查看选项)。为了使用自适应部署,您必须提供某种机制让调度器能够启动新的工作进程。通常,这可以通过使用 如何部署 Dask 集群 中列出的解决方案之一,或者通过从 Cluster 超类派生并实现其 API 来完成。

Cluster([asynchronous, loop, quiet, name, ...])

集群对象的超类

扩缩容启发式算法

Dask 调度器跟踪各种信息,这些信息有助于正确分配工作进程数量:

  1. 它所见过的每个函数和任务的历史运行时长,以及当前能够为用户运行的所有函数

  2. 每个工作进程上已使用和可用内存量

  3. 由于各种原因(如存在专用硬件)而空闲或饱和的工作进程

根据这些信息,它可以通过将所有待处理任务的累计预期运行时长除以 target_duration 参数(默认为五秒)来确定目标工作进程数量。这个工作进程数量作为对资源管理器的基线请求。这个数量可能会因多种原因而改变:

  1. 如果集群需要更多内存,它将选择目标工作进程数量或当前工作进程数量的两倍(取较大者)

  2. 如果目标数量超出最小值和最大值的范围,则将其限制在该范围内

此外,在缩容时,Dask 优先选择空闲且内存中数据最少的工作进程。在退役工作进程之前,它会将数据移动到其他机器。为了避免集群规模的快速波动,我们只在工作进程连续几个周期都被认为适合退役后才将其退役(由 wait_countinterval 参数控制)。

API

class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None =None, maximum: int | float | None =None, wait_count: int | None =None, target_duration: str | float | timedelta | None =None, worker_key: Callable[[distributed.scheduler.WorkerState], Hashable] | None =None, **kwargs: Any)[source]

根据调度器负载自适应分配工作进程。一个超类。

包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与一个能够使用集群资源管理器创建和销毁 Dask 工作进程的系统配合使用。通常,它被内置在现有的解决方案中,而不是由用户直接使用。它最常通过各种 Dask 集群类的 .adapt(...) 方法使用。

参数
cluster: object

必须有 scale 和 scale_down 方法/协程

intervaltimedelta 或 str,默认为 “1000 ms”

两次检查之间的毫秒数

wait_count: int,默认为 3

在实际移除工作进程之前,连续多少次建议移除该工作进程。

target_duration: timedelta 或 str,默认为 “5s”

我们希望计算花费的时间量。这会影响我们向上扩缩容的激进程度。

worker_key: Callable[WorkerState]

缩容时用于将工作进程分组的函数。更多信息请参阅 Scheduler.workers_to_close

minimum: int

保持的最小工作进程数

maximum: int

保持的最大工作进程数

**kwargs

传递给 Scheduler.workers_to_close 的额外参数

注意

子类可以重写 Adaptive.target()Adaptive.workers_to_close() 来控制何时调整集群大小。默认实现检查每个工作进程的任务是否过多或可用内存是否过少(参阅 distributed.Scheduler.adaptive_target())。interval、min、max、wait_count 和 target_duration 的值可以在 dask 配置中的 distributed.adaptive 键下指定。

示例

这通常在现有的 Dask 类中使用,例如 KubeCluster

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)

或者,您也可以通过从 Dask 的 Cluster 超类继承,在您自己的 Cluster 类中使用它

>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...        """ Remove worker addresses from cluster """
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
class distributed.deploy.Cluster(asynchronous=False, loop=None, quiet=False, name=None, scheduler_sync_interval=1)[source]

集群对象的超类

此类包含 Dask Cluster 管理器类的通用功能。

要实现此类,您必须提供:

  1. 一个 scheduler_comm 属性,它是遵循 distributed.core.rpc API 连接到调度器的连接。

  2. 实现 scale 方法,该方法接受一个整数并按该数量的工作进程扩展集群,否则将 _supports_scaling 设置为 False

完成后,您将获得以下功能:

  1. 标准的 __repr__

  2. 实时 IPython 小部件

  3. 自适应扩缩容

  4. 与 dask-labextension 集成

  5. 一个 scheduler_info 属性,其中包含 Scheduler.identity() 的最新副本,用于实现上述大部分功能

  6. 收集日志的方法

© 版权所有 2014-2018, Anaconda, Inc. 及贡献者。