编写你自己的 runners
目录
编写你自己的 runners¶
本文档描述了 runners 类的设计以及如何实现你自己的 Dask runners。
runner 模型设计的核心假设是同一个脚本会被作业调度器多次执行。
(mpirun|srun|qsub|etc) -n4 myscript.py
├── [0] myscript.py
├── [1] myscript.py
├── [2] myscript.py
└── [3] myscript.py
在脚本中,runner 类在执行早期创建。
from dask_jobqueue import SomeRunner
from dask.distributed import Client
with SomeRunner(**kwargs) as runner:
with Client(runner) as client:
client.wait_for_workers(2)
# Do some Dask work
这将导致在 HPC 上运行多个进程,所有这些进程都实例化 runner 类。
这些进程需要协调决定哪个进程应运行 Dask Scheduler,哪个应成为 Dask Workers,以及哪个应继续运行脚本中的其余客户端代码。这种协调发生在 runner 类的 __init__()
期间。
Scheduler 和 Worker 进程在完成后退出,以避免多次运行客户端代码。这意味着只有一个进程会继续执行 runner 类的 __init__()
之后的代码,其余进程在工作完成后将在该点退出。
基类¶
在 dask_jobqueue.runners
中有一个 BaseRunner
类,可用于实现其他 runners。
实现新 runner 所需的最低要求是以下方法。
from dask_jobqueue.runner import BaseRunner
class MyRunner(BaseRunner):
async def get_role(self) -> str:
"""Figure out whether I am a scheduler, worker or client.
A common way to do this is by using a process ID. Many job queues give each process
a monotonic index that starts from zero. So we can assume proc 0 is the scheduler, proc 1
is the client and any other procs are workers.
"""
...
async def get_scheduler_address(self) -> str:
"""If I am not the scheduler discover the scheduler address.
A common way to do this is to read a scheduler file from a shared filesystem.
Alternatively if the scheduler process can broadcast it's address via something like MPI
we can define ``BaseRunner.set_scheduler_address()`` which will be called on the scheduler
and then recieve the broadcast in this method.
"""
...
BaseRunner
类在实现了这些方法后,负责启动 Dask。它还提供了许多存根(stubbed out)钩子,允许你在每个组件创建之前/之后编写代码。例如 BaseRunner.before_scheduler_start()
、BaseRunner.before_worker_start()
和 BaseRunner.before_client_start()
。
runner 必须知道调度器的地址,以便在代码执行结束时(通过 __exit__()
或 finalizer)协调所有进程的干净关闭。这种通信独立于可能创建的任何客户端进行。
Slurm 实现示例¶
作为一个具体的例子,你可以查看 Slurm 的实现。
在 get_role()
方法中,我们使用 SLURM_PROCID
环境变量来推断角色。
我们还添加了一个默认的调度器选项来设置 scheduler_file="scheduler-{job_id}.json"
,并且我从 SLURM_JOB_ID
环境变量查找作业 ID 以确保唯一性。这有效地允许我们通过共享文件系统广播调度器地址。
然后,在 get_scheduler_address()
方法中,我们等待调度器文件存在,然后以与 dask.distributed.Client
相同的方式打开并从调度器文件中读取地址。
这是一个精简示例,用于演示目的。
from dask_jobqueue.runner import BaseRunner
class SLURMRunner(BaseRunner):
def __init__(self, *args, scheduler_file="scheduler.json", **kwargs):
# Get the current process ID from the environment
self.proc_id = int(os.environ["SLURM_PROCID"])
# Tell the scheduler and workers to use a scheduler file on the shared filesystem
self.scheduler_file = scheduler_file
options = {"scheduler_file": self.scheduler_file}
super().__init__(*args, worker_options=options, scheduler_options=options)
async def get_role(self) -> str:
# Choose the role for this process based on the process ID
if self.proc_id == 0 and self.scheduler:
return Role.scheduler
elif self.proc_id == 1 and self.client:
return Role.client
else:
return Role.worker
async def get_scheduler_address(self) -> str:
# Wait for the scheduler file to be created and read the address from it
while not self.scheduler_file or not self.scheduler_file.exists():
await asyncio.sleep(0.2)
cfg = json.loads(self.scheduler_file.read_text())
return cfg["address"]