部署 Dask 集群

Dask 在不同规模下都能很好地工作,从单机到多台机器组成的集群。本页描述了部署和运行 Dask 的多种方式,包括以下内容

_images/dask-cluster-manager.svg

使用 Dask 分布式进行集群管理的概述。

本地机器

无需任何设置即可运行 Dask。默认情况下,Dask 会使用本地机器上的线程。

import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute()  # This uses threads on your local machine

或者,您可以在本地机器上设置一个功能齐全的多进程 Dask 集群。这使您可以访问多进程计算和诊断仪表盘。

from dask.distributed import LocalCluster
cluster = LocalCluster()          # Fully-featured local Dask cluster
client = cluster.get_client()

# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()

上面定义的 LocalCluster 集群管理器易于使用,在单机上运行良好。它与其他所有 Dask 集群管理器遵循相同的接口,因此在您准备好扩展时很容易替换。

# You can swap out LocalCluster for other cluster types

from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster

# cluster = LocalCluster()
cluster = KubeCluster()  # example, you can swap out for Kubernetes

client = cluster.get_client()

以下资源解释了如何在各种本地和分布式硬件上设置 Dask。

在 AWS、GCP 或 Azure 等商业云上部署很方便,因为您可以快速扩展到许多机器并仅使用几分钟,但同时也具有挑战性,因为您需要处理繁琐的云 API、使用 Docker 管理远程软件环境、发送数据访问凭据、确保清理昂贵的资源等。以下解决方案有助于简化此过程。

  • Coiled (推荐): 这是一款商业 SaaS 产品,解决了 Dask 用户遇到的多数部署难题,易于使用且非常稳定。其免费层级对于大多数个人用户来说足够大,即使对于不希望与商业公司打交道的用户也是如此。其 API 如下所示。

    import coiled
    cluster = coiled.Cluster(
        n_workers=100,
        region="us-east-2",
        worker_memory="16 GiB",
        spot_policy="spot_with_fallback",
    )
    client = cluster.get_client()
    
  • Dask Cloud Provider: 一种纯粹简单的开源解决方案,可在云 VM 上设置 Dask worker,支持 AWS、GCP、Azure 以及 Hetzner、Digital Ocean 和 Nebius 等其他商业云。

  • Dask-Yarn: 将 Dask 部署在传统 YARN 集群上,例如可通过 AWS EMR 或 Google Cloud Dataproc 设置的集群。

有关更多详细信息,请参阅

高性能计算

Dask 运行在传统的 HPC 系统上,这些系统使用 SLURM、PBS、SGE、LSF 或类似资源管理器以及网络文件系统。这是一种将大规模硬件用于分析用例的简单方式。Dask 可以直接通过资源管理器部署,也可以通过 mpirun/mpiexec 部署,并且倾向于使用 NFS 来分发数据和软件。

  • Dask-Jobqueue (推荐): 直接与资源管理器(SLURM、PBS、SGE、LSF 等)交互,将许多 Dask worker 作为批处理作业启动。它生成批处理作业脚本并自动提交到用户队列。此方法完全使用用户权限(无需 IT 支持)操作,并在大型 HPC 系统上支持交互式和自适应使用。它看起来有点像这样

    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        cores=24,
        memory="100GB",
        queue="regular",
        account="my-account",
    )
    cluster.scale(jobs=100)
    client = cluster.get_client()
    
  • Dask-MPI: 使用 mpirun 将 Dask 部署在任何支持 MPI 的系统之上。这对于您希望确保worker数量固定且稳定的批处理作业很有帮助。

  • Dask Gateway for Jobqueue: 多租户、安全集群。配置完成后,用户可以在无需直接访问底层 HPC 后端的情况下启动集群。

有关更多详细信息,请参阅高性能计算

Kubernetes

Dask 原生运行在 Kubernetes 集群上。当公司已设置专用 Kubernetes 基础设施以运行其他服务时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应计划如何分发软件环境(可能使用 Docker)、用户凭据、配额管理等。在拥有成熟 Kubernetes 部署的大型组织中,这些通常由其他 Kubernetes 服务处理。

  • Dask Kubernetes Operator (推荐): Dask Kubernetes Operator 对于快速变化或临时性部署最有用。它是最 Kubernetes 原生的解决方案,对于 K8s 爱好者来说应该会很顺手。它看起来有点像这样

    from dask_kubernetes.operator import KubeCluster
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:latest",
        resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
    )
    cluster.scale(10)
    client = cluster.get_client()
    
  • Dask Gateway for Kubernetes: 多租户、安全集群。配置完成后,用户可以在无需直接访问底层 Kubernetes 后端的情况下启动集群。

  • Single Cluster Helm Chart: 使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。

有关更多详细信息,请参阅Kubernetes

高级理解

如果您想改进部署,还有一些额外的概念需要理解。本指南涵盖了运行 Dask 外需要考虑的主要主题。