dask_jobqueue.LSFCluster

dask_jobqueue.LSFCluster

class dask_jobqueue.LSFCluster(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)

在 LSF 集群上启动 Dask

参数
queuestr

每个 worker 作业的目标队列。传递给 #BSUB -q 选项。

projectstr

与每个 worker 作业关联的项目。传递给 #BSUB -P 选项。

coresint

作业内所有 worker 线程将运行的总 CPU 核心数。每个 worker 进程的线程数使用公式 cores / processes 确定。作业排队系统默认使用此值作为每个作业的 CPU 数量。

memory: str

作业内所有 worker 将使用的总内存量。作业排队系统默认使用此值作为每个作业的内存数量。

processesint

将作业分割成指定数量的进程。适用于 GIL 工作负载或具有许多核心的节点。默认情况下,process ~= sqrt(cores),以便进程数和每个进程的线程数大致相同。

interfacestr

网络接口,如“eth0”或“ib0”。这将同时用于 Dask 调度器和 Dask worker 的接口。如果 Dask 调度器需要不同的接口,可以通过 scheduler_options 参数传递:interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}

nannybool

是否启动 nanny 进程

local_directorystr

用于文件溢出的 Dask worker 本地目录。

death_timeoutfloat

在关闭 worker 之前等待调度器的秒数

extralist

已弃用:请改用 worker_extra_args。此参数将在未来版本中移除。

worker_commandlist

启动 worker 时要运行的命令。默认为“distributed.cli.dask_worker”。

worker_extra_argslist

传递给 dask-worker 的附加参数

env_extralist

已弃用:请改用 job_script_prologue。此参数将在未来版本中移除。

job_script_prologuelist

在启动 worker 之前添加到脚本的其他命令。

job_script_epiloguelist

worker 命令退出后将运行的命令添加到脚本。

header_skiplist

已弃用:请改用 job_directives_skip。此参数将在未来版本中移除。

job_directives_skiplist

在生成的作业脚本头部中要跳过的指令。包含指定字符串的指令行将被移除。由 job_extra_directives 添加的指令不受影响。

log_directorystr

用于作业调度器日志的目录。

shebangstr

批处理提交脚本所需解释器的路径。

pythonstr

用于启动 Dask worker 的 Python 可执行文件。默认为提交这些作业的 Python。

config_namestr

jobqueue.yaml 配置文件中要使用的部分。

namestr

Dask worker 的名称。通常由 Cluster 设置。

ncpusint

CPU 数量。传递给 #BSUB -n 选项。

memint

请求的内存(字节)。传递给 #BSUB -M 选项。

walltimestr

每个 worker 作业的墙上时间,格式为 HH:MM。传递给 #BSUB -W 选项。

n_workersint

默认启动的 worker 数量。默认为 0。请参阅 scale 方法。

silence_logsstr

如果在本地启动调度器,此处发出的日志级别,例如“debug”、“info”或“error”。

asynchronousbool

是否使用 async/await 语法运行此集群对象

securitySecurity or Bool

如果您使用 TLS/SSL,则为 dask.distributed 安全对象。如果为 True,将自动创建临时的自签名凭据。

scheduler_optionsdict

用于向 Dask 调度器传递附加参数。例如,使用 scheduler_options={'dashboard_address': ':12435'} 指定 Web 面板应使用的端口,或使用 scheduler_options={'host': 'your-host'} 指定 Dask 调度器应运行的主机。有关更多详细信息,请参阅 distributed.Scheduler

scheduler_clstype

更改使用的 Dask 调度器的类。默认为 Dask 的 distributed.Scheduler

shared_temp_directorystr

调度器和 worker 之间的共享目录(例如用于临时安全证书),如果未设置,则默认为当前工作目录。

job_extralist

已弃用:请改用 job_extra_directives。此参数将在未来版本中移除。

job_extra_directiveslist

其他 LSF 选项列表,例如 -u。每个选项将以 #LSF 前缀开头。

lsf_unitsstr

集群 lsf.conf 文件中 LSF_UNIT_FOR_LIMITS 设置的资源使用大单位的单位系统。

use_stdinbool

LSF 的 bsub 命令允许我们通过将其作为参数传递 (bsub /tmp/jobscript.sh`) 或通过 stdin 输入 (bsub < /tmp/jobscript.sh`) 来启动作业。根据集群的配置和/或共享文件系统设置,其中一种方法可能无效,迫使您使用另一种方法。此选项控制 dask-jobqueue 将使用哪种方法通过 bsub 提交作业。

特别是,如果您的集群启动失败且 LSF 日志包含类似于以下的错误消息

/home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory

…则尝试在此处传递 use_stdin=True 或在 jobqueue.lsf 配置部分中设置 use-stdin: true

示例

>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
...                      cores=15, memory='25GB', use_stdin=True)
>>> cluster.scale(jobs=10)  # ask for 10 jobs
>>> from dask.distributed import Client
>>> client = Client(cluster)

这同样适用于自适应集群。它根据负载自动启动和终止 worker。

>>> cluster.adapt(maximum_jobs=20)
__init__(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)

方法

__init__([n_workers, job_cls, loop, ...])

adapt(*args[, minimum_jobs, maximum_jobs])

根据调度器活动自动扩展 Dask 集群。

close([timeout])

from_name(name)

根据名称创建此类实例以表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度器和 worker 的日志

job_script()

logs(*args, **kwargs)

new_worker_spec()

返回下一个 worker 的名称和规格

scale([n, jobs, memory, cores])

将集群扩展到指定的配置。

scale_down(workers)

scale_up([n, memory, cores])

将集群扩展到 n 个 worker

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 并传递 args

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个 worker 后继续

属性

asynchronous

是否在事件循环中运行?

called_from_running_loop

dashboard_link

job_header

job_name

loop

name

observed

plan

requested

scheduler_address

由 Dask-jobqueue 开发团队
© Copyright 2018, Anaconda, Inc. 和贡献者。