部署 Dask 集群

Dask 在各种规模下都能很好地工作,从单台机器到由多台机器组成的集群。本页介绍了部署和运行 Dask 的多种方法,包括以下内容:

_images/dask-cluster-manager.svg

Dask 分布式集群管理概述。

本地机器

你无需任何设置即可运行 Dask。Dask 默认会在你的本地机器上使用线程。

import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute()  # This uses threads on your local machine

另外,你也可以在本地机器上设置一个功能齐全的多进程 Dask 集群。这使你可以进行多进程计算并访问诊断控制面板。

from dask.distributed import LocalCluster
cluster = LocalCluster()          # Fully-featured local Dask cluster
client = cluster.get_client()

# Dask works as normal and leverages the infrastructure defined above
df.x.sum().compute()

上面定义的 LocalCluster 集群管理器易于使用,并且在单台机器上运行良好。它与其他所有 Dask 集群管理器遵循相同的接口,因此当你准备好进行扩展时,可以轻松替换它。

# You can swap out LocalCluster for other cluster types

from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster

# cluster = LocalCluster()
cluster = KubeCluster()  # example, you can swap out for Kubernetes

client = cluster.get_client()

以下资源解释了如何在各种本地和分布式硬件上设置 Dask。

在商业云(如 AWS、GCP 或 Azure)上部署很方便,因为你可以在短短几分钟内快速扩展到多台机器,但也很有挑战性,因为你需要处理繁琐的云 API、使用 Docker 管理远程软件环境、发送数据访问凭据、确保清理掉昂贵的资源等等。以下解决方案有助于简化此过程。

  • Coiled (推荐):这款商业 SaaS 产品解决了 Dask 用户遇到的大多数部署难题,易于使用且非常稳健。其免费层对于大多数个人用户来说已经足够大,即使是不想与商业公司打交道的人也是如此。其 API 示例如下所示:

    import coiled
    cluster = coiled.Cluster(
        n_workers=100,
        region="us-east-2",
        worker_memory="16 GiB",
        spot_policy="spot_with_fallback",
    )
    client = cluster.get_client()
    
  • Dask Cloud Provider:一个纯粹且简单的开源解决方案,可在云虚拟机上设置 Dask 工作节点,支持 AWS、GCP、Azure 以及 Hetzner、Digital Ocean 和 Nebius 等其他商业云。

  • Dask-Yarn:在旧版 YARN 集群上部署 Dask,例如可以使用 AWS EMR 或 Google Cloud Dataproc 设置的集群。

更多详细信息请参阅

高性能计算

Dask 可在传统的 HPC 系统上运行,这些系统使用 SLURM、PBS、SGE、LSF 等资源管理器或类似系统以及网络文件系统。这是将大规模硬件用于分析用例的一种简单方法。Dask 可以直接通过资源管理器部署,也可以通过 mpirun/mpiexec 部署,并且通常使用 NFS 分发数据和软件。

  • Dask-Jobqueue (推荐):直接与资源管理器 (SLURM, PBS, SGE, LSF 等) 交互,以批处理作业形式启动许多 Dask 工作节点。它生成批处理作业脚本并自动提交到用户的队列。这种方法完全使用用户权限操作(无需 IT 支持),并支持在大型 HPC 系统上进行交互式和自适应使用。其示例如下所示:

    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        cores=24,
        memory="100GB",
        queue="regular",
        account="my-account",
    )
    cluster.scale(jobs=100)
    client = cluster.get_client()
    
  • Dask-MPI:在任何支持 MPI 的系统上使用 mpirun 部署 Dask。这对于希望确保固定且稳定数量工作节点的批处理作业很有帮助。

  • Dask Gateway for Jobqueue:多租户安全集群。配置完成后,用户无需直接访问底层 HPC 后端即可启动集群。

更多详细信息请参阅高性能计算

Kubernetes

Dask 原生运行在 Kubernetes 集群上。当公司已经为运行其他服务搭建了专用的 Kubernetes 基础设施时,这是一个方便的选择。在 Kubernetes 上运行 Dask 时,用户还应该制定计划来分发软件环境(可能使用 Docker)、用户凭据、配额管理等。在拥有成熟 Kubernetes 部署的大型组织中,这些工作通常由其他 Kubernetes 服务处理。

  • Dask Kubernetes Operator (推荐):Dask Kubernetes Operator 对于快速变化或临时部署最适用。它是最贴合 Kubernetes 原生的解决方案,对于 Kubernetes 爱好者来说应该会很适应。它看起来有点像这样:

    from dask_kubernetes.operator import KubeCluster
    cluster = KubeCluster(
        name="my-dask-cluster",
        image="ghcr.io/dask/dask:latest",
        resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
    )
    cluster.scale(10)
    client = cluster.get_client()
    
  • Dask Gateway for Kubernetes:多租户安全集群。配置完成后,用户无需直接访问底层 Kubernetes 后端即可启动集群。

  • Single Cluster Helm Chart:使用 Helm 部署的单个 Dask 集群和(可选)Jupyter。

更多详细信息请参阅Kubernetes

高级理解

如果你想改进部署,还有一些额外的概念需要理解。本指南涵盖了除了运行 Dask 之外需要考虑的主要议题。