Writing your own runners

This document describes the design of the runner classes and how to implement your own Dask runners.

The core assumption in the design of the runner model is that the same script will be executed many times by a job scheduler.

(mpirun|srun|qsub|etc) -n4 myscript.py
├── [0] myscript.py
├── [1] myscript.py
├── [2] myscript.py
└── [3] myscript.py

Within the script, the runner class is created early on in the execution.

from dask_jobqueue import SomeRunner
from dask.distributed import Client

with SomeRunner(**kwargs) as runner:
    with Client(runner) as client:
        client.wait_for_workers(2)
        # Do some Dask work

This results in multiple processes on the HPC system that all instantiate the runner class.

These processes need to coordinate to decide which process should run the Dask Scheduler, which should be Dask Workers, and which should continue running the rest of the client code in the script. This coordination happens during the __init__() of the runner class.

The Scheduler and Worker processes exit when their work is done to avoid running the client code multiple times. This means that only one process will continue past the __init__() of the runner class; the rest will exit at that point.
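
Conceptually, the coordination and early-exit behaviour looks something like the following. This is a minimal self-contained sketch, not the actual BaseRunner code: the role is derived from a hypothetical PROC_ID variable (a real runner would use something like SLURM_PROCID or an MPI rank), and the scheduler/worker bodies are stubbed out.

import os
import sys

def run_scheduler():
    ...  # placeholder: start a Dask scheduler and block until shutdown is requested

def run_worker():
    ...  # placeholder: start a Dask worker and block until the scheduler exits

# Decide this process's role from its index
proc_id = int(os.environ.get("PROC_ID", "1"))

if proc_id == 0:
    run_scheduler()
    sys.exit(0)  # the scheduler process never reaches the client code
elif proc_id > 1:
    run_worker()
    sys.exit(0)  # worker processes exit here too

# Only the process with proc_id == 1 continues as the client
print("running client code")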

Base class

There is a BaseRunner class in dask_jobqueue.runner that can be used to implement other runners.

The minimum required to implement a new runner is the following methods.

from dask_jobqueue.runner import BaseRunner

class MyRunner(BaseRunner):

    async def get_role(self) -> str:
        """Figure out whether I am a scheduler, worker or client.

        A common way to do this is by using a process ID. Many job queues give each process
        a monotonic index that starts from zero. So we can assume proc 0 is the scheduler, proc 1
        is the client and any other procs are workers.
        """
        ...

    async def get_scheduler_address(self) -> str:
        """If I am not the scheduler discover the scheduler address.

        A common way to do this is to read a scheduler file from a shared filesystem.

        Alternatively, if the scheduler process can broadcast its address via something like MPI,
        we can define ``BaseRunner.set_scheduler_address()`` which will be called on the scheduler
        and then receive the broadcast in this method.
        """
        ...

Once these methods have been implemented, the BaseRunner class handles starting up Dask. It also provides many stubbed-out hooks that allow you to write code that runs before/after each component is created, e.g. BaseRunner.before_scheduler_start(), BaseRunner.before_worker_start() and BaseRunner.before_client_start().
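
For example, a subclass could override one of these hooks to run extra setup just before the scheduler is created. This is a sketch: LoggingRunner is a hypothetical name, the hook is assumed to take no arguments beyond self, and a real subclass would still need to implement the required methods above.

from dask_jobqueue.runner import BaseRunner

class LoggingRunner(BaseRunner):
    async def before_scheduler_start(self) -> None:
        # Runs on the scheduler process just before the Dask scheduler starts
        print("about to start the scheduler")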

The runner must know the address of the scheduler so that it can coordinate the clean shutdown of all processes when we reach the end of the code (either via __exit__() or a finalizer). This communication happens independently of any clients that may be created.
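
One way such a shutdown could be implemented (a sketch of the idea, not necessarily how BaseRunner does it) is to connect to the scheduler directly, independent of any user-created clients, and ask it to shut down, which also releases the workers.

from dask.distributed import Client

async def shutdown_cluster(scheduler_address: str) -> None:
    # Open a private connection to the scheduler and ask it to stop;
    # the workers exit once the scheduler goes away
    async with Client(scheduler_address, asynchronous=True) as client:
        await client.shutdown()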

Slurm implementation example

As a concrete example, you can look at the Slurm implementation.

In the get_role() method we use the SLURM_PROCID environment variable to infer the role.

We also add a default scheduler option that sets scheduler_file="scheduler-{job_id}.json", looking up the job ID from the SLURM_JOB_ID environment variable to ensure uniqueness. This effectively allows us to broadcast the scheduler address via the shared filesystem.
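
For instance, such a unique default filename could be built like this (an illustrative fragment, not the exact upstream code):

import os

job_id = os.environ["SLURM_JOB_ID"]
scheduler_file = f"scheduler-{job_id}.json"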

Then, in the get_scheduler_address() method, we wait for the scheduler file to exist and then open and read the address from it, in the same way dask.distributed.Client does.

Here is a cut-down example for demonstration purposes.

import asyncio
import json
import os
from pathlib import Path

from dask_jobqueue.runner import BaseRunner, Role

class SLURMRunner(BaseRunner):
    def __init__(self, *args, scheduler_file="scheduler.json", **kwargs):
        # Get the current process ID from the environment
        self.proc_id = int(os.environ["SLURM_PROCID"])

        # Tell the scheduler and workers to use a scheduler file on the shared filesystem
        self.scheduler_file = Path(scheduler_file)
        options = {"scheduler_file": self.scheduler_file}
        super().__init__(*args, worker_options=options, scheduler_options=options, **kwargs)

    async def get_role(self) -> str:
        # Choose the role for this process based on the process ID
        if self.proc_id == 0 and self.scheduler:
            return Role.scheduler
        elif self.proc_id == 1 and self.client:
            return Role.client
        else:
            return Role.worker

    async def get_scheduler_address(self) -> str:
        # Wait for the scheduler file to be created, then read the address from it
        while not self.scheduler_file or not self.scheduler_file.exists():
            await asyncio.sleep(0.2)
        cfg = json.loads(self.scheduler_file.read_text())
        return cfg["address"]
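
A script using this runner would then be launched once per process, for example with srun (a hypothetical invocation; the exact flags depend on your cluster):

srun -n4 python myscript.py

where myscript.py creates the runner early on, following the pattern shown at the top of this document:

from dask.distributed import Client

with SLURMRunner(scheduler_file="scheduler.json") as runner:
    with Client(runner) as client:
        client.wait_for_workers(2)
        # Do some Dask work

Process 0 starts the scheduler, process 1 continues as the client, and the remaining processes become workers.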