高性能计算机
目录
高性能计算机¶
相关机器¶
本页包含在科学和工业研究实验室常见的高性能超级计算机上部署 Dask 的说明和指南。这些系统通常具有以下属性:
启动 MPI 应用程序或使用作业调度器(如 SLURM、SGE、TORQUE、LSF、DRMAA、PBS 或其他)的某种机制
集群中所有机器都可见的共享网络文件系统
高性能网络互连,如 Infiniband
很少或没有节点本地存储
从何开始¶
本页的大部分内容记录了在 HPC 集群上使用 Dask 的各种方法和最佳实践。这是技术性内容,旨在帮助具有 Dask 部署经验的用户和系统管理员。
对于新的、有经验的用户或管理员来说,目前在 HPC 系统上运行 Dask 的首选和最简单的方法是使用 dask-jobqueue。
然而,dask-jobqueue 稍微偏向于交互式分析使用,在某些常规批处理生产工作负载中,使用 dask-mpi 等工具可能更好。
Dask-jobqueue 和 Dask-drmaa¶
dask-jobqueue 为 PBS、SLURM、LSF、SGE 和其他资源管理器提供集群管理器。你可以在这些系统上启动一个 Dask 集群,如下所示。
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=36,
memory="100GB",
project='P48500028',
queue='premium',
interface='ib0',
walltime='02:00:00')
cluster.scale(100) # Start 100 workers in 100 jobs that match the description above
from dask.distributed import Client
client = Client(cluster) # Connect to that cluster
Dask-jobqueue 提供了许多可能性,例如 worker 的自适应动态伸缩,我们建议首先阅读 dask-jobqueue 文档以建立一个基本系统,然后根据需要返回此文档进行微调。
使用 MPI¶
你可以使用 mpirun
或 mpiexec
以及 dask-mpi 命令行工具启动一个 Dask 集群。
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json
from dask.distributed import Client
client = Client(scheduler_file='/path/to/scheduler.json')
这依赖于 mpi4py 库。它仅使用 MPI 启动 Dask 集群,不用于节点间通信。MPI 实现各不相同:使用 mpirun --np 4
是特定于通过 conda 安装并链接到 mpi4py 的 mpich
或 open-mpi
MPI 实现。
conda install mpi4py
不需要完全使用此实现,但你可能需要验证你的 mpi4py
Python 库是否链接到正确的 mpirun/mpiexec
可执行文件,以及使用的标志(如 --np 4
)对于你的系统是否正确。你的集群系统管理员应该非常熟悉这些问题并能提供帮助。
在某些设置中,MPI 进程不允许派生其他进程。在这种情况下,我们建议使用 --no-nanny
选项,以防止 dask 使用额外的 nanny 进程来管理 worker。
运行 dask-mpi --help
查看 dask-mpi
命令的更多选项。
高性能网络¶
许多 HPC 系统既有标准以太网,也有能够提供更高带宽的高性能网络。你可以通过在 dask-worker
、dask-scheduler
或 dask-mpi
命令中使用 --interface
关键字,或在 dask-jobqueue 的 Cluster
对象中使用 interface=
关键字来指示 Dask 使用高性能网络接口。
mpirun --np 4 dask-mpi --scheduler-file /home/$USER/scheduler.json --interface ib0
在上面的代码示例中,我们假设你的集群有一个名为 ib0
的 Infiniband 网络接口。你可以咨询系统管理员或检查 ifconfig
的输出来确认。
$ ifconfig
lo Link encap:Local Loopback # Localhost
inet addr:127.0.0.1 Mask:255.0.0.0
inet6 addr: ::1/128 Scope:Host
eth0 Link encap:Ethernet HWaddr XX:XX:XX:XX:XX:XX # Ethernet
inet addr:192.168.0.101
...
ib0 Link encap:Infiniband # Fast InfiniBand
inet addr:172.42.0.101
https://stackoverflow.com/questions/43881157/how-do-i-use-an-infiniband-network-with-dask
本地存储¶
用户经常超出特定 Dask 部署可用的内存限制。正常情况下,Dask 会将超出内存的数据溢出到磁盘,通常是到默认的临时目录。
然而,在 HPC 系统中,这个默认临时目录可能指向网络文件系统 (NFS) 挂载点,当 Dask 尝试读写许多小文件时,这可能会导致问题。请注意,从许多分布式进程读写大量小文件是可能导致国家超级计算机崩溃的方式。
如果可用,最好将 Dask worker 指向本地存储,即物理上位于每个节点上的硬盘驱动器。你的 IT 管理员将能指引你到这些位置。你可以通过在 dask-worker
命令中使用 --local-directory
或 local_directory=
关键字来实现。
dask-mpi ... --local-directory /path/to/local/storage
或任何其他 Dask 设置工具,或者通过指定以下 配置值 来实现。
temporary-directory: /path/to/local/storage
然而,并非所有 HPC 系统都具有本地存储。如果情况如此,你可能希望完全关闭 Dask 溢出到磁盘的功能。有关 Dask 内存策略的更多信息,请参阅 此页。考虑修改你的 ~/.config/dask/distributed.yaml
文件中的以下值以禁用数据溢出到磁盘:
distributed:
worker:
memory:
target: false # don't spill to disk
spill: false # don't spill to disk
pause: 0.80 # pause execution at 80% memory use
terminate: 0.95 # restart the worker at 95% use
这会阻止 Dask worker 将数据溢出到磁盘,而是完全依赖于当它们达到内存限制时阻止其处理的机制。
提醒一下,你可以使用 --memory-limit
关键字设置 worker 的内存限制。
dask-mpi ... --memory-limit 10GB
启动许多小型作业¶
注意
如果你使用 dask-jobqueue 等工具,则不需要此部分。
HPC 作业调度器针对需要同时作为一个组运行的、具有许多节点的大型单一作业进行了优化。Dask 作业则灵活得多:worker 可以随时加入或离开,而不会严重影响作业。如果我们将作业拆分成许多小型作业,通常可以比典型作业更快地通过作业调度队列。当我们希望立即开始并与 Jupyter notebook 会话交互,而不是等待数小时直到合适的分配块变得空闲时,这尤其有价值。
因此,为了快速获得一个大型集群,我们建议在一个节点上分配一个 dask-scheduler 进程,并设置一个适度的墙钟时间(你的会话预期时间),然后分配许多具有较短墙钟时间(可能 30 分钟)的小型单节点 dask-worker 作业,这些作业可以很容易地挤进作业调度器的空闲空间。当你需要更多计算时,可以添加更多此类单节点作业或让它们到期。
使用 Dask 协同启动 Jupyter 服务器¶
Dask 可以帮助你同时启动其他服务。例如,你可以在运行 dask-scheduler
进程的机器上运行 Jupyter notebook 服务器,使用以下命令:
from dask.distributed import Client
client = Client(scheduler_file='scheduler.json')
import socket
host = client.run_on_scheduler(socket.gethostname)
def start_jlab(dask_scheduler):
import subprocess
proc = subprocess.Popen(['/path/to/jupyter', 'lab', '--ip', host, '--no-browser'])
dask_scheduler.jlab_proc = proc
client.run_on_scheduler(start_jlab)