Important

You are viewing the NeMo 2.0 documentation. This release introduces significant changes to the API and a new library, NeMo Run. We are currently porting all features from NeMo 1.0 to 2.0. For documentation on the previous release, or on features not yet available in 2.0, please refer to the NeMo 24.07 documentation.

Execution#

Executors#

class nemo_run.core.execution.base.Executor(
*,
packager: ~nemo_run.core.packaging.base.Packager = <factory>,
launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
env_vars: dict[str,
str] = <factory>,
retries: int = 0,
experiment_id: str | None = None,
job_dir: str = '',
)#

Bases: ConfigurableMixin

Base dataclass for configuration of an executor. This cannot be used standalone, but you can use it as a base type to register executor factories.

See LocalExecutor and SlurmExecutor for examples.

assign(
exp_id: str,
exp_dir: str,
task_id: str,
task_dir: str,
) None#

This function will be called by run.Experiment to assign the executor for the specific experiment.

cleanup(handle: str)#
clone() Self#
create_job_dir()#
env_vars: dict[str, str]#
experiment_dir: str = ''#
experiment_id: str | None = None#

Set by run.Experiment

get_launcher() Launcher#
get_launcher_prefix() list[str] | None#
info() str#
job_dir: str = ''#

Directory where metadata for your run will be stored. This is set automatically if you use run.Experiment.

launcher: Launcher | str | None = None#
macro_values() ExecutorMacros | None#

Get executor-specific macro values. This allows replacing common macros with executor-specific variables, such as the head node IP.

nnodes() int#

Helper function called by torchrun components to determine --nnodes.

nproc_per_node() int#

Helper function called by torchrun components to determine --nproc-per-node.
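For intuition, a torchrun-style component could assemble its launch command from these two helpers. The executor class below is a hypothetical stand-in, not NeMo Run's actual internals:

```python
from dataclasses import dataclass


@dataclass
class FakeExecutor:
    """Hypothetical executor exposing the two helpers described above."""

    nodes: int = 2
    ntasks_per_node: int = 8

    def nnodes(self) -> int:
        return self.nodes

    def nproc_per_node(self) -> int:
        return self.ntasks_per_node


def build_torchrun_cmd(executor: FakeExecutor, script: str) -> list[str]:
    # A torchrun component queries the executor for both values.
    return [
        "torchrun",
        f"--nnodes={executor.nnodes()}",
        f"--nproc-per-node={executor.nproc_per_node()}",
        script,
    ]


cmd = build_torchrun_cmd(FakeExecutor(), "train.py")
# cmd == ["torchrun", "--nnodes=2", "--nproc-per-node=8", "train.py"]
```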

package_configs(*cfgs: tuple[str, str]) list[str]#
packager: Packager#
retries: int = 0#
class nemo_run.core.execution.local.LocalExecutor(
*,
packager: ~nemo_run.core.packaging.base.Packager = <factory>,
launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
env_vars: dict[str,
str] = <factory>,
retries: int = 0,
experiment_id: str | None = None,
job_dir: str = '',
ntasks_per_node: int = 1,
)#

Bases: Executor

Dataclass to configure a local executor.

Example

run.LocalExecutor()
assign(
exp_id: str,
exp_dir: str,
task_id: str,
task_dir: str,
)#

This function will be called by run.Experiment to assign the executor for the specific experiment.

nnodes() int#

Helper function called by torchrun components to determine --nnodes.

nproc_per_node() int#

Helper function called by torchrun components to determine --nproc-per-node.

ntasks_per_node: int = 1#

Used by components like torchrun to infer the number of tasks to launch.

class nemo_run.core.execution.skypilot.SkypilotExecutor(
*,
packager: ~nemo_run.core.packaging.base.Packager = <factory>,
launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
env_vars: dict[str,
str] = <factory>,
retries: int = 0,
experiment_id: str | None = None,
job_dir: str = '',
container_image: str | None = None,
cloud: str | list[str] | None = None,
region: str | list[str] | None = None,
zone: str | list[str] | None = None,
gpus: str | list[str] | None = None,
gpus_per_node: int | None = None,
cpus: int | float | list[int | float] | None = None,
memory: int | float | list[int | float] | None = None,
instance_type: str | list[str] | None = None,
num_nodes: int = 1,
use_spot: bool | list[bool] | None = None,
disk_size: int | list[int] | None = None,
disk_tier: str | list[str] | None = None,
ports: tuple[str] | None = None,
file_mounts: dict[str,
str] | None = None,
cluster_name: str | None = None,
setup: str | None = None,
autodown: bool = False,
idle_minutes_to_autostop: int | None = None,
torchrun_nproc_per_node: int | None = None,
)#

Bases: Executor

Dataclass to configure a Skypilot executor.

Requires familiarity with Skypilot. In order to use this executor, you need to install NeMo Run with either the skypilot (Kubernetes only) or skypilot-all (all clouds) optional feature.

Example

skypilot = SkypilotExecutor(
    gpus="A10G",
    gpus_per_node=devices,
    container_image="nvcr.io/nvidia/nemo:dev",
    cloud="kubernetes",
    cluster_name="nemo_tester",
    file_mounts={
        "nemo_run.whl": "nemo_run.whl"
    },
    setup="""
conda deactivate
nvidia-smi
ls -al ./
pip install nemo_run.whl
cd /opt/NeMo && git pull origin main && pip install .
    """,
)
HEAD_NODE_IP_VAR = 'head_node_ip'#
HET_GROUP_HOST_VAR = 'het_group_host'#
NODE_RANK_VAR = 'SKYPILOT_NODE_RANK'#
NPROC_PER_NODE_VAR = 'SKYPILOT_NUM_GPUS_PER_NODE'#
NUM_NODES_VAR = 'num_nodes'#
assign(
exp_id: str,
exp_dir: str,
task_id: str,
task_dir: str,
)#

This function will be called by run.Experiment to assign the executor for the specific experiment.

autodown: bool = False#
classmethod cancel(app_id: str)#
cleanup(handle: str)#
cloud: str | list[str] | None = None#
cluster_name: str | None = None#
container_image: str | None = None#
cpus: int | float | list[int | float] | None = None#
disk_size: int | list[int] | None = None#
disk_tier: str | list[str] | None = None#
file_mounts: dict[str, str] | None = None#
gpus: str | list[str] | None = None#
gpus_per_node: int | None = None#
idle_minutes_to_autostop: int | None = None#
instance_type: str | list[str] | None = None#
launch(
task: skyt.Task,
cluster_name: str | None = None,
num_nodes: int | None = None,
detach_run: bool = True,
dryrun: bool = False,
) tuple[int | None, backends.ResourceHandle | None]#
classmethod logs(app_id: str, fallback_path: str | None)#
macro_values() ExecutorMacros | None#

Get executor-specific macro values. This allows replacing common macros with executor-specific variables, such as the head node IP.

memory: int | float | list[int | float] | None = None#
nnodes() int#

Helper function called by torchrun components to determine --nnodes.

nproc_per_node() int#

Helper function called by torchrun components to determine --nproc-per-node.

num_nodes: int = 1#
package(
packager: Packager,
job_name: str,
)#
package_configs(*cfgs: tuple[str, str]) list[str]#
packager: Packager#
classmethod parse_app(app_id: str) tuple[str, str, int]#
ports: tuple[str] | None = None#
region: str | list[str] | None = None#
setup: str | None = None#
classmethod status(
app_id: str,
) tuple[status_lib.ClusterStatus | None, dict | None]#
to_resources() set[sky.Resources]#
to_task(
name: str,
cmd: list[str] | None = None,
env_vars: dict[str, str] | None = None,
) skyt.Task#
torchrun_nproc_per_node: int | None = None#
use_spot: bool | list[bool] | None = None#
property workdir: str#
zone: str | list[str] | None = None#
class nemo_run.core.execution.slurm.SlurmExecutor(
*,
packager: ~nemo_run.core.packaging.base.Packager = <factory>,
launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
env_vars: dict[str,
str] = <factory>,
retries: int = 0,
experiment_id: str | None = None,
job_dir: str = '',
account: str,
partition: str | None = None,
job_name_prefix: str | None = None,
time: str = '00:10:00',
nodes: int = 1,
ntasks_per_node: int = 1,
cpus_per_task: int | None = None,
cpus_per_gpu: int | None = None,
gpus_per_node: int | None = None,
gpus_per_task: int | None = None,
qos: str | None = None,
mem: str | None = None,
mem_per_gpu: str | None = None,
mem_per_cpu: str | None = None,
comment: str | None = None,
constraint: str | None = None,
exclude: str | None = None,
gres: str | None = None,
signal: str | None = None,
exclusive: str | bool | None = None,
array: str | None = None,
open_mode: str = 'append',
container_image: str | None = None,
container_mounts: list[str] = <factory>,
additional_parameters: dict[str,
~typing.Any] | None = None,
srun_args: list[str] | None = None,
heterogeneous: bool = False,
memory_measure: bool = False,
job_details: ~nemo_run.core.execution.slurm.SlurmJobDetails = <factory>,
tunnel: ~nemo_run.core.tunnel.client.SSHTunnel | ~nemo_run.core.tunnel.client.LocalTunnel = <factory>,
dependencies: list[str] = <factory>,
dependency_type: str = 'afterok',
torchrun_nproc_per_node: int | None = None,
wait_time_for_group_job: int = 30,
monitor_group_job: bool = True,
monitor_group_job_wait_time: int = 60,
setup_lines: str | None = None,
)#

Bases: Executor

Dataclass to configure a Slurm cluster. During execution, sbatch-related parameters will automatically be parsed into their corresponding sbatch flags.

Note

We assume that the underlying Slurm cluster has Pyxis enabled. The Slurm executor will fail if the cluster does not support Pyxis.

Example

def your_slurm_executor() -> run.SlurmExecutor:
    ssh_tunnel = SSHTunnel(
        host=os.environ["SLURM_HOST"],
        user=os.environ["SLURM_USER"],
        job_dir=os.environ["SLURM_JOBDIR"],
    )
    packager = GitArchivePackager()
    launcher = "torchrun"
    executor = SlurmExecutor(
        account=os.environ["SLURM_ACCT"],
        partition=os.environ["SLURM_PARTITION"],
        nodes=1,
        ntasks_per_node=1,
        tunnel=ssh_tunnel,
        container_image=os.environ["BASE_IMAGE"],
        time="00:30:00",
        packager=packager,
        launcher=launcher,
    )
    return executor

...

your_executor = your_slurm_executor()
ALLOC_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks-per-node', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu']#
HEAD_NODE_IP_VAR = 'head_node_ip'#
HET_GROUP_HOST_VAR = 'het_group_host'#
NODE_RANK_VAR = 'SLURM_NODEID'#
NPROC_PER_NODE_VAR = 'SLURM_NTASKS_PER_NODE'#
NUM_NODES_VAR = 'SLURM_NNODES'#
class ResourceRequest(
*,
packager: nemo_run.core.packaging.base.Packager,
nodes: int,
ntasks_per_node: int,
container_image: Optional[str] = None,
gpus_per_node: Optional[int] = None,
gpus_per_task: Optional[int] = None,
container_mounts: list[str] = <factory>,
env_vars: dict[str,
str] = <factory>,
srun_args: Optional[list[str]] = None,
job_details: nemo_run.core.execution.slurm.SlurmJobDetails = <factory>,
)#

Bases: object

container_image: str | None = None#
container_mounts: list[str]#
env_vars: dict[str, str]#
gpus_per_node: int | None = None#
gpus_per_task: int | None = None#
job_details: SlurmJobDetails#
nodes: int#
ntasks_per_node: int#
packager: Packager#
srun_args: list[str] | None = None#
SBATCH_FLAGS = ['account', 'acctg_freq', 'array', 'batch', 'clusters', 'constraint', 'container', 'container_id', 'core_spec', 'cpus_per_gpu', 'cpus_per_task', 'comment', 'debug', 'delay_boot', 'dependency', 'distribution', 'error', 'exclude', 'exclusive', 'export', 'get_user_env', 'gid', 'gpu_bind', 'gpu_freq', 'gpus', 'gpus_per_node', 'gpus_per_socket', 'gpus_per_task', 'gres', 'gres_flags', 'help', 'hold', 'ignore_pbs', 'input', 'job_name', 'kill_on_invalid_dep', 'licenses', 'mail_type', 'mail_user', 'mcs_label', 'mem', 'mem_bind', 'mem_per_cpu', 'mem_per_gpu', 'mincpus', 'network', 'nice', 'no_kill', 'no_requeue', 'nodefile', 'nodelist', 'nodes', 'ntasks', 'ntasks_per_core', 'ntasks_per_gpu', 'ntasks_per_node', 'ntasks_per_socket', 'open_mode', 'output', 'overcommit', 'oversubscribe', 'parsable', 'partition', 'power', 'prefer', 'priority', 'profile', 'propagate', 'qos', 'quiet', 'reboot', 'requeue', 'reservation', 'signal', 'sockets_per_node', 'spread_job', 'switches', 'test_only', 'thread_spec', 'threads_per_core', 'time', 'time_min', 'tmp', 'tres_bind', 'tres_per_task', 'uid', 'usage', 'verbose', 'version', 'wait', 'wait_all_nodes', 'wckey', 'wrap']#

List of sbatch flags in snake_case form.
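As a minimal sketch of how snake_case parameters translate into sbatch flags (the helper name is illustrative, not NeMo Run's actual code):

```python
def to_sbatch_flag(name: str, value) -> str:
    # snake_case attribute -> kebab-case sbatch flag,
    # e.g. ntasks_per_node -> --ntasks-per-node
    return f"#SBATCH --{name.replace('_', '-')}={value}"


lines = [
    to_sbatch_flag("ntasks_per_node", 8),
    to_sbatch_flag("gpus_per_node", 8),
    to_sbatch_flag("time", "00:10:00"),
]
# lines[0] == "#SBATCH --ntasks-per-node=8"
```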

SRUN_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks', 'ntasks-per-node', 'cpus-per-task', 'gpus-per-node', 'gpus-per-task', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu', 'comment', 'constraint', 'exclude', 'gres', 'exclusive', 'array', 'additional-parameters', 'container-image', 'container-mounts', 'container-workdir']#
account: str#
additional_parameters: dict[str, Any] | None = None#
alloc(job_name='interactive')#
array: str | None = None#
assign(
exp_id: str,
exp_dir: str,
task_id: str,
task_dir: str,
)#

This function will be called by run.Experiment to assign the executor for the specific experiment.

bash(job_name='interactive')#
comment: str | None = None#
connect_devspace(space, tunnel_dir=None)#
constraint: str | None = None#
container_image: str | None = None#
container_mounts: list[str]#
cpus_per_gpu: int | None = None#
cpus_per_task: int | None = None#
dependencies: list[str]#

List of TorchX app handles which will be parsed and passed to the --dependency flag in sbatch.
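Conceptually, each handle is reduced to a Slurm job ID and joined behind the configured dependency_type. The handle format below is a simplified assumption for illustration, not necessarily the exact TorchX scheme:

```python
def build_dependency_flag(handles: list[str], dependency_type: str = "afterok") -> str:
    # Assume a simplified handle form "slurm://<session>/<job_id>" and
    # take the trailing path segment as the Slurm job ID.
    job_ids = [handle.rstrip("/").rsplit("/", 1)[-1] for handle in handles]
    return f"--dependency={dependency_type}:" + ":".join(job_ids)


flag = build_dependency_flag(["slurm://nemo/1234", "slurm://nemo/5678"])
# flag == "--dependency=afterok:1234:5678"
```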

dependency_type: str = 'afterok'#
exclude: str | None = None#
exclusive: str | bool | None = None#
get_launcher_prefix() list[str] | None#
gpus_per_node: int | None = None#
gpus_per_task: int | None = None#
gres: str | None = None#
heterogeneous: bool = False#
info() str#
job_details: SlurmJobDetails#
job_name: str = 'nemo-job'#

Set by the executor; cannot be set at initialization.

job_name_prefix: str | None = None#
launch_devspace(
space: DevSpace,
job_name='interactive',
env_vars: Dict[str, str] | None = None,
add_workspace_to_pythonpath: bool = True,
)#
property local: LocalTunnel#
property local_is_slurm: bool#
macro_values() ExecutorMacros | None#

Get executor-specific macro values. This allows replacing common macros with executor-specific variables, such as the head node IP.

mem: str | None = None#
mem_per_cpu: str | None = None#
mem_per_gpu: str | None = None#
memory_measure: bool = False#
classmethod merge(
executors: list[SlurmExecutor],
num_tasks: int,
) SlurmExecutor#
monitor_group_job: bool = True#
monitor_group_job_wait_time: int = 60#
nnodes() int#

Helper function called by torchrun components to determine --nnodes.

nodes: int = 1#
nproc_per_node() int#

Helper function called by torchrun components to determine --nproc-per-node.

ntasks_per_node: int = 1#
open_mode: str = 'append'#
package(
packager: Packager,
job_name: str,
)#
package_configs(*cfgs: tuple[str, str]) list[str]#
packager: Packager#
parse_deps() list[str]#

Helper function to parse the list of TorchX app handles and return a list of Slurm job IDs to use as dependencies.

partition: str | None = None#
qos: str | None = None#
resource_group: list[ResourceRequest]#
run_as_group: bool = False#
setup_lines: str | None = None#
signal: str | None = None#
property slurm: Tunnel#
srun(
cmd: str,
job_name='interactive',
flags=None,
env_vars: Dict[str, str] | None = None,
arg_dict=None,
**kwargs,
)#
srun_args: list[str] | None = None#
stderr_to_stdout: bool = True#
time: str = '00:10:00'#
torchrun_nproc_per_node: int | None = None#

Optional parameter to explicitly specify nproc_per_node for torchrun-like components, in case the Slurm cluster does not support fine-grained resource allocation.

tunnel: SSHTunnel | LocalTunnel#
wait_time_for_group_job: int = 30#

Tunnels#

class nemo_run.core.tunnel.client.LocalTunnel(
*,
job_dir: str,
packaging_jobs: dict[str,
~nemo_run.core.tunnel.client.PackagingJob] = <factory>,
)#

Bases: Tunnel

Local tunnel for supported executors. Executes all commands locally. Currently only supports SlurmExecutor. Use this tunnel if you are launching from a login node or another node inside the cluster.

cleanup()#
connect()#
get(remote_path: str, local_path: str) None#
host: str = 'localhost'#
put(local_path: str, remote_path: str) None#
run(
command: str,
hide: bool = True,
warn: bool = False,
**kwargs,
) invoke.runners.Result#
user: str = ''#
class nemo_run.core.tunnel.client.SSHTunnel(
*,
job_dir: str,
host: str,
user: str,
packaging_jobs: dict[str,
~nemo_run.core.tunnel.client.PackagingJob] = <factory>,
identity: str | None = None,
shell: str | None = None,
pre_command: str | None = None,
)#

Bases: Tunnel

SSH tunnel for supported executors. Currently only supports SlurmExecutor.

Uses key-based authentication if identity is provided, otherwise password-based authentication.

Examples

ssh_tunnel = SSHTunnel(
    host=os.environ["SSH_HOST"],
    user=os.environ["SSH_USER"],
    job_dir=os.environ["REMOTE_JOBDIR"],
)

another_ssh_tunnel = SSHTunnel(
    host=os.environ["ANOTHER_SSH_HOST"],
    user=os.environ["ANOTHER_SSH_USER"],
    job_dir=os.environ["ANOTHER_REMOTE_JOBDIR"],
    identity="path_to_private_key"
)
cleanup()#
connect()#
get(remote_path: str, local_path: str) None#
host: str#
identity: str | None = None#
pre_command: str | None = None#
put(local_path: str, remote_path: str) None#
run(
command: str,
hide: bool = True,
warn: bool = False,
**kwargs,
) invoke.runners.Result#
setup()#

Creates the job directory if it does not exist.

shell: str | None = None#
user: str#

Launchers#

class nemo_run.core.execution.base.FaultTolerance(
*,
nsys_profile: bool = False,
nsys_folder: str = 'nsys_profile',
nsys_trace: list[str] = <factory>,
cfg_path: str = '',
finished_flag_file: str = '',
job_results_file: str = '',
rdzv_backend: str = 'c10d',
rdzv_port: int = 29500,
workload_check_interval: Optional[float] = None,
initial_rank_heartbeat_timeout: Optional[float] = None,
rank_heartbeat_timeout: Optional[float] = None,
rank_termination_signal: Optional[str] = None,
log_level: Optional[str] = None,
max_restarts: Optional[int] = None,
)#

Bases: Launcher

cfg_path: str = ''#
finished_flag_file: str = ''#
initial_rank_heartbeat_timeout: float | None = None#
job_results_file: str = ''#
log_level: str | None = None#
max_restarts: int | None = None#
rank_heartbeat_timeout: float | None = None#
rank_termination_signal: str | None = None#
rdzv_backend: str = 'c10d'#
rdzv_port: int = 29500#
workload_check_interval: float | None = None#
class nemo_run.core.execution.base.Torchrun(
*,
nsys_profile: bool = False,
nsys_folder: str = 'nsys_profile',
nsys_trace: list[str] = <factory>,
rdzv_backend: str = 'c10d',
rdzv_port: int = 29500,
)#

Bases: Launcher

rdzv_backend: str = 'c10d'#
rdzv_port: int = 29500#

Packagers#

class nemo_run.core.packaging.base.Packager(
*,
debug: bool = False,
symlink_from_remote_dir: str | None = None,
)#

Bases: ConfigurableMixin

Base class for packaging your code.

A packager is generally used as part of an executor, and provides the executor with information on how to package your code.

It can also include information on how to run your code. For example, the packager can determine whether to use torchrun or whether to use the debug flag.

Note

This class can also be used standalone as a passthrough packager. This is useful in cases where you do not need to package code, e.g. a local executor which uses your current working directory, or an executor which uses a Docker image that already includes all your code.

debug: bool = False#

If set to True, uses the component- or executor-specific debug flag.

package(path: Path, job_dir: str, name: str) str#
setup()#

Runs before the executor launches your task.

Symlinks the package from the provided remote directory. Only applies when using SlurmExecutor for now.

class nemo_run.core.packaging.git.GitArchivePackager(
*,
debug: bool = False,
symlink_from_remote_dir: str | None = None,
basepath: str = '',
subpath: str = '',
ref: str = 'HEAD',
include_submodules: bool = True,
include_pattern: str | list[str] = '',
include_pattern_relative_path: str | list[str] = '',
check_uncommitted_changes: bool = False,
check_untracked_files: bool = False,
)#

Bases: Packager

Uses git archive to package your code.

At a high level, it works in the following way:

  1. base_path = git rev-parse --show-toplevel

  2. Optionally, a subpath can be defined as base_path/self.subpath by setting the subpath attribute.

  3. cd base_path && git archive --format=tar.gz --output={output_file} {self.ref}:{subpath}

  4. This extracted tar file becomes the working directory for your job.

Note

git archive will only package code that is committed in the specified ref. Any uncommitted code will not be packaged. We are working on adding an option to package uncommitted code, but it is not ready yet.
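The steps above can be sketched as a small script. This is a simplified illustration (no submodule or include_pattern handling, and the function name is hypothetical), assuming git is available on the PATH:

```python
import os
import subprocess
import tempfile
from pathlib import Path


def archive_repo(repo: str, output_file: str, ref: str = "HEAD", subpath: str = "") -> str:
    # 1. Resolve the repository root.
    base_path = subprocess.check_output(
        ["git", "rev-parse", "--show-toplevel"], cwd=repo, text=True
    ).strip()
    # 2./3. Archive the ref, optionally narrowed to a subpath.
    tree = f"{ref}:{subpath}" if subpath else ref
    subprocess.check_call(
        ["git", "archive", "--format=tar.gz", f"--output={output_file}", tree],
        cwd=base_path,
    )
    return output_file


# Build a throwaway repo with one committed file, then archive it.
tmp = tempfile.mkdtemp()
subprocess.check_call(["git", "init", "-q", tmp])
Path(tmp, "train.py").write_text("print('hello')\n")
env = {
    **os.environ,
    "GIT_AUTHOR_NAME": "ci", "GIT_AUTHOR_EMAIL": "ci@example.com",
    "GIT_COMMITTER_NAME": "ci", "GIT_COMMITTER_EMAIL": "ci@example.com",
}
subprocess.check_call(["git", "add", "."], cwd=tmp)
subprocess.check_call(["git", "commit", "-qm", "init"], cwd=tmp, env=env)
out = archive_repo(tmp, os.path.join(tmp, "code.tar.gz"))
```

Note that only the committed `train.py` lands in the archive; files added but not committed would be skipped, matching the caveat above.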

basepath: str = ''#
check_uncommitted_changes: bool = False#
check_untracked_files: bool = False#
include_pattern: str | list[str] = ''#

Include extra files in the archive that match include_pattern. This string is embedded into the command as find {include_pattern} -type f to obtain the list of extra files to include in the archive.

include_pattern_relative_path: str | list[str] = ''#

Relative path to use as tar's -C option; needs to be consistent with include_pattern. If not provided, the git base path will be used.

include_submodules: bool = True#

Include submodules in the archive.

package(
path: Path,
job_dir: str,
name: str,
) str#
ref: str = 'HEAD'#

Git ref to use for archiving the code. Can be a branch name or a commit ref like HEAD.

subpath: str = ''#

Relative subpath in your repo to package code from. For example, if your repo has three folders a, b and c, and you specify a as the subpath, only files inside a will be packaged. In your job, the root workdir will be a/.