部署 Dask 集群
目录
部署 Dask 集群¶
Dask 分布式集群管理概述。¶
本地机器¶
你无需任何设置即可运行 Dask。Dask 默认会在你的本地机器上使用线程。
import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute() # This uses threads on your local machine
另外,你也可以在本地机器上设置一个功能齐全的多进程 Dask 集群。这使你可以进行多进程计算并访问诊断控制面板。
from dask.distributed import LocalCluster
cluster = LocalCluster() # Fully-featured local Dask cluster
client = cluster.get_client()
# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()
上面定义的 LocalCluster
集群管理器易于使用,并且在单台机器上运行良好。它与其他所有 Dask 集群管理器遵循相同的接口,因此当你准备好进行扩展时,可以轻松替换它。
# You can swap out LocalCluster for other cluster types
from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster
# cluster = LocalCluster()
cluster = KubeCluster() # example, you can swap out for Kubernetes
client = cluster.get_client()
以下资源解释了如何在各种本地和分布式硬件上设置 Dask。
云¶
在商业云(如 AWS、GCP 或 Azure)上部署很方便,因为你可以在短短几分钟内快速扩展到多台机器,但也很有挑战性,因为你需要处理繁琐的云 API、使用 Docker 管理远程软件环境、发送数据访问凭据、确保清理掉昂贵的资源等等。以下解决方案有助于简化此过程。
Coiled (推荐):这款商业 SaaS 产品解决了 Dask 用户遇到的大多数部署难题,易于使用且非常稳健。其免费层对于大多数个人用户来说已经足够大,即使是不想与商业公司打交道的人也是如此。其 API 示例如下所示:
import coiled cluster = coiled.Cluster( n_workers=100, region="us-east-2", worker_memory="16 GiB", spot_policy="spot_with_fallback", ) client = cluster.get_client()
Dask Cloud Provider:一个纯粹且简单的开源解决方案,可在云虚拟机上设置 Dask 工作节点,支持 AWS、GCP、Azure 以及 Hetzner、Digital Ocean 和 Nebius 等其他商业云。
Dask-Yarn:在旧版 YARN 集群上部署 Dask,例如可以使用 AWS EMR 或 Google Cloud Dataproc 设置的集群。
更多详细信息请参阅云。
高性能计算¶
Dask 可在传统的 HPC 系统上运行,这些系统使用 SLURM、PBS、SGE、LSF 等资源管理器或类似系统以及网络文件系统。这是将大规模硬件用于分析用例的一种简单方法。Dask 可以直接通过资源管理器部署,也可以通过 mpirun
/mpiexec
部署,并且通常使用 NFS 分发数据和软件。
Dask-Jobqueue (推荐):直接与资源管理器 (SLURM, PBS, SGE, LSF 等) 交互,以批处理作业形式启动许多 Dask 工作节点。它生成批处理作业脚本并自动提交到用户的队列。这种方法完全使用用户权限操作(无需 IT 支持),并支持在大型 HPC 系统上进行交互式和自适应使用。其示例如下所示:
from dask_jobqueue import PBSCluster cluster = PBSCluster( cores=24, memory="100GB", queue="regular", account="my-account", ) cluster.scale(jobs=100) client = cluster.get_client()
Dask-MPI:在任何支持 MPI 的系统上使用
mpirun
部署 Dask。这对于希望确保固定且稳定数量工作节点的批处理作业很有帮助。Dask Gateway for Jobqueue:多租户安全集群。配置完成后,用户无需直接访问底层 HPC 后端即可启动集群。
更多详细信息请参阅高性能计算。
Kubernetes¶
Dask 原生运行在 Kubernetes 集群上。当公司已经为运行其他服务搭建了专用的 Kubernetes 基础设施时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应该制定计划来分发软件环境(可能使用 Docker)、用户凭据、配额管理等。在拥有成熟 Kubernetes 部署的大型组织中,这些工作通常由其他 Kubernetes 服务处理。
Dask Kubernetes Operator (推荐):Dask Kubernetes Operator 对于快速变化或临时部署最适用。它是最贴合 Kubernetes 原生的解决方案,对于 Kubernetes 爱好者来说应该会很适应。它看起来有点像这样:
from dask_kubernetes.operator import KubeCluster cluster = KubeCluster( name="my-dask-cluster", image="ghcr.io/dask/dask:latest", resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}, ) cluster.scale(10) client = cluster.get_client()
Dask Gateway for Kubernetes:多租户安全集群。配置完成后,用户无需直接访问底层 Kubernetes 后端即可启动集群。
Single Cluster Helm Chart:使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。
更多详细信息请参阅Kubernetes。
手动部署 (不推荐)¶
你可以手动设置 Dask 集群,或者使用 SSH 等工具。
手动设置:用于设置
dask-scheduler
和dask-worker
进程的命令行界面。SSH:使用 SSH 在非托管集群上设置 Dask。
Python API (高级):从 Python 创建
Scheduler
和Worker
对象,作为分布式 Tornado TCP 应用程序的一部分。
然而,我们不推荐这种方式。相反,我们建议你使用一些常用的资源管理器来帮助管理你的机器,然后在该系统上部署 Dask。上述已介绍了这些选项。