dask_jobqueue.HTCondorCluster
dask_jobqueue.HTCondorCluster¶
- class dask_jobqueue.HTCondorCluster(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)¶
在 HTCondor 集群上启动 Dask
- 参数
- diskstr
每个作业的总磁盘空间
- job_extradict
已弃用: 请改用
job_extra_directives
。此参数将在未来版本中移除。- job_extra_directivesdict
作为键值对的额外作业提交文件属性。它们将以
key = value
的形式插入。- submit_command_extralist of str
传递给 condor_submit 的额外参数
- cancel_command_extralist of str
传递给 condor_rm 的额外参数
- coresint
作业中所有工作线程将运行的总 CPU 核数。每个工作进程的线程数使用公式
cores / processes
确定。作业排队系统默认将其用作每个作业的 CPU 数量。- memory: str
作业中所有工作器将使用的总内存量。作业排队系统默认将其用作每个作业的内存量。
- processesint
将作业分解成这么多进程。适用于 GIL 工作负载或具有许多核心的节点。默认情况下,
process ~= sqrt(cores)
,以便进程数和每个进程的线程数大致相同。- interfacestr
网络接口,如“eth0”或“ib0”。这将用于 Dask 调度器和 Dask 工作器接口。如果你需要为 Dask 调度器指定不同的接口,可以通过
scheduler_options
参数传递:interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}
。- nannybool
是否启动 nanny 进程
- local_directorystr
用于文件溢出的 Dask 工作器本地目录。
- death_timeoutfloat
在关闭工作器之前等待调度器的秒数
- extralist
已弃用: 请改用
worker_extra_args
。此参数将在未来版本中移除。- worker_commandlist
启动工作器时运行的命令。默认为“distributed.cli.dask_worker”。
- worker_extra_argslist
传递给 dask-worker 的额外参数
- env_extralist
已弃用: 请改用
job_script_prologue
。此参数将在未来版本中移除。- job_script_prologuelist
在启动工作器之前添加到脚本的其他命令。
- job_script_epiloguelist
添加到脚本中,在工作器命令退出后运行的命令。
- header_skiplist
已弃用: 请改用
job_directives_skip
。此参数将在未来版本中移除。- job_directives_skiplist
在生成的作业脚本头部中跳过的指令。包含指定字符串的指令行将被移除。通过 job_extra_directives 添加的指令不会受到影响。
- log_directorystr
用于作业调度器日志的目录。
- shebangstr
批量提交脚本所需解释器的路径。
- pythonstr
用于启动 Dask 工作器的 Python 可执行文件。默认为提交这些作业的 Python。
- config_namestr
jobqueue.yaml 配置文件中要使用的部分。
- namestr
Dask 工作器的名称。这通常由 Cluster 设置。
- n_workersint
默认启动的工作器数量。默认为 0。请参阅 scale 方法。
- silence_logsstr
如果在本地启动调度器,则在此处发出的日志级别,如“debug”、“info”或“error”。
- asynchronousbool
是否使用 async/await 语法运行此集群对象
- securitySecurity or Bool
如果您使用 TLS/SSL,则是一个 dask.distributed 安全对象。如果为 True,则会自动创建临时自签名凭据。
- scheduler_optionsdict
用于向 Dask Scheduler 传递额外参数。例如,使用
scheduler_options={'dashboard_address': ':12435'}
指定 Web 控制面板应使用的端口,或使用scheduler_options={'host': 'your-host'}
指定 Dask 调度器应运行的主机。有关更多详细信息,请参阅distributed.Scheduler
。- scheduler_clstype
更改使用的 Dask Scheduler 的类。默认为 Dask 的
distributed.Scheduler
。- shared_temp_directorystr
调度器和工作器之间的共享目录(例如用于临时安全证书)。如果未设置,则默认为当前工作目录。
示例
>>> from dask_jobqueue.htcondor import HTCondorCluster >>> cluster = HTCondorCluster(cores=24, memory="4GB", disk="4GB") >>> cluster.scale(jobs=10) # ask for 10 jobs
>>> from dask.distributed import Client >>> client = Client(cluster)
这也适用于自适应集群。它会根据负载自动启动和终止工作器。
>>> cluster.adapt(maximum_jobs=20)
如果在工作节点上启动工作器之前需要运行设置命令,可以使用
job_script_prologue
,例如激活虚拟环境。>>> from dask_jobqueue.htcondor import HTCondorCluster >>> cluster = HTCondorCluster(cores=1, memory="2GB", disk="4GB", job_script_prologue=['cd /some/path/', 'source venv/bin/activate'])
请注意,环境变量不再通过提交描述文件中的
Environment
参数传递。如果你明确想设置,需要使用job_extra
。- __init__(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)¶
方法
__init__
([n_workers, job_cls, loop, ...])adapt
(*args[, minimum_jobs, maximum_jobs])根据调度器活动自动扩展 Dask 集群。
close
([timeout])from_name
(name)创建此类的一个实例,以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和工作器的日志
job_script
()logs
(*args, **kwargs)new_worker_spec
()返回下一个工作器的名称和规范
scale
([n, jobs, memory, cores])将集群扩展到指定的配置。
scale_down
(workers)scale_up
([n, memory, cores])将集群扩展到 n 个工作器
sync
(func, *args[, asynchronous, ...])根据调用上下文,同步或异步地使用 args 调用 func。
wait_for_workers
(n_workers[, timeout])在继续之前等待 n 个工作器的阻塞调用
属性
asynchronous
我们正在事件循环中运行吗?
called_from_running_loop
dashboard_link
job_header
job_name
loop
name
observed
plan
requested
scheduler_address