部署 Dask 集群
目录
部署 Dask 集群¶
使用 Dask 分布式进行集群管理的概述。¶
本地机器¶
无需任何设置即可运行 Dask。默认情况下,Dask 会使用本地机器上的线程。
import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute() # This uses threads on your local machine
或者,您可以在本地机器上设置一个功能齐全的多进程 Dask 集群。这使您可以访问多进程计算和诊断仪表盘。
from dask.distributed import LocalCluster
cluster = LocalCluster() # Fully-featured local Dask cluster
client = cluster.get_client()
# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()
上面定义的 LocalCluster
集群管理器易于使用,在单机上运行良好。它与其他所有 Dask 集群管理器遵循相同的接口,因此在您准备好扩展时很容易替换。
# You can swap out LocalCluster for other cluster types
from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster
# cluster = LocalCluster()
cluster = KubeCluster() # example, you can swap out for Kubernetes
client = cluster.get_client()
以下资源解释了如何在各种本地和分布式硬件上设置 Dask。
云¶
在 AWS、GCP 或 Azure 等商业云上部署很方便,因为您可以快速扩展到许多机器并仅使用几分钟,但同时也具有挑战性,因为您需要处理繁琐的云 API、使用 Docker 管理远程软件环境、发送数据访问凭据、确保清理昂贵的资源等。以下解决方案有助于简化此过程。
Coiled (推荐): 这是一款商业 SaaS 产品,解决了 Dask 用户遇到的多数部署难题,易于使用且非常稳定。其免费层级对于大多数个人用户来说足够大,即使对于不希望与商业公司打交道的用户也是如此。其 API 如下所示。
import coiled cluster = coiled.Cluster( n_workers=100, region="us-east-2", worker_memory="16 GiB", spot_policy="spot_with_fallback", ) client = cluster.get_client()
Dask Cloud Provider: 一种纯粹简单的开源解决方案,可在云 VM 上设置 Dask worker,支持 AWS、GCP、Azure 以及 Hetzner、Digital Ocean 和 Nebius 等其他商业云。
Dask-Yarn: 将 Dask 部署在传统 YARN 集群上,例如可通过 AWS EMR 或 Google Cloud Dataproc 设置的集群。
有关更多详细信息,请参阅云。
高性能计算¶
Dask 运行在传统的 HPC 系统上,这些系统使用 SLURM、PBS、SGE、LSF 或类似资源管理器以及网络文件系统。这是一种将大规模硬件用于分析用例的简单方式。Dask 可以直接通过资源管理器部署,也可以通过 mpirun
/mpiexec
部署,并且倾向于使用 NFS 来分发数据和软件。
Dask-Jobqueue (推荐): 直接与资源管理器(SLURM、PBS、SGE、LSF 等)交互,将许多 Dask worker 作为批处理作业启动。它生成批处理作业脚本并自动提交到用户队列。此方法完全使用用户权限(无需 IT 支持)操作,并在大型 HPC 系统上支持交互式和自适应使用。它看起来有点像这样
from dask_jobqueue import PBSCluster cluster = PBSCluster( cores=24, memory="100GB", queue="regular", account="my-account", ) cluster.scale(jobs=100) client = cluster.get_client()
Dask-MPI: 使用
mpirun
将 Dask 部署在任何支持 MPI 的系统之上。这对于您希望确保worker数量固定且稳定的批处理作业很有帮助。Dask Gateway for Jobqueue: 多租户、安全集群。配置完成后,用户可以在无需直接访问底层 HPC 后端的情况下启动集群。
有关更多详细信息,请参阅高性能计算。
Kubernetes¶
Dask 原生运行在 Kubernetes 集群上。当公司已设置专用 Kubernetes 基础设施以运行其他服务时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应计划如何分发软件环境(可能使用 Docker)、用户凭据、配额管理等。在拥有成熟 Kubernetes 部署的大型组织中,这些通常由其他 Kubernetes 服务处理。
Dask Kubernetes Operator (推荐): Dask Kubernetes Operator 对于快速变化或临时性部署最有用。它是最 Kubernetes 原生的解决方案,对于 K8s 爱好者来说应该会很顺手。它看起来有点像这样
from dask_kubernetes.operator import KubeCluster cluster = KubeCluster( name="my-dask-cluster", image="ghcr.io/dask/dask:latest", resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}, ) cluster.scale(10) client = cluster.get_client()
Dask Gateway for Kubernetes: 多租户、安全集群。配置完成后,用户可以在无需直接访问底层 Kubernetes 后端的情况下启动集群。
Single Cluster Helm Chart: 使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。
有关更多详细信息,请参阅Kubernetes。
手动部署 (不推荐)¶
您可以手动或使用 SSH 等工具设置 Dask 集群。
手动设置: 用于设置
dask-scheduler
和dask-worker
进程的命令行界面。SSH: 使用 SSH 在非托管集群上设置 Dask。
Python API (高级): 从 Python 创建
Scheduler
和Worker
对象,作为分布式 Tornado TCP 应用程序的一部分。
但是,我们不推荐此路径。相反,我们建议您使用一些常见的资源管理器来帮助您管理机器,然后在该系统上部署 Dask。上述已描述这些选项。