重要提示
您正在查看 NeMo 2.0 文档。此版本引入了 API 的重大更改和一个新的库 NeMo Run。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚不可用的功能的文档,请参阅 NeMo 24.07 文档。
执行#
执行器#
- class nemo_run.core.execution.base.Executor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str,
- str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
基类:
ConfigurableMixin
执行器配置的基础数据类。这不能独立使用,但您可以将其用作注册执行器工厂的基本类型。
有关示例,请参阅
LocalExecutor
和SlurmExecutor
。- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
此函数将由 run.Experiment 调用,以分配特定实验的执行器。
- cleanup(handle: str)#
- clone() Self #
- create_job_dir()#
- env_vars: dict[str, str]#
- experiment_dir: str = ''#
- experiment_id: str | None = None#
由 run.Experiment 设置
- get_launcher() Launcher #
- get_launcher_prefix() list[str] | None #
- info() str #
- job_dir: str = ''#
将存储您的运行元数据的目录。如果使用 run.Experiment,则会自动设置此项
- launcher: Launcher | str | None = None#
- macro_values() ExecutorMacros | None #
获取特定于执行器的宏值。这允许将常用宏替换为执行器特定的变量,例如节点 IP 等。
- nnodes() int #
torchrun 组件调用的辅助函数,用于确定 –nnodes。
- nproc_per_node() int #
torchrun 组件调用的辅助函数,用于确定 –nproc-per-node。
- package_configs(*cfgs: tuple[str, str]) list[str] #
- retries: int = 0#
- class nemo_run.core.execution.local.LocalExecutor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str,
- str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- ntasks_per_node: int = 1,
基类:
Executor
用于配置本地执行器的数据类。
示例
run.LocalExecutor()
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
此函数将由 run.Experiment 调用,以分配特定实验的执行器。
- nnodes() int #
torchrun 组件调用的辅助函数,用于确定 –nnodes。
- nproc_per_node() int #
torchrun 组件调用的辅助函数,用于确定 –nproc-per-node。
- ntasks_per_node: int = 1#
组件(如 torchrun)使用此项来推断要启动的任务数。
- class nemo_run.core.execution.skypilot.SkypilotExecutor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str,
- str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- container_image: str | None = None,
- cloud: str | list[str] | None = None,
- region: str | list[str] | None = None,
- zone: str | list[str] | None = None,
- gpus: str | list[str] | None = None,
- gpus_per_node: int | None = None,
- cpus: int | float | list[int | float] | None = None,
- memory: int | float | list[int | float] | None = None,
- instance_type: str | list[str] | None = None,
- num_nodes: int = 1,
- use_spot: bool | list[bool] | None = None,
- disk_size: int | list[int] | None = None,
- disk_tier: str | list[str] | None = None,
- ports: tuple[str] | None = None,
- file_mounts: dict[str,
- str] | None = None,
- cluster_name: str | None = None,
- setup: str | None = None,
- autodown: bool = False,
- idle_minutes_to_autostop: int | None = None,
- torchrun_nproc_per_node: int | None = None,
基类:
Executor
用于配置 Skypilot 执行器的数据类。
需要熟悉 Skypilot。为了使用此执行器,您需要安装 NeMo Run 以及 skypilot(仅适用于 Kubernetes)或 skypilot-all(适用于所有云)可选功能。
示例
skypilot = SkypilotExecutor( gpus="A10G", gpus_per_node=devices, container_image="nvcr.io/nvidia/nemo:dev", cloud="kubernetes", cluster_name="nemo_tester", file_mounts={ "nemo_run.whl": "nemo_run.whl" }, setup=""" conda deactivate nvidia-smi ls -al ./ pip install nemo_run.whl cd /opt/NeMo && git pull origin main && pip install . """, )
- HEAD_NODE_IP_VAR = 'head_node_ip'#
- HET_GROUP_HOST_VAR = 'het_group_host'#
- NODE_RANK_VAR = 'SKYPILOT_NODE_RANK'#
- NPROC_PER_NODE_VAR = 'SKYPILOT_NUM_GPUS_PER_NODE'#
- NUM_NODES_VAR = 'num_nodes'#
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
此函数将由 run.Experiment 调用,以分配特定实验的执行器。
- autodown: bool = False#
- classmethod cancel(app_id: str)#
- cleanup(handle: str)#
- cloud: str | list[str] | None = None#
- cluster_name: str | None = None#
- container_image: str | None = None#
- cpus: int | float | list[int | float] | None = None#
- disk_size: int | list[int] | None = None#
- disk_tier: str | list[str] | None = None#
- file_mounts: dict[str, str] | None = None#
- gpus: str | list[str] | None = None#
- gpus_per_node: int | None = None#
- idle_minutes_to_autostop: int | None = None#
- instance_type: str | list[str] | None = None#
- launch(
- task: skyt.Task,
- cluster_name: str | None = None,
- num_nodes: int | None = None,
- detach_run: bool = True,
- dryrun: bool = False,
- classmethod logs(app_id: str, fallback_path: str | None)#
- macro_values() ExecutorMacros | None #
获取特定于执行器的宏值。这允许将常用宏替换为执行器特定的变量,例如节点 IP 等。
- memory: int | float | list[int | float] | None = None#
- nnodes() int #
torchrun 组件调用的辅助函数,用于确定 –nnodes。
- nproc_per_node() int #
torchrun 组件调用的辅助函数,用于确定 –nproc-per-node。
- num_nodes: int = 1#
- package_configs(*cfgs: tuple[str, str]) list[str] #
- classmethod parse_app(app_id: str) tuple[str, str, int] #
- ports: tuple[str] | None = None#
- region: str | list[str] | None = None#
- setup: str | None = None#
- classmethod status(
- app_id: str,
- to_resources() set[sky.Resources] #
- to_task(
- name: str,
- cmd: list[str] | None = None,
- env_vars: dict[str, str] | None = None,
- torchrun_nproc_per_node: int | None = None#
- use_spot: bool | list[bool] | None = None#
- property workdir: str#
- zone: str | list[str] | None = None#
- class nemo_run.core.execution.slurm.SlurmExecutor(
- *,
- packager: ~nemo_run.core.packaging.base.Packager = <factory>,
- launcher: ~nemo_run.core.execution.base.Launcher | str | None = None,
- env_vars: dict[str,
- str] = <factory>,
- retries: int = 0,
- experiment_id: str | None = None,
- job_dir: str = '',
- account: str,
- partition: str | None = None,
- job_name_prefix: str | None = None,
- time: str = '00:10:00',
- nodes: int = 1,
- ntasks_per_node: int = 1,
- cpus_per_task: int | None = None,
- cpus_per_gpu: int | None = None,
- gpus_per_node: int | None = None,
- gpus_per_task: int | None = None,
- qos: str | None = None,
- mem: str | None = None,
- mem_per_gpu: str | None = None,
- mem_per_cpu: str | None = None,
- comment: str | None = None,
- constraint: str | None = None,
- exclude: str | None = None,
- gres: str | None = None,
- signal: str | None = None,
- exclusive: str | bool | None = None,
- array: str | None = None,
- open_mode: str = 'append',
- container_image: str | None = None,
- container_mounts: list[str] = <factory>,
- additional_parameters: dict[str,
- ~typing.Any] | None = None,
- srun_args: list[str] | None = None,
- heterogeneous: bool = False,
- memory_measure: bool = False,
- job_details: ~nemo_run.core.execution.slurm.SlurmJobDetails = <factory>,
- tunnel: ~nemo_run.core.tunnel.client.SSHTunnel | ~nemo_run.core.tunnel.client.LocalTunnel = <factory>,
- dependencies: list[str] = <factory>,
- dependency_type: str = 'afterok',
- torchrun_nproc_per_node: int | None = None,
- wait_time_for_group_job: int = 30,
- monitor_group_job: bool = True,
- monitor_group_job_wait_time: int = 60,
- setup_lines: str | None = None,
基类:
Executor
用于配置 Slurm 集群的数据类。在执行期间,sbatch 相关参数将自动解析为其对应的 sbatch 标志。
注意
我们假设底层的 Slurm 集群已启用 Pyxis。如果 Slurm 集群不支持 pyxis,slurm 执行器将失败。
示例
def your_slurm_executor() -> run.SlurmExecutor: ssh_tunnel = SSHTunnel( host=os.environ["SLURM_HOST"], user=os.environ["SLURM_USER"], job_dir=os.environ["SLURM_JOBDIR"], ) packager = GitArchivePackager() launcher = "torchrun" executor = SlurmExecutor( account=os.environ["SLURM_ACCT"], partition=os.environ["SLURM_PARTITION"], nodes=1, ntasks_per_node=1, tunnel=ssh_tunnel, container_image=os.environ["BASE_IMAGE"], time="00:30:00", packager=packager, launcher=launcher, ) return executor ... your_executor = your_slurm_executor()
- ALLOC_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks-per-node', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu']#
- HEAD_NODE_IP_VAR = 'head_node_ip'#
- HET_GROUP_HOST_VAR = 'het_group_host'#
- NODE_RANK_VAR = 'SLURM_NODEID'#
- NPROC_PER_NODE_VAR = 'SLURM_NTASKS_PER_NODE'#
- NUM_NODES_VAR = 'SLURM_NNODES'#
- class ResourceRequest(
- *,
- packager: nemo_run.core.packaging.base.Packager,
- nodes: int,
- ntasks_per_node: int,
- container_image: Optional[str] = None,
- gpus_per_node: Optional[int] = None,
- gpus_per_task: Optional[int] = None,
- container_mounts: list[str] = <factory>,
- env_vars: dict[str,
- str] = <factory>,
- srun_args: Optional[list[str]] = None,
- job_details: nemo_run.core.execution.slurm.SlurmJobDetails = <factory>,
基类:
object
- container_image: str | None = None#
- container_mounts: list[str]#
- env_vars: dict[str, str]#
- gpus_per_node: int | None = None#
- gpus_per_task: int | None = None#
- job_details: SlurmJobDetails#
- nodes: int#
- ntasks_per_node: int#
- srun_args: list[str] | None = None#
- SBATCH_FLAGS = ['account', 'acctg_freq', 'array', 'batch', 'clusters', 'constraint', 'container', 'container_id', 'core_spec', 'cpus_per_gpu', 'cpus_per_task', 'comment', 'debug', 'delay_boot', 'dependency', 'distribution', 'error', 'exclude', 'exclusive', 'export', 'get_user_env', 'gid', 'gpu_bind', 'gpu_freq', 'gpus', 'gpus_per_node', 'gpus_per_socket', 'gpus_per_task', 'gres', 'gres_flags', 'help', 'hold', 'ignore_pbs', 'input', 'job_name', 'kill_on_invalid_dep', 'licenses', 'mail_type', 'mail_user', 'mcs_label', 'mem', 'mem_bind', 'mem_per_cpu', 'mem_per_gpu', 'mincpus', 'network', 'nice', 'no_kill', 'no_requeue', 'nodefile', 'nodelist', 'nodes', 'ntasks', 'ntasks_per_core', 'ntasks_per_gpu', 'ntasks_per_node', 'ntasks_per_socket', 'open_mode', 'output', 'overcommit', 'oversubscribe', 'parsable', 'partition', 'power', 'prefer', 'priority', 'profile', 'propagate', 'qos', 'quiet', 'reboot', 'requeue', 'reservation', 'signal', 'sockets_per_node', 'spread_job', 'switches', 'test_only', 'thread_spec', 'threads_per_core', 'time', 'time_min', 'tmp', 'tres_bind', 'tres_per_task', 'uid', 'usage', 'verbose', 'version', 'wait', 'wait_all_nodes', 'wckey', 'wrap']#
sbatch 标志(snake case 形式)列表
- SRUN_ARGS = ['account', 'partition', 'job-name', 'time', 'nodes', 'ntasks', 'ntasks-per-node', 'cpus-per-task', 'gpus-per-node', 'gpus-per-task', 'qos', 'mem', 'mem-per-gpu', 'mem-per-cpu', 'comment', 'constraint', 'exclude', 'gres', 'exclusive', 'array', 'additional-parameters', 'container-image', 'container-mounts', 'container-workdir']#
- account: str#
- additional_parameters: dict[str, Any] | None = None#
- alloc(job_name='interactive')#
- array: str | None = None#
- assign(
- exp_id: str,
- exp_dir: str,
- task_id: str,
- task_dir: str,
此函数将由 run.Experiment 调用,以分配特定实验的执行器。
- bash(job_name='interactive')#
- comment: str | None = None#
- connect_devspace(space, tunnel_dir=None)#
- constraint: str | None = None#
- container_image: str | None = None#
- container_mounts: list[str]#
- cpus_per_gpu: int | None = None#
- cpus_per_task: int | None = None#
- dependencies: list[str]#
TorchX 应用句柄列表,这些句柄将被解析并传递给 sbatch 中的 –dependency 标志。
- dependency_type: str = 'afterok'#
- exclude: str | None = None#
- exclusive: str | bool | None = None#
- get_launcher_prefix() list[str] | None #
- gpus_per_node: int | None = None#
- gpus_per_task: int | None = None#
- gres: str | None = None#
- heterogeneous: bool = False#
- info() str #
- job_details: SlurmJobDetails#
- job_name: str = 'nemo-job'#
由执行器设置;无法初始化
- job_name_prefix: str | None = None#
- launch_devspace(
- space: DevSpace,
- job_name='interactive',
- env_vars: Dict[str, str] | None = None,
- add_workspace_to_pythonpath: bool = True,
- property local: LocalTunnel#
- property local_is_slurm: bool#
- macro_values() ExecutorMacros | None #
获取特定于执行器的宏值。这允许将常用宏替换为执行器特定的变量,例如节点 IP 等。
- mem: str | None = None#
- mem_per_cpu: str | None = None#
- mem_per_gpu: str | None = None#
- memory_measure: bool = False#
- classmethod merge(
- executors: list[SlurmExecutor],
- num_tasks: int,
- monitor_group_job: bool = True#
- monitor_group_job_wait_time: int = 60#
- nnodes() int #
torchrun 组件调用的辅助函数,用于确定 –nnodes。
- nodes: int = 1#
- nproc_per_node() int #
torchrun 组件调用的辅助函数,用于确定 –nproc-per-node。
- ntasks_per_node: int = 1#
- open_mode: str = 'append'#
- package_configs(*cfgs: tuple[str, str]) list[str] #
- parse_deps() list[str] #
帮助函数,用于解析 TorchX 应用句柄列表,并返回要用作依赖项的 Slurm 作业 ID 列表。
- partition: str | None = None#
- qos: str | None = None#
- resource_group: list[ResourceRequest]#
- run_as_group: bool = False#
- setup_lines: str | None = None#
- signal: str | None = None#
- property slurm: Tunnel#
- srun(
- cmd: str,
- job_name='interactive',
- flags=None,
- env_vars: Dict[str, str] | None = None,
- arg_dict=None,
- **kwargs,
- srun_args: list[str] | None = None#
- stderr_to_stdout: bool = True#
- time: str = '00:10:00'#
- torchrun_nproc_per_node: int | None = None#
可选参数,用于为类似 torchrun 的组件显式指定 nproc_per_node,如果 slurm 集群不支持细粒度的资源分配。
- tunnel: SSHTunnel | LocalTunnel#
- wait_time_for_group_job: int = 30#
隧道#
- class nemo_run.core.tunnel.client.LocalTunnel(
- *,
- job_dir: str,
- packaging_jobs: dict[str,
- ~nemo_run.core.tunnel.client.PackagingJob] = <factory>,
基类:
Tunnel
受支持执行器的本地隧道。在本地执行所有命令。目前仅支持 SlurmExecutor。如果您从集群内的登录/其他节点启动,请使用此隧道。
- cleanup()#
- connect()#
- get(remote_path: str, local_path: str) None #
- host: str = 'localhost'#
- put(local_path: str, remote_path: str) None #
- run(
- command: str,
- hide: bool = True,
- warn: bool = False,
- **kwargs,
- user: str = ''#
- class nemo_run.core.tunnel.client.SSHTunnel(
- *,
- job_dir: str,
- host: str,
- user: str,
- packaging_jobs: dict[str,
- ~nemo_run.core.tunnel.client.PackagingJob] = <factory>,
- identity: str | None = None,
- shell: str | None = None,
- pre_command: str | None = None,
基类:
Tunnel
受支持执行器的 SSH 隧道。目前仅支持 SlurmExecutor。
如果提供 identity,则使用基于密钥的身份验证,否则使用密码身份验证。
示例
ssh_tunnel = SSHTunnel( host=os.environ["SSH_HOST"], user=os.environ["SSH_USER"], job_dir=os.environ["REMOTE_JOBDIR"], ) another_ssh_tunnel = SSHTunnel( host=os.environ["ANOTHER_SSH_HOST"], user=os.environ["ANOTHER_SSH_USER"], job_dir=os.environ["ANOTHER_REMOTE_JOBDIR"], identity="path_to_private_key" )
- cleanup()#
- connect()#
- get(remote_path: str, local_path: str) None #
- host: str#
- identity: str | None = None#
- pre_command: str | None = None#
- put(local_path: str, remote_path: str) None #
- run(
- command: str,
- hide: bool = True,
- warn: bool = False,
- **kwargs,
- setup()#
如果作业目录不存在,则创建它
- shell: str | None = None#
- user: str#
启动器#
- class nemo_run.core.execution.base.FaultTolerance(
- *,
- nsys_profile: bool = False,
- nsys_folder: str = 'nsys_profile',
- nsys_trace: list[str] = <factory>,
- cfg_path: str = '',
- finished_flag_file: str = '',
- job_results_file: str = '',
- rdzv_backend: str = 'c10d',
- rdzv_port: int = 29500,
- workload_check_interval: Optional[float] = None,
- initial_rank_heartbeat_timeout: Optional[float] = None,
- rank_heartbeat_timeout: Optional[float] = None,
- rank_termination_signal: Optional[str] = None,
- log_level: Optional[str] = None,
- max_restarts: Optional[int] = None,
基类:
Launcher
- cfg_path: str = ''#
- finished_flag_file: str = ''#
- initial_rank_heartbeat_timeout: float | None = None#
- job_results_file: str = ''#
- log_level: str | None = None#
- max_restarts: int | None = None#
- rank_heartbeat_timeout: float | None = None#
- rank_termination_signal: str | None = None#
- rdzv_backend: str = 'c10d'#
- rdzv_port: int = 29500#
- workload_check_interval: float | None = None#
打包器#
- class nemo_run.core.packaging.base.Packager(
- *,
- debug: bool = False,
- symlink_from_remote_dir: str | None = None,
基类:
ConfigurableMixin
用于打包代码的基础类。
打包器通常用作执行器的一部分,并为执行器提供关于如何打包代码的信息。
它还可以包含关于如何运行代码的信息。例如,打包器可以确定是使用 torchrun 还是使用 debug 标志。
注意
此类也可以独立用作直通打包器。这在您不需要打包代码的情况下非常有用。例如,使用您当前工作目录的本地执行器,或者使用包含所有代码的 docker 镜像的执行器。
- debug: bool = False#
如果设置为 True,则使用组件或执行器特定的 debug 标志。
- package(path: Path, job_dir: str, name: str) str #
- setup()#
这在执行器启动您的任务之前运行。
- symlink_from_remote_dir: str | None = None#
从提供的远程目录符号链接包。目前仅在使用 SlurmExecutor 时适用。
- class nemo_run.core.packaging.git.GitArchivePackager(
- *,
- debug: bool = False,
- symlink_from_remote_dir: str | None = None,
- basepath: str = '',
- subpath: str = '',
- ref: str = 'HEAD',
- include_submodules: bool = True,
- include_pattern: str | list[str] = '',
- include_pattern_relative_path: str | list[str] = '',
- check_uncommitted_changes: bool = False,
- check_untracked_files: bool = False,
基类:
Packager
使用 git archive 打包您的代码。
从高层次来看,它的工作方式如下
base_path =
git rev-parse --show-toplevel
。可以选择通过设置
subpath
属性将子路径定义为base_path/self.subpath
。cd base_path
&&git archive --format=tar.gz --output={output_file} {self.ref}:{subpath}
此解压的 tar 文件将成为您任务的工作目录。
注意
git archive 将仅打包在指定 ref 中提交的代码。任何未提交的代码都不会被打包。我们正在努力添加打包未提交代码的选项,但尚未准备就绪。
- basepath: str = ''#
- check_uncommitted_changes: bool = False#
- check_untracked_files: bool = False#
- include_pattern: str | list[str] = ''#
在与 include_pattern 匹配的归档文件中包含额外的文件。此字符串将包含在命令中,如下所示:find {include_pattern} -type f 以获取要包含在归档文件中的额外文件列表
- include_pattern_relative_path: str | list[str] = ''#
用作 tar -C 选项的相对路径 - 需要与 include_pattern 一致。如果未提供,将使用 git 基础路径。
- include_submodules: bool = True#
在归档文件中包含子模块。
- package(
- path: Path,
- job_dir: str,
- name: str,
- ref: str = 'HEAD'#
用于归档代码的 Git 引用。可以是分支名称或类似 HEAD 的提交引用。
- subpath: str = ''#
在您的仓库中打包代码的相对子路径。例如,如果您的仓库有三个文件夹 a、b 和 c,并且您将 a 指定为子路径,则只会打包 a 内的文件。在您的任务中,根工作目录将是 a/。