高性能计算机

相关机器

本页包含在科学和工业研究实验室常见的高性能超级计算机上部署 Dask 的说明和指南。这些系统通常具有以下属性:

  1. 启动 MPI 应用程序或使用作业调度器(如 SLURM、SGE、TORQUE、LSF、DRMAA、PBS 或其他)的某种机制

  2. 集群中所有机器都可见的共享网络文件系统

  3. 高性能网络互连,如 Infiniband

  4. 很少或没有节点本地存储

从何开始

本页的大部分内容记录了在 HPC 集群上使用 Dask 的各种方法和最佳实践。这是技术性内容,旨在帮助具有 Dask 部署经验的用户和系统管理员。

对于新的、有经验的用户或管理员来说,目前在 HPC 系统上运行 Dask 的首选和最简单的方法是使用 dask-jobqueue

然而,dask-jobqueue 稍微偏向于交互式分析使用,在某些常规批处理生产工作负载中,使用 dask-mpi 等工具可能更好。

Dask-jobqueue 和 Dask-drmaa

dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供集群管理器。你可以在这些系统上启动一个 Dask 集群,如下所示。

from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=36,
                     memory="100GB",
                     project='P48500028',
                     queue='premium',
                     interface='ib0',
                     walltime='02:00:00')

cluster.scale(100)  # Start 100 workers in 100 jobs that match the description above

from dask.distributed import Client
client = Client(cluster)    # Connect to that cluster

Dask-jobqueue 提供了许多可能性,例如 worker 的自适应动态伸缩,我们建议首先阅读 dask-jobqueue 文档以建立一个基本系统,然后根据需要返回此文档进行微调。

使用 MPI

你可以使用 mpirunmpiexec 以及 dask-mpi 命令行工具启动一个 Dask 集群。

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')

这依赖于 mpi4py 库。它仅使用 MPI 启动 Dask 集群,不用于节点间通信。MPI 实现各不相同:使用 mpirun --np 4 是特定于通过 conda 安装并链接到 mpi4py 的 mpichopen-mpi MPI 实现。

conda install mpi4py

不需要完全使用此实现,但你可能需要验证你的 mpi4py Python 库是否链接到正确的 mpirun/mpiexec 可执行文件,以及使用的标志(如 --np 4)对于你的系统是否正确。你的集群系统管理员应该非常熟悉这些问题并能提供帮助。

在某些设置中,MPI 进程不允许派生其他进程。在这种情况下,我们建议使用 --no-nanny 选项,以防止 dask 使用额外的 nanny 进程来管理 worker。

运行 dask-mpi --help 查看 dask-mpi 命令的更多选项。

使用共享网络文件系统和作业调度器

注意

如果你使用 dask-jobqueue 等工具,则不需要此部分。

某些集群受益于共享文件系统(NFS, GPFS, Lustre 或类似系统),可以使用它来将 scheduler 的位置告知 worker。

dask-scheduler --scheduler-file /path/to/scheduler.json  # writes address to file

dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
>>> client = Client(scheduler_file='/path/to/scheduler.json')

当使用 SGE/SLURM/Torque 等作业调度器部署 dask-schedulerdask-worker 进程时,这会特别有用。以下是使用 SGE 的 qsub 命令的示例。

# Start a dask-scheduler somewhere and write the connection information to a file
qsub -b y /path/to/dask-scheduler --scheduler-file /home/$USER/scheduler.json

# Start 100 dask-worker processes in an array job pointing to the same file
qsub -b y -t 1-100 /path/to/dask-worker --scheduler-file /home/$USER/scheduler.json

请注意,--scheduler-file 选项仅当你的 scheduler 和 worker 共享一个网络文件系统时才有价值。

高性能网络

许多 HPC 系统既有标准以太网,也有能够提供更高带宽的高性能网络。你可以通过在 dask-workerdask-schedulerdask-mpi 命令中使用 --interface 关键字,或在 dask-jobqueue 的 Cluster 对象中使用 interface= 关键字来指示 Dask 使用高性能网络接口。

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0

