高性能计算机

相关机器

本页面包含在高性能超级计算机上部署 Dask 的说明和指南,这些超级计算机常见于科研和工业研究实验室。这些系统通常具有以下属性

  1. 某种启动 MPI 应用程序或使用诸如 SLURM、SGE、TORQUE、LSF、DRMAA、PBS 或其他作业调度程序的机制

  2. 集群中所有机器可见的共享网络文件系统

  3. 高性能网络互连,例如 Infiniband

  4. 节点本地存储很少或没有

从何开始

本页大部分内容记录了在 HPC 集群上使用 Dask 的各种方式和最佳实践。这些内容具有技术性,面向有一定 Dask 部署经验的用户以及系统管理员。

对于新的、有经验的用户或管理员而言,目前在 HPC 系统上运行 Dask 首选且最简单的方式是使用 dask-jobqueue

然而,dask-jobqueue 稍微偏向于交互式分析用途,对于一些常规的批量生产工作负载,使用 dask-mpi 等工具可能会更好。

Dask-jobqueue 和 Dask-drmaa

dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供了集群管理器。您可以在这些系统上像这样启动 Dask 集群。

from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=36,
                     memory="100GB",
                     project='P48500028',
                     queue='premium',
                     interface='ib0',
                     walltime='02:00:00')

cluster.scale(100)  # Start 100 workers in 100 jobs that match the description above

from dask.distributed import Client
client = Client(cluster)    # Connect to that cluster

Dask-jobqueue 提供了许多可能性,例如自适应的动态 worker 扩展,我们建议您首先阅读 dask-jobqueue 文档,以使基本系统运行起来,然后在需要时返回本文档进行微调。

使用 MPI

您可以使用 mpirunmpiexec 以及 dask-mpi 命令行工具启动 Dask 集群。

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')

这依赖于 mpi4py 库。它仅使用 MPI 来启动 Dask 集群,而不用于节点间通信。MPI 实现有所不同:使用 mpirun --np 4 特定于通过 conda 安装并链接到 mpi4py 的 mpichopen-mpi MPI 实现。

conda install mpi4py

并非必须使用完全相同的实现,但您可能需要验证您的 mpi4py Python 库是否链接到了正确的 mpirun/mpiexec 可执行文件,并且所使用的标志(如 --np 4)对于您的系统是正确的。您的集群系统管理员应该非常熟悉这些问题,并能够提供帮助。

在某些设置中,MPI 进程不允许 fork 其他进程。在这种情况下,我们建议使用 --no-nanny 选项,以防止 dask 使用额外的 nanny 进程来管理 workers。

运行 dask-mpi --help 查看 dask-mpi 命令的更多选项。

使用共享网络文件系统和作业调度程序

注意

如果您使用 dask-jobqueue 等工具,则无需阅读本节。

某些集群得益于共享文件系统(NFS、GPFS、Lustre 或类似系统),并可以利用它向 workers 通信调度程序的位置

dask-scheduler --scheduler-file /path/to/scheduler.json  # writes address to file

dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
dask-worker --scheduler-file /path/to/scheduler.json  # reads file for address
>>> client = Client(scheduler_file='/path/to/scheduler.json')

在使用 SGE/SLURM/Torque/等作业调度程序部署 dask-schedulerdask-worker 进程时,这特别有用。这是一个使用 SGE 的 qsub 命令的示例

# Start a dask-scheduler somewhere and write the connection information to a file
qsub -b y /path/to/dask-scheduler --scheduler-file /home/$USER/scheduler.json

# Start 100 dask-worker processes in an array job pointing to the same file
qsub -b y -t 1-100 /path/to/dask-worker --scheduler-file /home/$USER/scheduler.json

请注意,--scheduler-file 选项在您的调度程序和 workers 共享网络文件系统时才有用。

高性能网络

许多 HPC 系统既有标准以太网,也有能够提高带宽的高性能网络。您可以通过在 dask-workerdask-schedulerdask-mpi 命令中使用 --interface 关键字,或者在 dask-jobqueue Cluster 对象中使用 interface= 关键字来指示 Dask 使用高性能网络接口

mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0

在上面的代码示例中,我们假设您的集群有一个名为 ib0 的 Infiniband 网络接口。您可以通过咨询系统管理员或检查 ifconfig 的输出来确认。

$ ifconfig
lo          Link encap:Local Loopback                       # Localhost
                        inet addr:127.0.0.1  Mask:255.0.0.0
                        inet6 addr: ::1/128 Scope:Host
eth0        Link encap:Ethernet  HWaddr XX:XX:XX:XX:XX:XX   # Ethernet
                        inet addr:192.168.0.101
                        ...
ib0         Link encap:Infiniband                           # Fast InfiniBand
                        inet addr:172.42.0.101

https://stackoverflow.com/questions/43881157/how-do-i-use-an-infiniband-network-with-dask

本地存储

用户经常会超出特定 Dask 部署可用的内存限制。在正常操作中,Dask 会将超出限制的数据溢写到磁盘,通常是默认的临时目录。

然而,在 HPC 系统中,这个默认的临时目录可能指向网络文件系统(NFS)挂载点,这可能会导致问题,因为 Dask 尝试读写许多小文件。请注意,从许多分布式进程读写大量小文件是使国家超级计算机瘫痪的一个好方法

如果可用,最好将 Dask workers 指向本地存储,即物理位于每个节点上的硬盘驱动器。您的 IT 管理员能够告诉您这些位置。您可以在 dask-worker 命令中使用 --local-directorylocal_directory= 关键字来执行此操作

dask-mpi ... --local-directory /path/to/local/storage

或任何其他 Dask 设置工具,或者通过指定以下 配置值

temporary-directory: /path/to/local/storage

然而,并非所有 HPC 系统都具有本地存储。如果出现这种情况,您可能需要完全关闭 Dask 溢写到磁盘的功能。有关 Dask 内存策略的更多信息,请参阅本页。考虑更改 ~/.config/dask/distributed.yaml 文件中的以下值来禁用数据溢写到磁盘

distributed:
  worker:
    memory:
      target: false  # don't spill to disk
      spill: false  # don't spill to disk
      pause: 0.80  # pause execution at 80% memory use
      terminate: 0.95  # restart the worker at 95% use

这可以阻止 Dask workers 将数据溢写到磁盘,而是完全依赖于在达到内存限制时停止处理的机制。

提醒一下,您可以使用 --memory-limit 关键字设置 worker 的内存限制

dask-mpi ... --memory-limit 10GB

启动许多小型作业

注意

如果您使用 dask-jobqueue 等工具,则无需阅读本节。

HPC 作业调度程序针对需要同时以组形式运行的、包含许多节点的庞大单体作业进行了优化。Dask 作业则更加灵活:worker 可以随时加入或离开,而不会对作业产生太大影响。如果我们将作业拆分成许多较小的作业,通常可以比典型作业更快地通过作业调度队列。当我们想立即开始并与 Jupyter notebook 会话进行交互,而不是等待数小时等待合适的分配块可用时,这一点特别有价值。

因此,为了快速获得大型集群,我们建议在一个节点上分配一个具有适度墙钟时间(您会话的预期时间)的 dask-scheduler 进程,然后分配许多墙钟时间较短(可能 30 分钟)的单节点 dask-worker 小型作业,这些小型作业可以轻松挤进作业调度程序中的额外空间。随着您需要更多计算资源,您可以增加更多的这类单节点作业或让它们过期。

使用 Dask 协同启动 Jupyter 服务器

Dask 可以通过与自身一起启动其他服务来帮助您。例如,您可以在运行 dask-scheduler 进程的机器上运行 Jupyter notebook 服务器,使用以下命令

from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')

import socket
host = client.run_on_scheduler(socket.gethostname)

def start_jlab(dask_scheduler):
    import subprocess
    proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
    dask_scheduler.jlab_proc = proc

client.run_on_scheduler(start_jlab)