高性能计算机
目录
高性能计算机¶
相关机器¶
本页面包含在高性能超级计算机上部署 Dask 的说明和指南,这些超级计算机常见于科研和工业研究实验室。这些系统通常具有以下属性
某种启动 MPI 应用程序或使用诸如 SLURM、SGE、TORQUE、LSF、DRMAA、PBS 或其他作业调度程序的机制
集群中所有机器可见的共享网络文件系统
高性能网络互连,例如 Infiniband
节点本地存储很少或没有
从何开始¶
本页大部分内容记录了在 HPC 集群上使用 Dask 的各种方式和最佳实践。这些内容具有技术性,面向有一定 Dask 部署经验的用户以及系统管理员。
对于新的、有经验的用户或管理员而言,目前在 HPC 系统上运行 Dask 首选且最简单的方式是使用 dask-jobqueue。
然而,dask-jobqueue 稍微偏向于交互式分析用途,对于一些常规的批量生产工作负载,使用 dask-mpi 等工具可能会更好。
Dask-jobqueue 和 Dask-drmaa¶
dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供了集群管理器。您可以在这些系统上像这样启动 Dask 集群。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=36,
memory="100GB",
project='P48500028',
queue='premium',
interface='ib0',
walltime='02:00:00')
cluster.scale(100) # Start 100 workers in 100 jobs that match the description above
from dask.distributed import Client
client = Client(cluster) # Connect to that cluster
Dask-jobqueue 提供了许多可能性,例如自适应的动态 worker 扩展,我们建议您首先阅读 dask-jobqueue 文档,以使基本系统运行起来,然后在需要时返回本文档进行微调。
使用 MPI¶
您可以使用 mpirun
或 mpiexec
以及 dask-mpi 命令行工具启动 Dask 集群。
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
这依赖于 mpi4py 库。它仅使用 MPI 来启动 Dask 集群,而不用于节点间通信。MPI 实现有所不同:使用 mpirun --np 4
特定于通过 conda 安装并链接到 mpi4py 的 mpich
或 open-mpi
MPI 实现。
conda install mpi4py
并非必须使用完全相同的实现,但您可能需要验证您的 mpi4py
Python 库是否链接到了正确的 mpirun/mpiexec
可执行文件,并且所使用的标志(如 --np 4
)对于您的系统是正确的。您的集群系统管理员应该非常熟悉这些问题,并能够提供帮助。
在某些设置中,MPI 进程不允许 fork 其他进程。在这种情况下,我们建议使用 --no-nanny
选项,以防止 dask 使用额外的 nanny 进程来管理 workers。
运行 dask-mpi --help
查看 dask-mpi
命令的更多选项。
高性能网络¶
许多 HPC 系统既有标准以太网,也有能够提高带宽的高性能网络。您可以通过在 dask-worker
、dask-scheduler
或 dask-mpi
命令中使用 --interface
关键字,或者在 dask-jobqueue Cluster
对象中使用 interface=
关键字来指示 Dask 使用高性能网络接口
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0
在上面的代码示例中,我们假设您的集群有一个名为 ib0
的 Infiniband 网络接口。您可以通过咨询系统管理员或检查 ifconfig
的输出来确认。
$ ifconfig
lo Link encap:Local Loopback # Localhost
inet addr:127.0.0.1 Mask:255.0.0.0
inet6 addr: ::1/128 Scope:Host
eth0 Link encap:Ethernet HWaddr XX:XX:XX:XX:XX:XX # Ethernet
inet addr:192.168.0.101
...
ib0 Link encap:Infiniband # Fast InfiniBand
inet addr:172.42.0.101
https://stackoverflow.com/questions/43881157/how-do-i-use-an-infiniband-network-with-dask
本地存储¶
用户经常会超出特定 Dask 部署可用的内存限制。在正常操作中,Dask 会将超出限制的数据溢写到磁盘,通常是默认的临时目录。
然而,在 HPC 系统中,这个默认的临时目录可能指向网络文件系统(NFS)挂载点,这可能会导致问题,因为 Dask 尝试读写许多小文件。请注意,从许多分布式进程读写大量小文件是使国家超级计算机瘫痪的一个好方法。
如果可用,最好将 Dask workers 指向本地存储,即物理位于每个节点上的硬盘驱动器。您的 IT 管理员能够告诉您这些位置。您可以在 dask-worker
命令中使用 --local-directory
或 local_directory=
关键字来执行此操作
dask-mpi ... --local-directory /path/to/local/storage
或任何其他 Dask 设置工具,或者通过指定以下 配置值
temporary-directory: /path/to/local/storage
然而,并非所有 HPC 系统都具有本地存储。如果出现这种情况,您可能需要完全关闭 Dask 溢写到磁盘的功能。有关 Dask 内存策略的更多信息,请参阅本页。考虑更改 ~/.config/dask/distributed.yaml
文件中的以下值来禁用数据溢写到磁盘
distributed:
worker:
memory:
target: false # don't spill to disk
spill: false # don't spill to disk
pause: 0.80 # pause execution at 80% memory use
terminate: 0.95 # restart the worker at 95% use
这可以阻止 Dask workers 将数据溢写到磁盘,而是完全依赖于在达到内存限制时停止处理的机制。
提醒一下,您可以使用 --memory-limit
关键字设置 worker 的内存限制
dask-mpi ... --memory-limit 10GB
启动许多小型作业¶
注意
如果您使用 dask-jobqueue 等工具,则无需阅读本节。
HPC 作业调度程序针对需要同时以组形式运行的、包含许多节点的庞大单体作业进行了优化。Dask 作业则更加灵活:worker 可以随时加入或离开,而不会对作业产生太大影响。如果我们将作业拆分成许多较小的作业,通常可以比典型作业更快地通过作业调度队列。当我们想立即开始并与 Jupyter notebook 会话进行交互,而不是等待数小时等待合适的分配块可用时,这一点特别有价值。
因此,为了快速获得大型集群,我们建议在一个节点上分配一个具有适度墙钟时间(您会话的预期时间)的 dask-scheduler 进程,然后分配许多墙钟时间较短(可能 30 分钟)的单节点 dask-worker 小型作业,这些小型作业可以轻松挤进作业调度程序中的额外空间。随着您需要更多计算资源,您可以增加更多的这类单节点作业或让它们过期。
使用 Dask 协同启动 Jupyter 服务器¶
Dask 可以通过与自身一起启动其他服务来帮助您。例如,您可以在运行 dask-scheduler
进程的机器上运行 Jupyter notebook 服务器,使用以下命令
from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')
import socket
host = client.run_on_scheduler(socket.gethostname)
def start_jlab(dask_scheduler):
import subprocess
proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
dask_scheduler.jlab_proc = proc
client.run_on_scheduler(start_jlab)