高级技巧与窍门
目录
高级技巧与窍门¶
HPC 集群的环境极其多样化,拥有不同的作业调度器、不同的配置以及由每个 HPC 集群做出的不同决策(安全、使用等)。不幸的是,Dask-Jobqueue 不可能涵盖某些 HPC 集群所有微小的边缘情况。
本页尝试记录可能在某些集群(理想情况下严格多于一个,尽管很难确定……)上有用的技巧与窍门。
使用 job_directives_skip
跳过提交脚本中未识别的行¶
注意:参数 job_directives_skip
在 0.8.0 版本之前命名为 header_skip
。 header_skip
仍然可以使用,但已被视为弃用,并将在未来版本中移除。
在某些集群上,由 Dask-Jobqueue 生成的提交脚本(您可以使用 print(cluster.job_script())
查看)可能由于该 HPC 集群的某些配置怪癖而无法工作。当然,这种配置怪癖背后可能有一些原因。
当您调用 cluster.scale
时(即您实际提交作业的地方),您会收到一个错误,该错误会告诉您作业调度器对您的作业提交脚本不满意(参见下面的示例)。您可以用于解决此问题的主要参数是 job_directives_skip
# this will remove any line containing either '--mem' or
# 'another-string' from the job submission script
cluster = YourCluster(
job_directives_skip=['--mem', 'another-string'],
**other_options_go_here)
Matthew Rocklin 的这篇博客文章详细描述了此问题的一个示例。在他的情况下,错误是
Command:
bsub /tmp/tmp4874eufw.sh
stdout:
Typical usage:
bsub [LSF arguments] jobscript
bsub [LSF arguments] -Is $SHELL
bsub -h[elp] [options]
bsub -V
NOTES:
* All jobs must specify a walltime (-W) and project id (-P)
* Standard jobs must specify a node count (-nnodes) or -ln_slots. These jobs cannot specify a resource string (-R).
* Expert mode jobs (-csm y) must specify a resource string and cannot specify -nnodes or -ln_slots.
stderr:
ERROR: Resource strings (-R) are not supported in easy mode. Please resubmit without a resource string.
ERROR: -n is no longer supported. Please request nodes with -nnodes.
ERROR: No nodes requested. Please request nodes with -nnodes.
此问题的另一个示例是此 github 问题,其中 --mem
在某些 SLURM 集群上不是一个被接受的选项。错误大致如下
$sbatch submit_slurm.sh
sbatch: error: Memory specification can not be satisfied
sbatch: error: Batch job submission failed: Requested node configuration is not available
使用 job_script_prologue
在启动工作进程前运行 setup 命令¶
注意:参数 job_script_prologue
在 0.7.4 版本之前命名为 env_extra
。 env_extra
仍然可以使用,但已被视为弃用,并将在未来版本中移除。
有时您需要在实际启动工作进程之前运行一些 setup 命令。这包括设置环境变量、加载环境模块、激活虚拟环境,或激活 conda/mamba 环境。
可以使用 job_script_prologue
参数实现此目的。设置虚拟环境的示例
from dask_jobqueue.htcondor import HTCondorCluster
job_script_prologue = ['cd /some/path', 'source venv/bin/activate']
cluster = HTCondorCluster(cores=1, memory="2GB", disk="4GB", log_directory = 'logs', python='python3',
job_script_prologue=job_script_prologue)
print(cluster.job_script())
对于 HTCondorCluster
,这些命令将被添加到提交描述文件中的 Arguments
参数的实际 python 调用之前。相关的行将如下所示
...
Arguments = "-c 'cd /some/path; source venv/bin/activate; python3 -m distributed.cli.dask_worker tcp://<IP>:<PORT> --nthreads 1 --memory-limit 2.00GB --name dummy-name --nanny --death-timeout 60'"
Executable = /bin/sh
...
对于其他批处理系统(*Cluster
类),附加命令将作为单独的行插入到提交脚本中。
类似地,如果您需要在工作进程退出后运行一些命令,则使用 job_script_epilogue
参数。
如何处理作业排队系统因墙钟时间杀死工作进程¶
在 dask-jobqueue 中,每个工作进程都在一个作业中运行,并且所有作业在作业排队系统中都有时间限制。达到墙钟时间在几种情况下可能会很麻烦
当您在 HPC 平台上的可用空间不多,一次只能运行少量工作进程时(少于您在使用 scale 或 adapt 时期望的数量)。这些工作进程将在您的工作负载结束之前被杀死(并启动其他工作进程)。
当您真的不知道工作负载需要多久时:所有工作进程都可能在到达结束之前被杀死。在这种情况下,您会希望使用自适应集群,以便 Dask 确保始终有一些工作进程处于运行状态。
如果您没有设置适当的参数,在这两种情况下都会遇到 KilledWorker 异常。
解决此问题的方法是提前告知 Dask 工作进程的生命周期是有限的
使用 –lifetime 工作进程选项。这将启用使用自适应的无限工作负载。工作进程将在调度系统杀死它们之前正确关闭,并且所有状态都将被转移。
在处理大量工作进程(例如 > 20)时使用 –lifetime-stagger:这将防止工作进程同时终止,从而缓解任务重新平衡和调度负担。
以下是如何使用这些参数的示例
cluster = Cluster(
walltime="01:00:00",
cores=4,
memory="16gb",
worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
)
cluster.adapt(minimum=0, maximum=200)
注意:参数 worker_extra_args
在 0.7.4 版本之前命名为 extra
。 extra
仍然可以使用,但已被视为弃用,并将在未来版本中移除。
以下是一个利用此优势的工作流程示例,如果您想尝试或根据您的用例进行调整
import time
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed
# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
cluster.adapt(minimum=0, maximum=4)
client = Client(cluster)
# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(480)]
def features(num_fn):
num, image_fn = num_fn
time.sleep(1) # takes about 1s to compute features on an image
features = np.random.random(246)
return num, features
num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX
X = np.zeros((num_files, num_features), dtype=np.float32)
for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
i, v = future.result()
X[i, :] = v