在上面的代码示例中,我们假设你的集群有一个名为 ib0 的 Infiniband 网络接口。你可以咨询系统管理员或检查 ifconfig 的输出来确认。

$ ifconfig
lo          Link encap:Local Loopback                       # Localhost
                        inet addr:127.0.0.1  Mask:255.0.0.0
                        inet6 addr: ::1/128 Scope:Host
eth0        Link encap:Ethernet  HWaddr XX:XX:XX:XX:XX:XX   # Ethernet
                        inet addr:192.168.0.101
                        ...
ib0         Link encap:Infiniband                           # Fast InfiniBand
                        inet addr:172.42.0.101

https://stackoverflow.com/questions/43881157/how-do-i-use-an-infiniband-network-with-dask

本地存储

用户经常超出特定 Dask 部署可用的内存限制。正常情况下,Dask 会将超出内存的数据溢出到磁盘,通常是到默认的临时目录。

然而,在 HPC 系统中,这个默认临时目录可能指向网络文件系统 (NFS) 挂载点,当 Dask 尝试读写许多小文件时,这可能会导致问题。请注意,从许多分布式进程读写大量小文件是可能导致国家超级计算机崩溃的方式

如果可用,最好将 Dask worker 指向本地存储,即物理上位于每个节点上的硬盘驱动器。你的 IT 管理员将能指引你到这些位置。你可以通过在 dask-worker 命令中使用 --local-directorylocal_directory= 关键字来实现。

dask-mpi ... --local-directory /path/to/local/storage

或任何其他 Dask 设置工具,或者通过指定以下 配置值 来实现。

temporary-directory: /path/to/local/storage

然而,并非所有 HPC 系统都具有本地存储。如果情况如此,你可能希望完全关闭 Dask 溢出到磁盘的功能。有关 Dask 内存策略的更多信息,请参阅 此页。考虑修改你的 ~/.config/dask/distributed.yaml 文件中的以下值以禁用数据溢出到磁盘:

distributed:
  worker:
    memory:
      target: false  # don't spill to disk
      spill: false  # don't spill to disk
      pause: 0.80  # pause execution at 80% memory use
      terminate: 0.95  # restart the worker at 95% use

这会阻止 Dask worker 将数据溢出到磁盘,而是完全依赖于当它们达到内存限制时阻止其处理的机制。

提醒一下,你可以使用 --memory-limit 关键字设置 worker 的内存限制。

dask-mpi ... --memory-limit 10GB

启动许多小型作业

注意

如果你使用 dask-jobqueue 等工具,则不需要此部分。

HPC 作业调度器针对需要同时作为一个组运行的、具有许多节点的大型单一作业进行了优化。Dask 作业则灵活得多:worker 可以随时加入或离开,而不会严重影响作业。如果我们将作业拆分成许多小型作业,通常可以比典型作业更快地通过作业调度队列。当我们希望立即开始并与 Jupyter notebook 会话交互,而不是等待数小时直到合适的分配块变得空闲时,这尤其有价值。

因此,为了快速获得一个大型集群,我们建议在一个节点上分配一个 dask-scheduler 进程,并设置一个适度的墙钟时间(你的会话预期时间),然后分配许多具有较短墙钟时间(可能 30 分钟)的小型单节点 dask-worker 作业,这些作业可以很容易地挤进作业调度器的空闲空间。当你需要更多计算时,可以添加更多此类单节点作业或让它们到期。

使用 Dask 协同启动 Jupyter 服务器

Dask 可以帮助你同时启动其他服务。例如,你可以在运行 dask-scheduler 进程的机器上运行 Jupyter notebook 服务器,使用以下命令:

from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')

import socket
host = client.run_on_scheduler(socket.gethostname)

def start_jlab(dask_scheduler):
    import subprocess
    proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
    dask_scheduler.jlab_proc = proc

client.run_on_scheduler(start_jlab)