自适应部署
目录
自适应部署¶
动机¶
大多数 Dask 部署是静态的,只有一个调度器和固定数量的工作进程。这会带来可预测的行为,但在以下两种情况下会浪费资源:
用户可能未使用集群,或者他们可能正忙于解释最近的结果或图表,因此工作进程处于空闲状态,占用了其他潜在用户的宝贵共享资源
用户可能非常活跃,但受限于最初分配的资源。
特别高效的用户可能会学会在会话期间手动添加和移除工作进程,但这很少见。相反,我们希望 Dask 集群的大小能够随时匹配计算需求。这就是本文档讨论的自适应部署的目标。
这对于交互式工作负载特别有帮助,交互式工作负载的特点是长时间不活动,期间穿插着短暂的密集活动。自适应部署可以带来更快的分析速度,赋予用户更大的能力,同时大大减轻计算资源的压力。
自适应¶
为了简化自适应部署的设置,一些 Dask 部署解决方案提供了 .adapt()
方法。以下是使用 dask_kubernetes.KubeCluster 的示例。
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100) # scale between 0 and 100 workers
更多关键字选项,请参见下方的 Adaptive 类
|
根据调度器负载自适应分配工作进程。 |
对资源管理器的依赖¶
Dask 调度器本身不知道如何启动工作进程。相反,它依赖于外部资源调度器,如上面提到的 Kubernetes,或者 Yarn, SGE, SLURM, Mesos,或一些内部系统(参阅 如何部署 Dask 集群 查看选项)。为了使用自适应部署,您必须提供某种机制让调度器能够启动新的工作进程。通常,这可以通过使用 如何部署 Dask 集群 中列出的解决方案之一,或者通过从 Cluster 超类派生并实现其 API 来完成。
|
集群对象的超类 |
扩缩容启发式算法¶
Dask 调度器跟踪各种信息,这些信息有助于正确分配工作进程数量:
它所见过的每个函数和任务的历史运行时长,以及当前能够为用户运行的所有函数
每个工作进程上已使用和可用内存量
由于各种原因(如存在专用硬件)而空闲或饱和的工作进程
根据这些信息,它可以通过将所有待处理任务的累计预期运行时长除以 target_duration
参数(默认为五秒)来确定目标工作进程数量。这个工作进程数量作为对资源管理器的基线请求。这个数量可能会因多种原因而改变:
如果集群需要更多内存,它将选择目标工作进程数量或当前工作进程数量的两倍(取较大者)
如果目标数量超出最小值和最大值的范围,则将其限制在该范围内
此外,在缩容时,Dask 优先选择空闲且内存中数据最少的工作进程。在退役工作进程之前,它会将数据移动到其他机器。为了避免集群规模的快速波动,我们只在工作进程连续几个周期都被认为适合退役后才将其退役(由 wait_count
和 interval
参数控制)。
API¶
- class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None =None, maximum: int | float | None =None, wait_count: int | None =None, target_duration: str | float | timedelta | None =None, worker_key: Callable[[distributed.scheduler.WorkerState], Hashable] | None =None, **kwargs: Any)[source]¶
根据调度器负载自适应分配工作进程。一个超类。
包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与一个能够使用集群资源管理器创建和销毁 Dask 工作进程的系统配合使用。通常,它被内置在现有的解决方案中,而不是由用户直接使用。它最常通过各种 Dask 集群类的
.adapt(...)
方法使用。- 参数
- cluster: object
必须有 scale 和 scale_down 方法/协程
- intervaltimedelta 或 str,默认为 “1000 ms”
两次检查之间的毫秒数
- wait_count: int,默认为 3
在实际移除工作进程之前,连续多少次建议移除该工作进程。
- target_duration: timedelta 或 str,默认为 “5s”
我们希望计算花费的时间量。这会影响我们向上扩缩容的激进程度。
- worker_key: Callable[WorkerState]
缩容时用于将工作进程分组的函数。更多信息请参阅 Scheduler.workers_to_close
- minimum: int
保持的最小工作进程数
- maximum: int
保持的最大工作进程数
- **kwargs
传递给 Scheduler.workers_to_close 的额外参数
注意
子类可以重写
Adaptive.target()
和Adaptive.workers_to_close()
来控制何时调整集群大小。默认实现检查每个工作进程的任务是否过多或可用内存是否过少(参阅distributed.Scheduler.adaptive_target()
)。interval、min、max、wait_count 和 target_duration 的值可以在 dask 配置中的 distributed.adaptive 键下指定。示例
这通常在现有的 Dask 类中使用,例如 KubeCluster
>>> from dask_kubernetes import KubeCluster >>> cluster = KubeCluster() >>> cluster.adapt(minimum=10, maximum=100)
或者,您也可以通过从 Dask 的 Cluster 超类继承,在您自己的 Cluster 类中使用它
>>> from distributed.deploy import Cluster >>> class MyCluster(Cluster): ... def scale_up(self, n): ... """ Bring worker count up to n """ ... def scale_down(self, workers): ... """ Remove worker addresses from cluster """
>>> cluster = MyCluster() >>> cluster.adapt(minimum=10, maximum=100)
- class distributed.deploy.Cluster(asynchronous=False, loop=None, quiet=False, name=None, scheduler_sync_interval=1)[source]¶
集群对象的超类
此类包含 Dask Cluster 管理器类的通用功能。
要实现此类,您必须提供:
一个
scheduler_comm
属性,它是遵循distributed.core.rpc
API 连接到调度器的连接。实现
scale
方法,该方法接受一个整数并按该数量的工作进程扩展集群,否则将_supports_scaling
设置为 False
完成后,您将获得以下功能:
标准的
__repr__
实时 IPython 小部件
自适应扩缩容
与 dask-labextension 集成
一个
scheduler_info
属性,其中包含Scheduler.identity()
的最新副本,用于实现上述大部分功能收集日志的方法