高级技巧与窍门

HPC 集群的环境极其多样化,拥有不同的作业调度器、不同的配置以及由每个 HPC 集群做出的不同决策(安全、使用等)。不幸的是,Dask-Jobqueue 不可能涵盖某些 HPC 集群所有微小的边缘情况。

本页尝试记录可能在某些集群(理想情况下严格多于一个,尽管很难确定……)上有用的技巧与窍门。

使用 job_directives_skip 跳过提交脚本中未识别的行

注意:参数 job_directives_skip 在 0.8.0 版本之前命名为 header_skip header_skip 仍然可以使用,但已被视为弃用,并将在未来版本中移除。

在某些集群上,由 Dask-Jobqueue 生成的提交脚本(您可以使用 print(cluster.job_script()) 查看)可能由于该 HPC 集群的某些配置怪癖而无法工作。当然,这种配置怪癖背后可能有一些原因。

当您调用 cluster.scale 时(即您实际提交作业的地方),您会收到一个错误,该错误会告诉您作业调度器对您的作业提交脚本不满意(参见下面的示例)。您可以用于解决此问题的主要参数是 job_directives_skip

# this will remove any line containing either '--mem' or
# 'another-string' from the job submission script
cluster = YourCluster(
    job_directives_skip=['--mem', 'another-string'],
    **other_options_go_here)

Matthew Rocklin 的这篇博客文章详细描述了此问题的一个示例。在他的情况下,错误是

Command:
bsub /tmp/tmp4874eufw.sh
stdout:

Typical usage:
     bsub [LSF arguments] jobscript
     bsub [LSF arguments] -Is $SHELL
     bsub -h[elp] [options]
     bsub -V

NOTES:
 * All jobs must specify a walltime (-W) and project id (-P)
 * Standard jobs must specify a node count (-nnodes) or -ln_slots. These jobs cannot specify a resource string (-R).
 * Expert mode jobs (-csm y) must specify a resource string and cannot specify -nnodes or -ln_slots.

stderr:
ERROR: Resource strings (-R) are not supported in easy mode. Please resubmit without a resource string.
ERROR: -n is no longer supported. Please request nodes with -nnodes.
ERROR: No nodes requested. Please request nodes with -nnodes.

此问题的另一个示例是此 github 问题,其中 --mem 在某些 SLURM 集群上不是一个被接受的选项。错误大致如下

$sbatch submit_slurm.sh
sbatch: error: Memory specification can not be satisfied
sbatch: error: Batch job submission failed: Requested node configuration is not available

使用 job_script_prologue 在启动工作进程前运行 setup 命令

注意:参数 job_script_prologue 在 0.7.4 版本之前命名为 env_extra env_extra 仍然可以使用,但已被视为弃用,并将在未来版本中移除。

有时您需要在实际启动工作进程之前运行一些 setup 命令。这包括设置环境变量、加载环境模块、激活虚拟环境,或激活 conda/mamba 环境。

可以使用 job_script_prologue 参数实现此目的。设置虚拟环境的示例

from dask_jobqueue.htcondor import HTCondorCluster
job_script_prologue = ['cd /some/path', 'source venv/bin/activate']
cluster = HTCondorCluster(cores=1, memory="2GB", disk="4GB", log_directory = 'logs', python='python3',
                          job_script_prologue=job_script_prologue)
print(cluster.job_script())

对于 HTCondorCluster,这些命令将被添加到提交描述文件中的 Arguments 参数的实际 python 调用之前。相关的行将如下所示

...
Arguments = "-c 'cd /some/path; source venv/bin/activate; python3 -m distributed.cli.dask_worker tcp://<IP>:<PORT> --nthreads 1 --memory-limit 2.00GB --name dummy-name --nanny --death-timeout 60'"
Executable = /bin/sh
...

对于其他批处理系统(*Cluster 类),附加命令将作为单独的行插入到提交脚本中。

类似地,如果您需要在工作进程退出后运行一些命令,则使用 job_script_epilogue 参数。

如何处理作业排队系统因墙钟时间杀死工作进程

在 dask-jobqueue 中,每个工作进程都在一个作业中运行,并且所有作业在作业排队系统中都有时间限制。达到墙钟时间在几种情况下可能会很麻烦

  • 当您在 HPC 平台上的可用空间不多,一次只能运行少量工作进程时(少于您在使用 scale 或 adapt 时期望的数量)。这些工作进程将在您的工作负载结束之前被杀死(并启动其他工作进程)。

  • 当您真的不知道工作负载需要多久时:所有工作进程都可能在到达结束之前被杀死。在这种情况下,您会希望使用自适应集群,以便 Dask 确保始终有一些工作进程处于运行状态。

如果您没有设置适当的参数,在这两种情况下都会遇到 KilledWorker 异常。

解决此问题的方法是提前告知 Dask 工作进程的生命周期是有限的

  • 使用 –lifetime 工作进程选项。这将启用使用自适应的无限工作负载。工作进程将在调度系统杀死它们之前正确关闭,并且所有状态都将被转移。

  • 在处理大量工作进程(例如 > 20)时使用 –lifetime-stagger:这将防止工作进程同时终止,从而缓解任务重新平衡和调度负担。

以下是如何使用这些参数的示例

cluster = Cluster(
    walltime="01:00:00",
    cores=4,
    memory="16gb",
    worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
)
cluster.adapt(minimum=0, maximum=200)

注意:参数 worker_extra_args 在 0.7.4 版本之前命名为 extra extra 仍然可以使用,但已被视为弃用,并将在未来版本中移除。

以下是一个利用此优势的工作流程示例,如果您想尝试或根据您的用例进行调整

import time
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
cluster.adapt(minimum=0, maximum=4)

client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(480)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
    i, v = future.result()
    X[i, :] = v