diff --git a/CHANGELOG.md b/CHANGELOG.md index 70c52e38..d85bb250 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,50 @@ All notable changes to madengine will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +## [2.1.0] - 2026-05-28 + +### Added + +- **`slurm_multi` SLURM escape-hatch launcher**: New self-managed multi-node launcher for workloads that orchestrate their own per-node Docker containers via `srun` (e.g. SGLang Disaggregated proxy + prefill + decode topologies). Selected via `distributed.launcher: "slurm_multi"` (or `"slurm-multi"` alias). Generates a wrapper SBATCH script that runs the model's `.slurm` script directly on baremetal so `srun`/`scontrol` work inside it; performs parallel `srun docker pull` of the registry image on all allocated nodes when the model card sets `env_vars.DOCKER_IMAGE_NAME`. Honors model-card and `--additional-context` `slurm` fields (`partition`, `nodes`, `gpus_per_node`, `time`, `exclusive`, `reservation`, `nodelist`). This launcher coexists with the standard templated launchers (torchrun, vllm, sglang, deepspeed, megatron, torchtitan, primus) — those continue to flow through the standard sbatch template unchanged; only `slurm_multi`/`slurm-multi` takes the self-managed bypass path. + +- **`madengine build --use-image [IMAGE | auto]`**: Skip the local Docker build and use a pre-built image instead. With no value, resolves to the model card's `env_vars.DOCKER_IMAGE_NAME` automatically. Mutually exclusive with `--registry` and `--build-on-compute`. Manifest entries are keyed by model name with `local_image: True` so `ContainerRunner.run_models_from_manifest()` resolves `run_image` correctly and pulls on demand. + +- **`madengine build --build-on-compute`**: Build Docker images on a SLURM compute node and push to a registry, then have `madengine run` pull the image in parallel on all allocated nodes. Requires `--registry`. The resulting manifest carries `built_on_compute: true`. + +- **slurm_multi build registry gate**: When `madengine build` discovers a `slurm_multi` model and no `--registry`/`--use-image`/`--build-on-compute` is given, the orchestrator either auto-uses `env_vars.DOCKER_IMAGE_NAME` from the model card (implicit `--use-image` fallback) or raises a structured `ConfigurationError` with the four supported options listed. + +- **bash-in-salloc execution path** for slurm_multi: when `madengine run` detects `SLURM_JOB_ID` (i.e. running inside an existing `salloc`), the slurm_multi launcher runs the generated wrapper synchronously with `bash` instead of nesting another `sbatch` job. Other launchers continue to use `sbatch` even inside `salloc` (no behavior change for non-slurm_multi). + +- **Local self-managed launcher execution** (`container_runner.py`): `ContainerRunner._run_self_managed()` runs the model script directly on the host for self-managed launchers, bypassing madengine's Docker wrapper. Used when `madengine run` detects a `slurm_multi` launcher in local/non-SLURM contexts. Environment variables from the model card and `--additional-context` are injected; keys are logged without values to avoid leaking credentials. + +- **Model card config merge into manifest `deployment_config`**: `_execute_with_prebuilt_image` now merges the model card's `distributed` and `slurm` sections into the manifest's `deployment_config`, so the run phase auto-detects SLURM deployment and launcher settings without requiring `--additional-context`. User-supplied CLI values take precedence over model card defaults. + +- **`DockerBuilder` registry image injection for parallel pull**: After a successful registry push, `DockerBuilder.generate_manifest()` now sets `DOCKER_IMAGE_NAME` in each `built_models` entry's `env_vars` to the registry image, enabling slurm_multi parallel `srun docker pull` on all nodes without requiring manual image specification. + +- **`DeploymentResult.skip_monitoring`** (`deployment/base.py`): new dataclass field so synchronous deploy paths (e.g. slurm_multi's bash-in-salloc) can skip the monitor poll. + +- **`SlurmNodeSelector` `reservation` parameter**: optional reservation name forwarded to srun health/cleanup commands so node-prep srun calls run inside the reservation. + +- **`tests/unit/test_slurm_multi.py`**: contract tests for `slurm_multi` registry membership, hyphen alias normalization, end-to-end env_vars-export contract against MAD-private PR #186's `pyt_sglang_disagg_qwen3-32b_short` model card, and `_execute_with_prebuilt_image` manifest key-set contract (`built_images.keys() == built_models.keys()`). + +- **`examples/slurm-configs/minimal/slurm-multi-minimal.json`**: minimal reference config for the new launcher. + +### Changed + +- **Early model discovery reuse in `BuildOrchestrator`**: The `DiscoverModels` result from the slurm_multi registry-gate check is now cached and reused for the actual build step, avoiding duplicate `get_models_json.py` execution and duplicate console output. + +- **E2E test cleanup defaults expanded**: `DEFAULT_CLEAN_FILES` in `tests/fixtures/utils.py` now includes `build_manifest.json` and related perf artefacts (`perf_super.json`, `perf_entry.csv`, etc.) so stale manifests from prior e2e tests cannot silently cause the wrong image to be executed. + +### Fixed + +- **slurm_multi: cwd `perf.csv` aggregation**: After a successful slurm_multi run, `madengine run` previously printed a cosmetic `Performance CSV not found: perf.csv` warning even though `_collect_slurm_multi_results` had ingested the per-job CSV from `/shared_inference/$USER/$JOBID/perf.csv`. The reporter (`display_performance_table`) reads cwd `perf.csv` by default. Now `_collect_slurm_multi_results` also writes the per-job rows into cwd `perf.csv` (copy if absent, append-data-rows if present) so reporting and HTML generation work without extra args. Local + classic-SLURM flows are unchanged. + +### Security + +- **Shell injection hardening in slurm_multi wrapper scripts**: `shlex.quote()` is applied to env_var values, the model script name, and model args in the generated SBATCH wrapper script (`slurm.py::_prepare_slurm_multi_script`) and the local self-managed runner (`container_runner.py::_run_self_managed`), preventing shell metacharacters (`$()`, backticks, `;`, `"`, etc.) in user-supplied inputs from triggering host-shell expansion. + ## [2.0.3] - 2026-05-26 ### Added diff --git a/docs/cli-reference.md b/docs/cli-reference.md index 9340ae1c..2c827793 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -97,6 +97,8 @@ madengine build [OPTIONS] | `--tags` | `-t` | TEXT | `[]` | Model tags to build (can specify multiple) | | `--target-archs` | `-a` | TEXT | `[]` | Target GPU architectures (e.g., gfx908,gfx90a,gfx942) | | `--registry` | `-r` | TEXT | `None` | Docker registry to push images to | +| `--use-image` | | TEXT | `None` | Skip Docker build and use a pre-built image. Omit value or pass `auto` to resolve from model card's `DOCKER_IMAGE_NAME`. Mutually exclusive with `--registry` and `--build-on-compute` | +| `--build-on-compute` | | FLAG | `False` | Build Docker images on a SLURM compute node and push to registry. Requires `--registry` | | `--batch-manifest` | | TEXT | `None` | Input batch.json file for batch build mode | | `--additional-context` | `-c` | TEXT | `"{}"` | Additional context as JSON string | | `--additional-context-file` | `-f` | TEXT | `None` | File containing additional context JSON | @@ -142,6 +144,15 @@ madengine build --tags model \ # Real-time output with verbose logging madengine build --tags model --live-output --verbose + +# Use a pre-built image (skip Docker build) +madengine build --tags model --use-image lmsysorg/sglang:v0.5.2rc1-rocm700-mi30x + +# Auto-detect image from model card's DOCKER_IMAGE_NAME +madengine build --tags model --use-image + +# Build on SLURM compute node and push to registry +madengine build --tags model --build-on-compute --registry docker.io/myorg ``` **Default Values:** @@ -658,6 +669,6 @@ madengine recognizes these environment variables: --- -**Version:** 2.0.0 -**Last Updated:** December 2025 +**Version:** 2.1.0 +**Last Updated:** May 2026 diff --git a/docs/configuration.md b/docs/configuration.md index c0f0d807..be1de3b9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -472,6 +472,8 @@ Automatically applies (see presets under `src/madengine/deployment/presets/k8s/` - `gpus_per_node` - GPUs per node (default: 1) - `nodes` - Number of nodes (default: 1) - `nodelist` - Comma-separated node names to run on (e.g. `"node01,node02"`); when set, job is restricted to these nodes and automatic node health preflight is skipped +- `reservation` - SLURM reservation name; forwarded to srun health/cleanup commands and SBATCH directives +- `exclusive` - Exclusive node access (default: `true`) - `time` - Wall time limit HH:MM:SS (required) - `mem` - Memory per node (e.g., "64G") - `mail_user` - Email for notifications @@ -521,8 +523,11 @@ Automatically applies (see presets under `src/madengine/deployment/presets/k8s/` - `deepspeed` - ZeRO optimization - `megatron` - Large transformers (K8s + SLURM) - `torchtitan` - LLM pre-training +- `primus` - Primus unified pretrain - `vllm` - LLM inference - `sglang` - Structured generation +- `sglang-disagg` - Disaggregated SGLang +- `slurm_multi` / `slurm-multi` - Self-managed multi-container topologies (SLURM only) See [Launchers Guide](launchers.md) for details. diff --git a/docs/deployment.md b/docs/deployment.md index a1d25307..7c982fce 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -144,6 +144,7 @@ This creates: - `vllm` - LLM inference - `sglang` - Structured generation - `sglang-disagg` - Disaggregated SGLang (multi-node) +- `slurm_multi` / `slurm-multi` - Self-managed multi-container topologies (SLURM only, escape hatch) See [Launchers Guide](launchers.md) for details. @@ -242,8 +243,10 @@ The deployment target is automatically detected from the `slurm` key in the conf - `gpus_per_node`: Number of GPUs per node - `nodes`: Number of nodes (for multi-node) - `nodelist`: Comma-separated node names to run on (e.g. `"node01,node02"`); when set, job runs only on these nodes and node health preflight is skipped +- `reservation`: SLURM reservation name; forwarded to srun health/cleanup commands - `time`: Wall time limit (HH:MM:SS) - `mem`: Memory per node (e.g., "64G") +- `exclusive`: Exclusive node access (default: `true`) - `mail_user`: Email for job notifications - `mail_type`: Notification types (BEGIN, END, FAIL, ALL) @@ -291,6 +294,53 @@ scontrol show job tail -f slurm-.out ``` +### Pre-Built Images and Build-on-Compute + +For workloads that use externally maintained Docker images (e.g. SGLang, vLLM releases): + +```bash +# Skip Docker build, use a pre-built image +madengine build --tags model --use-image lmsysorg/sglang:latest + +# Auto-detect image from model card's DOCKER_IMAGE_NAME +madengine build --tags model --use-image + +# Build on a SLURM compute node and push to registry +madengine build --tags model --build-on-compute --registry docker.io/myorg +``` + +The manifest generated by `--use-image` merges the model card's `distributed` and `slurm` config into `deployment_config`, so the run phase auto-detects SLURM deployment without additional `--additional-context`. + +### slurm_multi Launcher (Self-Managed) + +For workloads that orchestrate their own per-node Docker containers (e.g. SGLang Disaggregated proxy + prefill + decode topologies), use the `slurm_multi` launcher: + +```json +{ + "distributed": { + "launcher": "slurm_multi" + }, + "slurm": { + "partition": "gpu", + "nodes": 3, + "gpus_per_node": 8, + "reservation": "my-reservation" + } +} +``` + +Unlike templated launchers, slurm_multi runs the model's `.slurm` script directly on baremetal. The script manages its own Docker containers via `srun` internally. See [Launchers Guide — slurm_multi](launchers.md#9-slurm_multi-self-managed-escape-hatch) for details. + +### Running Inside salloc + +When `madengine run` detects an existing SLURM allocation (`SLURM_JOB_ID` is set, e.g. inside `salloc`), the slurm_multi launcher runs the generated wrapper script synchronously with `bash` instead of nesting another `sbatch`. Other launchers continue to use `sbatch` even inside `salloc`. + +```bash +salloc --nodes=3 --gpus-per-node=8 --partition=gpu +madengine run --manifest-file build_manifest.json +# → Detects salloc, runs synchronously +``` + ### Cancellation ```bash diff --git a/docs/launchers.md b/docs/launchers.md index 37d5f9cb..a4bcb672 100644 --- a/docs/launchers.md +++ b/docs/launchers.md @@ -20,6 +20,7 @@ madengine provides unified support for multiple distributed frameworks, enabling | **vLLM** | Inference | High-throughput LLM serving | ✅ | ✅ | ✅ | | **SGLang** | Inference | Fast LLM inference | ✅ | ✅ | ✅ | | **SGLang Disaggregated** | Inference | Large-scale disaggregated inference | ✅ | ✅ | ✅ (min 3) | +| **slurm_multi** | Escape hatch | Self-managed multi-container topologies | ❌ | ✅ | ✅ | --- @@ -557,6 +558,108 @@ madengine run --tags model --config custom-split-config.json --- +### 9. slurm_multi (Self-Managed Escape Hatch) + +**Purpose**: Run workloads that manage their own per-node Docker containers via `srun` — an escape hatch for topologies that don't fit the standard templated launchers. + +**When to Use**: +- ✅ Multi-container SLURM topologies (e.g. SGLang Disaggregated proxy + prefill + decode) +- ✅ Workloads whose `.slurm` script orchestrates Docker containers via `srun` internally +- ✅ Scenarios requiring baremetal `srun`/`scontrol` access from the model script +- ❌ NOT a peer of templated launchers — use torchrun, vllm, sglang, etc. for standard workloads + +**Configuration**: +```json +{ + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8 + }, + "slurm": { + "partition": "gpu", + "nodes": 3, + "gpus_per_node": 8, + "time": "04:00:00", + "exclusive": true, + "reservation": "my-reservation" + } +} +``` + +**How It Works**: + +Unlike templated launchers that inject `MAD_MULTI_NODE_RUNNER` and wrap the model script inside a Docker container, slurm_multi: + +1. Generates a wrapper SBATCH script that exports `env_vars` from the model card +2. Runs the model's own `.slurm` script directly on baremetal (head node) +3. The model script orchestrates per-node Docker containers via `srun` internally +4. Performs parallel `srun docker pull` on all allocated nodes when using registry images +5. Writes a completion marker file for robust job completion detection + +``` +┌─────────────────────────────────────────────────┐ +│ madengine build --use-image │ +│ → Generates manifest with pre-built image │ +│ → Merges model card slurm/distributed config │ +└───────────────────┬─────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────┐ +│ madengine run --manifest-file manifest.json │ +│ → Detects slurm_multi launcher │ +│ → Generates wrapper SBATCH script │ +│ → Parallel docker pull on all nodes (if needed) │ +│ → Submits sbatch (or runs bash if inside salloc)│ +└───────────────────┬─────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────┐ +│ Model's .slurm script runs on head node │ +│ → Orchestrates Docker containers via srun │ +│ → Manages its own topology (proxy/prefill/...) │ +│ → Writes perf.csv (collected by madengine) │ +└─────────────────────────────────────────────────┘ +``` + +**Build Phase**: + +slurm_multi models typically use pre-built images. The build phase has a **registry gate**: if no `--registry`, `--use-image`, or `--build-on-compute` is given, the orchestrator either auto-detects `DOCKER_IMAGE_NAME` from the model card (implicit `--use-image`) or raises a `ConfigurationError` with supported options. + +```bash +# Use a pre-built image (recommended for slurm_multi) +madengine build --tags my_model --use-image lmsysorg/sglang:latest + +# Auto-detect image from model card's DOCKER_IMAGE_NAME +madengine build --tags my_model --use-image + +# Build on compute node and push to registry +madengine build --tags my_model --build-on-compute --registry docker.io/myorg +``` + +**Run Phase — salloc support**: + +When `madengine run` detects `SLURM_JOB_ID` (running inside an existing `salloc` allocation), the slurm_multi launcher runs the wrapper script synchronously with `bash` instead of nesting another `sbatch`. Other launchers continue to use `sbatch` inside `salloc` (no behavior change). + +```bash +# Inside salloc: runs synchronously with bash +salloc --nodes=3 --gpus-per-node=8 --partition=gpu +madengine run --manifest-file build_manifest.json +``` + +**Alias**: `"slurm-multi"` (hyphen) is normalized to `"slurm_multi"` (underscore). + +**Features**: +- Wrapper SBATCH script with shell-quoted env_vars (injection-safe) +- Parallel `srun docker pull` on all nodes for registry images +- Completion marker for robust job status detection +- bash-in-salloc synchronous execution path +- `DeploymentResult.skip_monitoring` for synchronous runs +- Model card slurm/distributed config auto-merged into manifest + +**Examples**: +- SLURM: `examples/slurm-configs/minimal/slurm-multi-minimal.json` + +--- + ## Comparison Matrix ### Training Launchers @@ -732,7 +835,7 @@ SGLANG_NODE_RANK=${SLURM_PROCID} ```bash Error: Unknown launcher type 'xyz' ``` -Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `primus`, `vllm`, `sglang`, `sglang-disagg` +Solution: Use one of: `torchrun`, `deepspeed`, `megatron`, `torchtitan`, `primus`, `vllm`, `sglang`, `sglang-disagg`, `slurm_multi` (or `slurm-multi`) **2. Multi-Node Communication Fails** ```bash @@ -782,6 +885,9 @@ $MAD_MULTI_NODE_RUNNER your_training_script.py --args # For vLLM/sglang (no MAD_MULTI_NODE_RUNNER) python your_inference_script.py --args + +# For slurm_multi (no MAD_MULTI_NODE_RUNNER; script runs on baremetal and manages Docker via srun) +# The model's .slurm script is executed directly — it handles srun, docker run, etc. internally ``` ### Launcher Detection diff --git a/examples/slurm-configs/README.md b/examples/slurm-configs/README.md index 7d714a08..e89f016f 100644 --- a/examples/slurm-configs/README.md +++ b/examples/slurm-configs/README.md @@ -64,8 +64,10 @@ Stripped-down configurations showing only essential fields: - `primus-minimal.json` - Minimal Primus pretrain (`distributed.launcher: "primus"`; edit `primus.config_path`) - `vllm-single-node-minimal.json` - Minimal vLLM single-node - `vllm-multi-node-minimal.json` - Minimal vLLM multi-node +- `slurm-multi-minimal.json` - Minimal slurm_multi self-managed launcher (3 nodes) For Primus options and environment variables, see [Launchers Guide](../../docs/launchers.md#5-primus). +For slurm_multi escape-hatch launcher, see [Launchers Guide](../../docs/launchers.md#9-slurm_multi-self-managed-escape-hatch). ## 🔄 Configuration Workflow @@ -386,6 +388,7 @@ madengine uses intelligent multi-layer configuration merging: "results_dir": "/shared/results", // Shared results collection "shared_workspace": "/shared/workspace", // Shared workspace (NFS/Lustre) "exclusive": true, // Exclusive node access + "reservation": "my-reservation", // Optional SLURM reservation name "qos": "high", // Quality of Service "account": "project-name", // SLURM account "network_interface": "ib0", // Network interface (ib0/eth0) @@ -401,7 +404,7 @@ madengine uses intelligent multi-layer configuration merging: ```json { "distributed": { - "launcher": "torchrun", // Launcher type: torchrun, vllm, sglang, deepspeed, megatron + "launcher": "torchrun", // Launcher type: torchrun, vllm, sglang, deepspeed, megatron, slurm_multi "backend": "nccl", // Communication backend (nccl/gloo) "port": 29500, // Master node port "nnodes": 2, // Number of nodes (overrides slurm.nodes if set) @@ -416,10 +419,12 @@ madengine uses intelligent multi-layer configuration merging: - `sglang`: SGLang inference engine - `deepspeed`: DeepSpeed training framework - `megatron`: Megatron-LM large model training +- `slurm_multi` / `slurm-multi`: Self-managed multi-container topologies (escape hatch) - Custom: Set environment variables, model script handles launcher **Note**: For vLLM and SGLang, the model script handles process spawning directly. For torchrun/deepspeed/megatron, use `$MAD_MULTI_NODE_RUNNER` in your model script. +For slurm_multi, the model's `.slurm` script runs on baremetal and manages Docker containers via `srun` internally. ### Environment Variables @@ -526,6 +531,53 @@ For custom memory configurations, create a new config file: } ``` +## 🔧 slurm_multi Launcher (Self-Managed) + +The `slurm_multi` launcher is an escape hatch for workloads that orchestrate their own Docker containers via `srun` (e.g. SGLang Disaggregated proxy + prefill + decode topologies). Unlike templated launchers, the model's `.slurm` script runs directly on baremetal. + +### slurm_multi Workflow + +```bash +# 1. Build with pre-built image (slurm_multi typically uses external images) +madengine build --tags sglang_disagg \ + --use-image lmsysorg/sglang:latest + +# 2. Run — launcher auto-detected from model card +madengine run --manifest-file build_manifest.json +``` + +### slurm_multi Inside salloc + +When running inside an existing `salloc` allocation, slurm_multi runs synchronously with `bash` instead of nesting `sbatch`: + +```bash +salloc --nodes=3 --gpus-per-node=8 --partition=gpu +madengine run --manifest-file build_manifest.json +# → Detected existing SLURM allocation: runs with bash +``` + +### slurm_multi Configuration + +```json +{ + "slurm": { + "partition": "gpu", + "nodes": 3, + "gpus_per_node": 8, + "time": "04:00:00", + "exclusive": true, + "reservation": "my-reservation" + }, + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8 + } +} +``` + +See [Launchers Guide — slurm_multi](../../docs/launchers.md#9-slurm_multi-self-managed-escape-hatch) for full details. + ## 🛠️ Advanced Features ### Custom Environment Modules diff --git a/examples/slurm-configs/minimal/slurm-multi-minimal.json b/examples/slurm-configs/minimal/slurm-multi-minimal.json new file mode 100644 index 00000000..03fb5fb9 --- /dev/null +++ b/examples/slurm-configs/minimal/slurm-multi-minimal.json @@ -0,0 +1,23 @@ +{ + "_comment": "Minimal slurm_multi launcher configuration - 3 nodes minimum", + "_description": "Self-managed multi-node SLURM launcher (script runs on baremetal, manages its own Docker via srun)", + "_architecture": "Wrapper SBATCH exports env_vars and runs the model's .slurm script directly on the head node; the script orchestrates per-node containers via srun", + + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "deploy": "slurm", + + "slurm": { + "partition": "gpu", + "nodes": 3, + "gpus_per_node": 8, + "time": "04:00:00", + "exclusive": true + }, + + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8 + } +} diff --git a/src/madengine/cli/commands/build.py b/src/madengine/cli/commands/build.py index 5b10a65c..8133a75c 100644 --- a/src/madengine/cli/commands/build.py +++ b/src/madengine/cli/commands/build.py @@ -55,6 +55,28 @@ def build( "--batch-manifest", help="Input batch.json file for batch build mode" ), ] = None, + # NOTE: `is_flag=False, flag_value="auto"` lets `--use-image` (no value) + # mean "auto-detect from the model card's DOCKER_IMAGE_NAME", matching + # MAD-private PR #186's documented UX. Typer is deprecating this pattern + # for a future release; when removed, switch to requiring an explicit + # value (e.g. `--use-image auto` as the documented sentinel) and update + # MAD-private's docs in lockstep. + use_image: Annotated[ + Optional[str], + typer.Option( + "--use-image", + is_flag=False, + flag_value="auto", + help="Skip Docker build and use pre-built image. Optionally specify image name, or omit to auto-detect from model card's DOCKER_IMAGE_NAME" + ), + ] = None, + build_on_compute: Annotated[ + bool, + typer.Option( + "--build-on-compute", + help="Build Docker images on SLURM compute node instead of login node" + ), + ] = False, additional_context: Annotated[ str, typer.Option( @@ -116,6 +138,31 @@ def build( ) raise typer.Exit(ExitCode.INVALID_ARGS) + if use_image and registry: + console.print( + "❌ [bold red]Error: Cannot specify both --use-image and --registry options[/bold red]\n" + "[yellow]Use --use-image for pre-built external images.[/yellow]\n" + "[yellow]Use --registry to push locally built images.[/yellow]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + + if use_image and build_on_compute: + console.print( + "❌ [bold red]Error: Cannot specify both --use-image and --build-on-compute options[/bold red]\n" + "[yellow]--use-image skips Docker build entirely.[/yellow]\n" + "[yellow]--build-on-compute builds on SLURM compute nodes.[/yellow]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + + if build_on_compute and not registry: + console.print( + "❌ [bold red]Error: --build-on-compute requires --registry option[/bold red]\n" + "[yellow]Build on compute node pushes image to registry.[/yellow]\n" + "[yellow]Run phase will pull image in parallel on all nodes.[/yellow]\n" + "[dim]Example: --build-on-compute --registry docker.io/myorg[/dim]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + # Process batch manifest if provided batch_data = None effective_tags = processed_tags @@ -175,7 +222,7 @@ def build( try: # Validate additional context and merge file + CLI; defaults wired into orchestrator validated_context = validate_additional_context( - additional_context, additional_context_file + additional_context, additional_context_file, use_image ) # Create arguments object @@ -191,6 +238,8 @@ def build( verbose=verbose, _separate_phases=True, batch_build_metadata=batch_build_metadata if batch_build_metadata else None, + use_image=use_image, + build_on_compute=build_on_compute, ) # Initialize orchestrator in build-only mode @@ -211,6 +260,8 @@ def build( clean_cache=clean_docker_cache, manifest_output=manifest_output, batch_build_metadata=batch_build_metadata, + use_image=use_image, + build_on_compute=build_on_compute, ) # Load build summary for display diff --git a/src/madengine/cli/validators.py b/src/madengine/cli/validators.py index 1f7ee001..68f45856 100644 --- a/src/madengine/cli/validators.py +++ b/src/madengine/cli/validators.py @@ -298,6 +298,7 @@ def additional_context_needs_cli_validation( def validate_additional_context( additional_context: str, additional_context_file: Optional[str] = None, + use_image: Optional[str] = None, ) -> Dict[str, Any]: """ Validate and parse additional context. @@ -305,6 +306,10 @@ def validate_additional_context( Args: additional_context: JSON string containing additional context additional_context_file: Optional file containing additional context + use_image: Pre-built image override forwarded by build.py for CLI signature + compatibility. Currently informational only -- validation behavior is + unchanged when this is set; callers wanting to skip required-field + checks should adjust ``finalize_additional_context_dict`` directly. Returns: Dict containing parsed additional context diff --git a/src/madengine/deployment/base.py b/src/madengine/deployment/base.py index 981931f0..69e1367e 100644 --- a/src/madengine/deployment/base.py +++ b/src/madengine/deployment/base.py @@ -82,6 +82,7 @@ class DeploymentResult: metrics: Optional[Dict[str, Any]] = None logs_path: Optional[str] = None artifacts: Optional[List[str]] = None + skip_monitoring: bool = False # Set True for synchronous runs (e.g., inside salloc) @property def is_success(self) -> bool: @@ -196,7 +197,8 @@ def execute(self) -> DeploymentResult: return result # Step 4: Monitor (optional) - if self.config.monitor: + # Skip monitoring if deploy() already ran synchronously (e.g., inside salloc) + if self.config.monitor and not result.skip_monitoring: result = self._monitor_until_complete(result.deployment_id) # Step 5: Collect Results (always collect, even on failure to record failed runs) diff --git a/src/madengine/deployment/common.py b/src/madengine/deployment/common.py index 5b898960..4231854f 100644 --- a/src/madengine/deployment/common.py +++ b/src/madengine/deployment/common.py @@ -21,7 +21,8 @@ "primus", "vllm", "sglang", - "sglang-disagg" + "sglang-disagg", + "slurm_multi", ] # Tool names that use rocprof / rocprofv3 wrapping and need MPI-aware rocprofv3 on multi-node. @@ -62,6 +63,8 @@ def normalize_launcher(launcher_type: Optional[str], deployment_type: str) -> st Logic: - If launcher is in VALID_LAUNCHERS: keep as-is + - If launcher's hyphen/underscore variant is in VALID_LAUNCHERS: normalize + (e.g. "slurm-multi" -> "slurm_multi") - If launcher is None/empty/invalid: * local → "docker" (runs in Docker container) * slurm → "docker" (typically uses containers on compute nodes) @@ -76,6 +79,9 @@ def normalize_launcher(launcher_type: Optional[str], deployment_type: str) -> st """ if launcher_type and launcher_type in VALID_LAUNCHERS: return launcher_type + # Normalize hyphen variant: slurm-multi -> slurm_multi + if launcher_type and launcher_type.replace("-", "_") in VALID_LAUNCHERS: + return launcher_type.replace("-", "_") if deployment_type == "local": return "docker" if deployment_type == "slurm": @@ -85,6 +91,22 @@ def normalize_launcher(launcher_type: Optional[str], deployment_type: str) -> st return "docker" +_SELF_MANAGED_LAUNCHERS: frozenset = frozenset({"slurm_multi"}) + + +def is_self_managed_launcher(launcher_type: Optional[str]) -> bool: + """Return True if the launcher manages its own per-node containers. + + Self-managed launchers (e.g. slurm_multi) run the model's own .slurm script + directly on the head node and orchestrate Docker containers via srun internally. + They bypass the standard sbatch template entirely and are an escape hatch — not + peers of the templated launchers (torchrun, vllm, sglang, etc.). + """ + if not launcher_type: + return False + return normalize_launcher(launcher_type, "slurm") in _SELF_MANAGED_LAUNCHERS + + @functools.lru_cache(maxsize=None) def is_rocprofv3_available() -> bool: """ diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index b5eefbd4..64b3cdcf 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -12,6 +12,7 @@ """ import os +import shlex import subprocess import time from pathlib import Path @@ -19,7 +20,7 @@ from .base import BaseDeployment, DeploymentConfig, DeploymentResult, DeploymentStatus, create_jinja_env from .primus_backend import infer_primus_backend_from_model_name, merged_primus_config -from .common import configure_multi_node_profiling, normalize_launcher +from .common import configure_multi_node_profiling, is_self_managed_launcher, normalize_launcher from .config_loader import ConfigLoader, apply_deployment_config from .slurm_node_selector import SlurmNodeSelector from madengine.utils.gpu_config import resolve_runtime_gpus @@ -70,6 +71,7 @@ def __init__(self, config: DeploymentConfig): self.gpus_per_node = self.slurm_config.get("gpus_per_node", 8) self.time_limit = self.slurm_config.get("time", "24:00:00") self.output_dir = Path(self.slurm_config.get("output_dir", "./slurm_results")) + self.reservation = self.slurm_config.get("reservation", None) # Setup Jinja2 template engine template_dir = Path(__file__).parent / "templates" / "slurm" @@ -78,6 +80,115 @@ def __init__(self, config: DeploymentConfig): # Generated script path self.script_path = None + # ========== OPTION 2: Detect existing SLURM allocation ========== + # If SLURM_JOB_ID exists, we're inside an salloc allocation + self.inside_allocation = os.environ.get("SLURM_JOB_ID") is not None + self.existing_job_id = os.environ.get("SLURM_JOB_ID", "") + self.allocation_nodes = self._get_allocation_node_count() + + if self.inside_allocation: + self.console.print( + f"[cyan]✓ Detected existing SLURM allocation: Job {self.existing_job_id}[/cyan]" + ) + self.console.print( + f" Allocation has {self.allocation_nodes} nodes available" + ) + + def _get_allocation_node_count(self) -> int: + """ + Get number of nodes in current SLURM allocation. + + Note: SLURM_NNODES reflects the current job step, not the full allocation. + We query the job directly using scontrol to get the actual node count. + """ + if not self.inside_allocation: + return 0 + + job_id = self.existing_job_id + + # Query the actual job's node count using scontrol (most accurate) + try: + result = subprocess.run( + ["scontrol", "show", "job", job_id], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + # Parse NumNodes=X from output + for line in result.stdout.split("\n"): + if "NumNodes=" in line: + # Format: "NumNodes=3 NumCPUs=..." + for part in line.split(): + if part.startswith("NumNodes="): + try: + return int(part.split("=")[1]) + except (ValueError, IndexError): + pass + except Exception: + pass + + # Fallback: Try SLURM_JOB_NUM_NODES (full job node count, if set) + job_num_nodes = os.environ.get("SLURM_JOB_NUM_NODES") + if job_num_nodes: + try: + return int(job_num_nodes) + except ValueError: + pass + + # Fallback: SLURM_NNODES (may be step-specific, not full allocation) + nnodes = os.environ.get("SLURM_NNODES") + if nnodes: + try: + return int(nnodes) + except ValueError: + pass + + # Last resort: count nodes in SLURM_NODELIST + nodelist = os.environ.get("SLURM_NODELIST") + if nodelist: + try: + result = subprocess.run( + ["scontrol", "show", "hostname", nodelist], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + return len(result.stdout.strip().split("\n")) + except Exception: + pass + + return 0 + + def _validate_allocation_nodes(self) -> tuple: + """ + Validate that existing allocation has enough nodes for the job. + + Returns: + Tuple of (is_valid, error_message) + """ + if not self.inside_allocation: + return True, "" + + requested_nodes = self.nodes + available_nodes = self.allocation_nodes + + if available_nodes < requested_nodes: + return False, ( + f"Insufficient nodes in current allocation. " + f"Requested: {requested_nodes}, Available: {available_nodes}. " + f"Either reduce nodes in config or use a larger allocation." + ) + + if available_nodes > requested_nodes: + self.console.print( + f"[yellow]⚠ Note: Using {requested_nodes} of {available_nodes} " + f"available nodes in allocation[/yellow]" + ) + + return True, "" + def validate(self) -> bool: """Validate SLURM commands are available locally.""" # Check required SLURM CLI tools @@ -177,6 +288,32 @@ def _validate_cli_availability(self) -> bool: def prepare(self) -> bool: """Generate sbatch script from template.""" + # Escape-hatch early dispatch: slurm_multi is a self-managed launcher that runs the + # model's own .slurm script on the head node (no madengine wrapper on compute nodes). + # Peek at the launcher type; if it's a self-managed launcher, take the bypass path. + # All other launchers (torchrun, vllm, sglang, deepspeed, megatron, torchtitan, + # primus) fall through to the standard Jinja2 template flow unchanged. + try: + model_keys_peek = list((self.manifest or {}).get("built_models", {}).keys()) + if model_keys_peek: + model_info_peek = self.manifest["built_models"][model_keys_peek[0]] + model_distributed_peek = model_info_peek.get("distributed", {}) + launcher_type_peek = ( + model_distributed_peek.get("launcher") + or self.distributed_config.get("launcher", "torchrun") + ) + if is_self_managed_launcher(launcher_type_peek): + self.output_dir.mkdir(parents=True, exist_ok=True) + self.console.print( + f"[cyan]Detected slurm_multi launcher: {launcher_type_peek}[/cyan]" + ) + return self._prepare_slurm_multi_script( + model_info_peek, docker_image_name=model_keys_peek[0] + ) + except Exception: + # Fall through to develop's standard flow on any peek error + pass + # Validate environment BEFORE generating job scripts self.console.print("\n[bold]Validating submission environment...[/bold]") if not self._validate_cli_availability(): @@ -224,6 +361,240 @@ def _normalize_nodelist(nodelist: Optional[str]) -> Optional[str]: return None return ",".join(n.strip() for n in nodelist.split(",") if n.strip()) + def _prepare_slurm_multi_script(self, model_info: Dict, docker_image_name: str = None) -> bool: + """ + Escape hatch for self-orchestrating multi-container SLURM topologies. + + slurm_multi bypasses the standard sbatch template entirely: the model's own + .slurm script runs on the head node and manages its per-node Docker containers + via srun internally (e.g. SGLang Disaggregated proxy + prefill + decode). No + madengine wrapper is injected on compute nodes. This is NOT a peer of the + templated launchers (torchrun, vllm, sglang, deepspeed, megatron, torchtitan, + primus) — those continue to flow through the standard template path unchanged. + + Args: + model_info: Model configuration from manifest + docker_image_name: The built Docker image name from manifest key + """ + self._is_slurm_multi = True + # Get the model's script path + model_script = model_info.get("scripts", "") + if not model_script: + self.console.print("[red]✗ No scripts defined in model_info[/red]") + return False + + # Get manifest directory (where the model script is relative to) + manifest_dir = Path(self.config.manifest_file).parent.absolute() + model_script_path = manifest_dir / model_script + + if not model_script_path.exists(): + self.console.print(f"[red]✗ Model script not found: {model_script_path}[/red]") + return False + + # Get environment variables + env_vars = {} + + # From model_info.env_vars + if "env_vars" in model_info: + env_vars.update(model_info["env_vars"]) + + # From additional_context.env_vars + if "env_vars" in self.config.additional_context: + env_vars.update(self.config.additional_context["env_vars"]) + + # From distributed config (model's distributed section) + model_distributed = model_info.get("distributed", {}) + sglang_disagg_config = model_distributed.get("sglang_disagg", {}) or self.distributed_config.get("sglang_disagg", {}) + if sglang_disagg_config: + if "xP" not in env_vars: + env_vars["xP"] = str(sglang_disagg_config.get("prefill_nodes", 1)) + if "yD" not in env_vars: + env_vars["yD"] = str(sglang_disagg_config.get("decode_nodes", 1)) + + # Override DOCKER_IMAGE_NAME with the built image from manifest + # This ensures the run uses the freshly built image, not the base image + # Priority: docker_image_name param > model_info.docker_image > env_vars.DOCKER_IMAGE_NAME + if docker_image_name and docker_image_name.startswith("ci-"): + # The manifest key IS the built image name for madengine-built images + self.console.print(f"[cyan]Using built Docker image: {docker_image_name}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = docker_image_name + elif "docker_image" in model_info: + built_image = model_info["docker_image"] + self.console.print(f"[cyan]Using Docker image: {built_image}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = built_image + elif "image" in model_info: + # Fallback to 'image' field + built_image = model_info["image"] + self.console.print(f"[cyan]Using Docker image: {built_image}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = built_image + + # Get model args. The wrapper script below is executed by bash, so the + # script name and free-form args string must be shell-quoted to prevent + # embedded metacharacters ($(), backticks, ;, etc.) from being evaluated + # by the host shell. Mirrors the hardening in container_runner._run_self_managed. + model_args = model_info.get("args", "") + _script_name_q = shlex.quote(model_script_path.name) + _model_args_q = ( + " ".join(shlex.quote(a) for a in shlex.split(model_args)) + if model_args + else "" + ) + _bash_invocation = f"bash {_script_name_q} {_model_args_q}".rstrip() + + # Generate simple wrapper script + # IMPORTANT: SBATCH directives MUST be at the top, right after #!/bin/bash + script_lines = [ + "#!/bin/bash", + f"#SBATCH --job-name=madengine-{model_info['name']}", + f"#SBATCH --output={self.output_dir}/madengine-{model_info['name']}_%j_%t.out", + f"#SBATCH --error={self.output_dir}/madengine-{model_info['name']}_%j_%t.err", + f"#SBATCH --partition={self.partition}", + f"#SBATCH --nodes={self.nodes}", + f"#SBATCH --ntasks={self.nodes}", + f"#SBATCH --gpus-per-node={self.gpus_per_node}", + f"#SBATCH --time={self.time_limit}", + ] + # Honour user-configured exclusivity (defaults to True to match the standard SLURM template). + if self.slurm_config.get("exclusive", True): + script_lines.append("#SBATCH --exclusive") + + # Add reservation if specified + if self.reservation: + script_lines.append(f"#SBATCH --reservation={self.reservation}") + + # Add nodelist if specified (from model card or --additional-context) + nodelist = self._normalize_nodelist(self.slurm_config.get("nodelist")) + if nodelist: + script_lines.append(f"#SBATCH --nodelist={nodelist}") + + script_lines.extend([ + "", + f"# slurm_multi launcher script for {model_info['name']}", + f"# Generated by madengine for slurm_multi", + "", + "set -e", + "", + "# Environment variables", + ]) + + for key, value in env_vars.items(): + script_lines.append(f"export {key}={shlex.quote(str(value))}") + + script_lines.append("") + script_lines.extend([ + "echo '=========================================='", + "echo 'slurm_multi Launcher'", + "echo '=========================================='", + f"echo 'Model: {model_info['name']}'", + f"echo 'Script: {model_script_path}'", + "echo 'SLURM_JOB_ID:' $SLURM_JOB_ID", + "echo 'SLURM_NNODES:' $SLURM_NNODES", + "echo 'SLURM_NODELIST:' $SLURM_NODELIST", + "echo ''", + ]) + + # Check if image needs parallel pull on all nodes + # Pull if: image is from registry (contains / or .) and not a local ci-* build + docker_image = env_vars.get("DOCKER_IMAGE_NAME", "") + is_registry_image = docker_image and not docker_image.startswith("ci-") and ("/" in docker_image or "." in docker_image) + + if is_registry_image: + # Add parallel docker pull on all nodes + # This ensures all nodes have the image before running + script_lines.extend([ + "", + "# Pull Docker image in parallel on all nodes", + "echo '=========================================='", + "echo 'Pulling Docker image on all nodes in parallel'", + "echo '=========================================='", + f"echo 'Image: {docker_image}'", + "echo ''", + "", + f"srun --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES bash -c \"", + f" echo \\\"[\\$(hostname)] Pulling {docker_image}...\\\"", + f" docker pull {docker_image}", + " PULL_RC=\\$?", + " if [ \\$PULL_RC -eq 0 ]; then", + " echo \\\"[\\$(hostname)] Pull SUCCESS\\\"", + " else", + " echo \\\"[\\$(hostname)] Pull FAILED with exit code \\$PULL_RC\\\"", + " fi", + " exit \\$PULL_RC", + "\"", + "PULL_EXIT=$?", + "", + "if [ $PULL_EXIT -ne 0 ]; then", + " echo 'Docker pull failed on one or more nodes'", + " exit $PULL_EXIT", + "fi", + "", + "echo ''", + "echo 'Docker image pulled on all nodes'", + "echo ''", + ]) + + # Create completion marker path for robust completion detection. + # Namespace by SLURM_JOB_ID so concurrent / repeat runs of the same model + # tag don't collide on each other's marker files. monitor() reconstructs + # the same path using the deployment_id returned by sbatch. + completion_marker_dir = self.output_dir.resolve() + completion_marker_template = ( + completion_marker_dir + / f"madengine_{model_info['name']}_${{SLURM_JOB_ID:-local}}.complete" + ) + + # Disable `set -e` around the model script bash invocation below so a + # non-zero exit doesn't terminate the wrapper before SCRIPT_EXIT_CODE is + # captured and the completion marker is written. monitor() relies on the + # marker to distinguish 'failed' from 'still running'; without this, + # a failed model run would look like a hang. + script_lines.extend([ + "", + "# Change to script directory", + f"cd {model_script_path.parent}", + "", + "# Run the model script directly on the host (with -e disabled so we", + "# can capture the exit code and write the completion marker even on failure).", + f"echo 'Executing: {_bash_invocation}'", + "set +e", + _bash_invocation, + "SCRIPT_EXIT_CODE=$?", + "set -e", + "", + "echo ''", + "echo 'Script completed.'", + "", + "# Write completion marker for madengine to detect (job-id namespaced)", + f"echo \"exit_code=$SCRIPT_EXIT_CODE\" > {completion_marker_template}", + f"echo \"timestamp=$(date -Iseconds)\" >> {completion_marker_template}", + f"echo \"Completion marker written: {completion_marker_template}\"", + "", + "exit $SCRIPT_EXIT_CODE", + ]) + + # Store marker info for monitor() to reconstruct the path with deployment_id. + self._completion_marker_dir = completion_marker_dir + self._completion_marker_basename_template = ( + f"madengine_{model_info['name']}_{{job_id}}.complete" + ) + # Backward-compat: monitor() falls back to this single path when the + # job-id-aware path is unavailable (e.g. inside_allocation flow). + self._completion_marker = ( + completion_marker_dir / f"madengine_{model_info['name']}_local.complete" + ) + + script_content = "\n".join(script_lines) + + # Save script + self.script_path = self.output_dir / f"madengine_{model_info['name']}.sh" + self.script_path.write_text(script_content) + self.script_path.chmod(0o755) + + self.console.print(f"[green]✓ Generated slurm_multi script: {self.script_path}[/green]") + self.console.print(f" Model script: {model_script_path}") + self.console.print(f" Environment: {len(env_vars)} variables") + + return True def _prepare_template_context(self, model_info: Dict) -> Dict[str, Any]: """Prepare context for Jinja2 template rendering.""" # Use hierarchical GPU resolution: runtime > deployment > model > default @@ -717,6 +1088,12 @@ def deploy(self) -> DeploymentResult: message="Script not generated. Run prepare() first.", ) + # slurm_multi inside an existing salloc allocation: run the generated script + # directly with bash instead of nesting another sbatch. Non-slurm_multi launchers + # always fall through to the standard sbatch flow (preserves develop behavior). + if self.inside_allocation and getattr(self, "_is_slurm_multi", False): + return self._run_inside_existing_allocation() + # ==================== PREFLIGHT NODE SELECTION ==================== # For single- and multi-node jobs, check for clean nodes and exclude bad ones. # Single-node: we still run the check so bad nodes (e.g. Docker broken) get excluded; @@ -734,6 +1111,7 @@ def deploy(self) -> DeploymentResult: console=self.console, auto_cleanup=auto_cleanup, verbose=self.slurm_config.get("verbose_node_check", False), + reservation=self.reservation, ) clean_nodes, updated_exclude = selector.select_nodes( partition=self.partition, @@ -826,6 +1204,79 @@ def deploy(self) -> DeploymentResult: message=f"Deployment error: {str(e)}", ) + def _run_inside_existing_allocation(self) -> DeploymentResult: + """ + Run script directly inside existing salloc allocation using bash. + + The script will use the nodes already allocated to the current job. + SLURM environment variables (SLURM_NODELIST, etc.) are inherited. + """ + # Validate node count before running + is_valid, error_msg = self._validate_allocation_nodes() + if not is_valid: + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=error_msg, + ) + + self.console.print( + f"\n[bold cyan]Running inside existing SLURM allocation[/bold cyan]" + ) + self.console.print(f" Job ID: {self.existing_job_id}") + self.console.print(f" Using {self.nodes} of {self.allocation_nodes} allocated nodes") + self.console.print(f" GPUs per node: {self.gpus_per_node}") + self.console.print(f" Script: {self.script_path}") + self.console.print(f"\n[dim]Executing: bash {self.script_path}[/dim]\n") + + try: + # Run script directly with bash (synchronous, blocks until done) + # Don't capture output - let it stream directly to console + result = subprocess.run( + ["bash", str(self.script_path)], + timeout=self.config.timeout if self.config.timeout > 0 else None, + ) + + if result.returncode == 0: + self.console.print( + f"\n[green]✓ Script completed successfully in allocation {self.existing_job_id}[/green]" + ) + return DeploymentResult( + status=DeploymentStatus.SUCCESS, + deployment_id=self.existing_job_id, + message=f"Completed inside existing allocation {self.existing_job_id}", + logs_path=str(self.output_dir), + skip_monitoring=True, # Already ran synchronously, no need to poll + ) + else: + self.console.print( + f"\n[red]✗ Script failed with exit code {result.returncode}[/red]" + ) + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Script failed with exit code {result.returncode}", + logs_path=str(self.output_dir), + skip_monitoring=True, # Already ran synchronously + ) + + except subprocess.TimeoutExpired: + self.console.print( + f"\n[red]✗ Script timed out after {self.config.timeout}s[/red]" + ) + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Script timed out after {self.config.timeout}s", + ) + except Exception as e: + self.console.print(f"\n[red]✗ Execution error: {e}[/red]") + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Execution error: {str(e)}", + ) + def monitor(self, deployment_id: str) -> DeploymentResult: """Check SLURM job status (locally).""" try: @@ -1204,6 +1655,17 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: model_name_for_path = model_info_for_path.get("name", model_key or "unknown") model_name = model_key or "unknown" # image key for build_info / model_info_for_entry lookups + # slurm_multi early dispatch: model script emits its own perf.csv directly, + # so collect via _collect_slurm_multi_results instead of the template-based path. + if model_key: + _mi = built_models_dict.get(model_key, {}) or {} + _launcher_type = (_mi.get("distributed") or {}).get("launcher", "") + if is_self_managed_launcher(_launcher_type): + return self._collect_slurm_multi_results( + deployment_id, results, session_start_row + ) + + build_info = {} built_images = self.manifest.get("built_images") or {} if built_images: @@ -1519,6 +1981,102 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: ) return results + def _collect_slurm_multi_results( + self, deployment_id: str, results: Dict[str, Any], session_start_row: Optional[int] + ) -> Dict[str, Any]: + """ + Collect results for slurm_multi launchers. + + slurm_multi model scripts generate their own perf.csv via their + benchmark scripts (e.g. generate_perf_csv.py). We collect SLURM + logs for diagnostics and read the model-generated perf.csv for metrics. + """ + # Collect SLURM output logs for diagnostics + flat_out_files = sorted(self.output_dir.glob(f"madengine-*_{deployment_id}_*.out")) + results["logs"] = [str(f) for f in flat_out_files] + + # Look for model-generated perf.csv. Inner scripts in MAD-private write + # to one of these locations depending on the workload: + # * SGLang / vLLM disagg: /shared_inference///perf.csv + # * Large EP / KV cache: /slurm_output/perf_csv/**.csv + # Plus the legacy /perf.csv path some flows still use. + # Priority: results_dir config > shared_inference NFS > slurm_output/perf_csv + # > /perf.csv (with NFS-propagation retry). + perf_csv_path = None + if self.slurm_config.get("results_dir"): + results_dir = Path(self.slurm_config["results_dir"]) + candidates = list(results_dir.glob("perf*.csv")) + if candidates: + perf_csv_path = candidates[0] + + if not perf_csv_path: + user = os.environ.get("USER", "") + shared_candidates = [] + if user: + shared_candidates.extend([ + Path(f"/shared_inference/{user}/{deployment_id}/perf.csv"), + Path(f"/shared_inference/{user}/model_blog_logs/{deployment_id}/perf.csv"), + ]) + workspace_perf_dir = Path("slurm_output/perf_csv") + workspace_candidates = list(workspace_perf_dir.glob(f"*{deployment_id}*.csv")) + workspace_perf = Path("perf.csv") + + # Retry briefly for NFS propagation after SLURM job completion + import time + for _attempt in range(6): + for cand in shared_candidates: + if cand.exists() and cand.stat().st_size > 0: + perf_csv_path = cand + break + if perf_csv_path: + break + if workspace_candidates: + perf_csv_path = workspace_candidates[0] + break + if workspace_perf.exists() and workspace_perf.stat().st_size > 0: + perf_csv_path = workspace_perf + break + time.sleep(5) + # Re-glob in case the file appeared during the wait. + workspace_candidates = list(workspace_perf_dir.glob(f"*{deployment_id}*.csv")) + + if perf_csv_path: + results["perf_files"] = [str(perf_csv_path)] + self._collect_results_parse_perf_csv(results, session_start_row) + # Aggregate per-job perf rows into cwd perf.csv so the dashboard + # reporter (display_performance_table, report to-html, etc.) + # finds them under the conventional path. Local + classic-SLURM + # flows already leave a cumulative perf.csv in cwd via + # update_perf_csv(); slurm_multi flows did not, so this mirrors + # that convention without modifying the original per-job file. + import shutil + cwd_perf = Path("perf.csv") + try: + if cwd_perf.exists(): + with open(perf_csv_path, "r") as src, open(cwd_perf, "a") as dst: + next(src, None) # skip per-job header so cwd CSV stays single-headed + for line in src: + dst.write(line) + else: + shutil.copy(str(perf_csv_path), str(cwd_perf)) + self.console.print( + f"[green]✓ Aggregated per-job perf into {cwd_perf}[/green]" + ) + except Exception as e: + self.console.print( + f"[yellow]⚠ Could not aggregate per-job perf into cwd perf.csv: {e}[/yellow]" + ) + else: + self.console.print("[yellow]No perf.csv found from slurm_multi model script[/yellow]") + + self.console.print( + f"[green]Collected slurm_multi results: {len(results['perf_files'])} perf files, " + f"{len(results['logs'])} log files, " + f"{len(results['successful_runs'])} successful, " + f"{len(results['failed_runs'])} failed[/green]" + ) + return results + def _collect_results_parse_perf_csv( self, results: Dict[str, Any], session_start_row: Optional[int] ) -> None: diff --git a/src/madengine/deployment/slurm_node_selector.py b/src/madengine/deployment/slurm_node_selector.py index 408e8d3c..f63593ac 100644 --- a/src/madengine/deployment/slurm_node_selector.py +++ b/src/madengine/deployment/slurm_node_selector.py @@ -72,6 +72,7 @@ def __init__( auto_cleanup: bool = False, verbose: bool = False, timeout: int = 30, + reservation: Optional[str] = None, ): """ Initialize node selector. @@ -81,11 +82,13 @@ def __init__( auto_cleanup: Automatically clean dirty nodes verbose: Enable verbose logging timeout: Timeout for srun commands (seconds) + reservation: SLURM reservation name (passed through to srun health/cleanup) """ self.console = console or Console() self.auto_cleanup = auto_cleanup self.verbose = verbose self.timeout = timeout + self.reservation = reservation # Max candidates to check (avoids excessive checks on large clusters) MAX_CANDIDATES_CAP = 100 @@ -209,6 +212,8 @@ def check_node_health(self, node: str, job_name: Optional[str] = None) -> NodeSt ] if job_name: srun_cmd.append(f"--job-name={job_name}") + if self.reservation: + srun_cmd.append(f"--reservation={self.reservation}") srun_cmd.extend(["bash", "-c", check_script]) try: @@ -323,6 +328,8 @@ def cleanup_node(self, node: str, job_name: Optional[str] = None) -> bool: ] if job_name: srun_cmd.append(f"--job-name={job_name}") + if self.reservation: + srun_cmd.append(f"--reservation={self.reservation}") srun_cmd.extend(["bash", "-c", cleanup_script]) try: diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index e49d9647..26110c41 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -37,6 +37,7 @@ from madengine.core.additional_context_defaults import DEFAULT_GUEST_OS from madengine.utils.therock_markers import is_therock_tree from madengine.deployment.base import PERFORMANCE_LOG_PATTERN +from madengine.deployment.common import is_self_managed_launcher from madengine.execution.container_runner_helpers import ( log_text_has_error_pattern, make_run_log_file_path, @@ -797,6 +798,165 @@ def apply_tools( else: print(f" Note: Command '{cmd}' already added by another tool, skipping duplicate.") + def _run_self_managed( + self, + model_info: typing.Dict, + build_info: typing.Dict, + log_file_path: str, + timeout: int, + run_results: typing.Dict, + pre_encapsulate_post_scripts: typing.Dict, + run_env: typing.Dict, + ) -> typing.Dict: + """ + Run script directly on the host (self-managed launcher, not inside madengine Docker). + + Used for slurm_multi launchers that manage their own Docker containers + via SLURM srun commands. The script is executed directly on the node. + + Args: + model_info: Model configuration from manifest + build_info: Build information from manifest + log_file_path: Path to log file + timeout: Execution timeout in seconds + run_results: Dictionary to store run results + pre_encapsulate_post_scripts: Pre/post script configuration + run_env: Environment variables for the script + + Returns: + Dictionary with run results + """ + self.rich_console.print(f"[dim]{'='*80}[/dim]") + + # Prepare script path + scripts_arg = model_info["scripts"] + + # Get the current working directory (might be temp workspace) + cwd = os.getcwd() + print(f"📂 Current directory: {cwd}") + + if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm") or scripts_arg.endswith(".py"): + script_path = scripts_arg + else: + # Directory specified - look for run.sh + script_path = os.path.join(scripts_arg, "run.sh") + + # If script path is relative, make it absolute from cwd + if not os.path.isabs(script_path): + script_path = os.path.join(cwd, script_path) + + # Check script exists + if not os.path.exists(script_path): + print(f"⚠️ Script not found at: {script_path}") + # Try alternative locations + alt_path = os.path.join(cwd, os.path.basename(scripts_arg)) + if os.path.exists(alt_path): + script_path = alt_path + print(f"✓ Found at alternative location: {script_path}") + else: + raise FileNotFoundError(f"Script not found: {script_path}") + + script_dir = os.path.dirname(script_path) or cwd + print(f"📜 Script: {script_path}") + print(f"📁 Working directory: {script_dir}") + + # Prepare model arguments + model_args = self.context.ctx.get("model_args", model_info.get("args", "")) + print(f"📝 Arguments: {model_args}") + + # Build command. The eventual `subprocess.run(..., shell=True)` below + # interprets shell metacharacters in `script_path` and `model_args`, + # so quote each piece explicitly. `model_args` is a CLI/manifest- + # supplied free-form string -- shlex.split + per-arg shlex.quote + # passes literal arguments to the script even when the input contains + # `$()`, backticks, `;`, etc. + _script_q = shlex.quote(script_path) + _args_q = ( + " ".join(shlex.quote(a) for a in shlex.split(model_args)) + if model_args + else "" + ) + if script_path.endswith(".py"): + cmd = f"python3 {_script_q} {_args_q}".rstrip() + else: + cmd = f"bash {_script_q} {_args_q}".rstrip() + + print(f"🔧 Command: {cmd}") + + # Prepare environment + env = os.environ.copy() + env.update(run_env) + + # Add model-specific env vars from model_info. + # Log keys only (not values) so credentials in env_vars (HF_TOKEN, MAD_DOCKERHUB_PASSWORD, + # CONNECT_*_TOKEN, etc.) carried via the model card don't leak into the run log. + if "env_vars" in model_info and model_info["env_vars"]: + for key, value in model_info["env_vars"].items(): + env[key] = str(value) + print(f" ENV: {key}=") + + # Add env vars from additional_context. + # Log keys only (not values) for consistency with model_info env vars and to avoid + # leaking sensitive values while still showing operators what was applied. + if self.additional_context and "env_vars" in self.additional_context: + for key, value in self.additional_context["env_vars"].items(): + env[key] = str(value) + print(f" ENV: {key}=") + + # Run script with logging + test_start_time = time.time() + self.rich_console.print("\n[bold blue]Running script (self-managed launcher)...[/bold blue]") + + try: + with open(log_file_path, mode="w", buffering=1) as outlog: + with redirect_stdout( + PythonicTee(outlog, self.live_output) + ), redirect_stderr(PythonicTee(outlog, self.live_output)): + print(f"⏰ Setting timeout to {timeout} seconds.") + print(f"🚀 Executing: {cmd}") + print(f"📂 Working directory: {script_dir}") + print(f"{'='*80}") + + # NOTE: shell=True is required because cmd embeds shell features + # (pipes, redirects, env-var substitution) constructed earlier in this + # method. cmd is built from validated model card / manifest fields and + # any user-provided model_args are routed through shlex-quoted assembly + # in the caller — do NOT concatenate raw user input directly into cmd. + result = subprocess.run( # noqa: S602 (shell=True intentional, see comment above) + cmd, + shell=True, + cwd=script_dir, + env=env, + timeout=timeout if timeout > 0 else None, + ) + + run_results["test_duration"] = time.time() - test_start_time + print(f"\n{'='*80}") + print(f"⏱️ Test Duration: {run_results['test_duration']:.2f} seconds") + + if result.returncode == 0: + run_results["status"] = "SUCCESS" + self.rich_console.print("[bold green]✓ Script completed successfully[/bold green]") + else: + run_results["status"] = "FAILURE" + run_results["status_detail"] = f"Exit code {result.returncode}" + self.rich_console.print(f"[bold red]✗ Script failed with exit code {result.returncode}[/bold red]") + raise subprocess.CalledProcessError(result.returncode, cmd) + + except subprocess.TimeoutExpired: + run_results["status"] = "FAILURE" + run_results["status_detail"] = f"Timeout after {timeout}s" + run_results["test_duration"] = time.time() - test_start_time + self.rich_console.print(f"[bold red]✗ Script timed out after {timeout}s[/bold red]") + raise + except Exception as e: + run_results["status"] = "FAILURE" + run_results["status_detail"] = str(e) + run_results["test_duration"] = time.time() - test_start_time + raise + + return run_results + def run_pre_post_script( self, model_docker: Docker, model_dir: str, pre_post: typing.List ) -> None: @@ -1173,6 +1333,35 @@ def run_container( print(f"Docker options: {docker_options}") + # ========== CHECK FOR SELF-MANAGED LAUNCHERS ========== + # slurm_multi launchers run scripts directly on the host, + # not inside a madengine-managed Docker. The script manages its own containers via srun. + launcher = "" + if self.additional_context: + distributed_config = self.additional_context.get("distributed", {}) + launcher = distributed_config.get("launcher", "") + if not launcher and model_info.get("distributed"): + launcher = model_info["distributed"].get("launcher", "") + if not launcher: + launcher = os.environ.get("MAD_LAUNCHER_TYPE", "") + if is_self_managed_launcher(launcher): + self.rich_console.print( + f"\n[bold cyan]🖥️ Self-managed launcher (launcher: {launcher})[/bold cyan]" + ) + self.rich_console.print( + "[dim]Script will manage its own Docker containers via SLURM[/dim]" + ) + return self._run_self_managed( + model_info=model_info, + build_info=build_info, + log_file_path=log_file_path, + timeout=timeout, + run_results=run_results, + pre_encapsulate_post_scripts=pre_encapsulate_post_scripts, + run_env=run_env, + ) + # ========== END SELF-MANAGED CHECK ========== + self.rich_console.print(f"\n[bold blue]🏃 Starting Docker container execution...[/bold blue]") print(f"🏷️ Image: {docker_image}") print(f"📦 Container: {container_name}") @@ -1289,8 +1478,8 @@ def run_container( # Prepare script execution scripts_arg = model_info["scripts"] - if scripts_arg.endswith(".sh"): - # Shell script specified directly + if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm"): + # Shell script specified directly (.sh or .slurm for SLURM batch scripts) dir_path = os.path.dirname(scripts_arg) script_name = "bash " + os.path.basename(scripts_arg) elif scripts_arg.endswith(".py"): diff --git a/src/madengine/execution/docker_builder.py b/src/madengine/execution/docker_builder.py index 1ee5d692..4f187e13 100644 --- a/src/madengine/execution/docker_builder.py +++ b/src/madengine/execution/docker_builder.py @@ -366,6 +366,31 @@ def export_build_manifest( "registry" ) + # Update built_models with registry image name for parallel pull in slurm_multi + # Map local image to registry image for env_vars + for image_name, build_info in self.built_images.items(): + registry_image = build_info.get("registry_image") + if not registry_image: + continue + if image_name not in self.built_models: + # built_images and built_models are keyed by docker_image in the + # normal build path (see _build_single_model). If a future code + # path keys them differently, this injection would silently no-op + # and slurm_multi parallel pulls would fall back to the local + # image tag. Surface the mismatch so it's caught early. + self.rich_console.print( + "[yellow]Warning:[/yellow] " + f"No built_models entry found for local image key '{image_name}' " + f"while setting DOCKER_IMAGE_NAME='{registry_image}'. " + "built_images and built_models may be keyed differently." + ) + continue + model_data = self.built_models[image_name] + if "env_vars" not in model_data: + model_data["env_vars"] = {} + # Set DOCKER_IMAGE_NAME to registry image for parallel pull + model_data["env_vars"]["DOCKER_IMAGE_NAME"] = registry_image + manifest = { "built_images": self.built_images, "built_models": self.built_models, diff --git a/src/madengine/orchestration/build_orchestrator.py b/src/madengine/orchestration/build_orchestrator.py index d905f3b4..246701cf 100644 --- a/src/madengine/orchestration/build_orchestrator.py +++ b/src/madengine/orchestration/build_orchestrator.py @@ -10,6 +10,7 @@ import json import os +import shlex from pathlib import Path from typing import Dict, List, Optional @@ -26,6 +27,7 @@ DiscoveryError, create_error_context, ) +from madengine.deployment.common import is_self_managed_launcher from madengine.utils.discover_models import DiscoverModels from madengine.execution.docker_builder import DockerBuilder from madengine.execution.dockerfile_utils import ( @@ -95,6 +97,7 @@ def __init__(self, args, additional_context: Optional[Dict] = None, detect_local apply_build_context_defaults(merged_context) self.additional_context = merged_context + self._original_user_slurm_keys = set(merged_context.get("slurm", {}).keys()) # Apply ConfigLoader to infer deploy type, validate, and apply defaults if self.additional_context: @@ -195,6 +198,8 @@ def execute( clean_cache: bool = False, manifest_output: str = "build_manifest.json", batch_build_metadata: Optional[Dict] = None, + use_image: Optional[str] = None, + build_on_compute: bool = False, ) -> str: """ Execute build workflow. @@ -204,6 +209,8 @@ def execute( clean_cache: Whether to use --no-cache for Docker builds manifest_output: Output file for build manifest batch_build_metadata: Optional batch build metadata + use_image: Pre-built Docker image to use (skip Docker build) + build_on_compute: Build on SLURM compute node instead of login node Returns: Path to generated build_manifest.json @@ -212,16 +219,113 @@ def execute( DiscoveryError: If model discovery fails BuildError: If Docker build fails """ + # --use-image and --build-on-compute are mutually exclusive + if use_image and build_on_compute: + raise ConfigurationError( + "--use-image and --build-on-compute cannot be used together", + context=create_error_context( + operation="build", + component="BuildOrchestrator", + ), + suggestions=[ + "Use --use-image to skip build and use an existing image", + "Use --build-on-compute to build on a compute node and push to registry", + ], + ) + + # Handle pre-built image mode + if use_image: + # If use_image is "auto", resolve from model card + if use_image == "auto": + use_image = self._resolve_image_from_model_card() + + return self._execute_with_prebuilt_image( + use_image=use_image, + manifest_output=manifest_output, + ) + + # Handle build-on-compute mode + if build_on_compute: + return self._execute_build_on_compute( + registry=registry, + clean_cache=clean_cache, + manifest_output=manifest_output, + batch_build_metadata=batch_build_metadata, + ) + + # For normal build: check if slurm_multi launcher requires registry. + # Discover models once here and reuse below for the actual build, so we + # don't repeat dynamic get_models_json.py execution, MODEL_DIR copies, + # or duplicate "Selected Models" output. + _discovered_models = None + _discovery_error: Optional[Exception] = None + try: + _disc = DiscoverModels(args=self.args) + _discovered_models = _disc.run() + except Exception as e: + _discovery_error = e + _discovered_models = [] + + if _discovered_models: + # When the discovered model card already declares an image via + # env_vars.DOCKER_IMAGE_NAME, treat it as an implicit --use-image so + # users do not have to repeat the image on the CLI for slurm_multi. + slurm_multi_models = [ + m for m in _discovered_models + if is_self_managed_launcher((m.get("distributed") or {}).get("launcher", "")) + ] + if slurm_multi_models and not registry: + card_images = { + (m.get("env_vars") or {}).get("DOCKER_IMAGE_NAME", "") + for m in slurm_multi_models + if (m.get("env_vars") or {}).get("DOCKER_IMAGE_NAME") + } + if len(card_images) == 1: + implicit_image = next(iter(card_images)) + self.rich_console.print( + f"[dim]slurm_multi: no --registry/--use-image given; " + f"using DOCKER_IMAGE_NAME from model card -> {implicit_image}[/dim]" + ) + return self._execute_with_prebuilt_image( + use_image=implicit_image, + manifest_output=manifest_output, + ) + # No card image (or divergent images across models): fall through to error. + for model in _discovered_models: + launcher = (model.get("distributed") or {}).get("launcher", "") + if is_self_managed_launcher(launcher) and not registry: + model_name = model.get("name", "unknown") + raise ConfigurationError( + "slurm_multi launcher requires --registry or --use-image", + context=create_error_context( + operation="build", + component="BuildOrchestrator", + model_name=model_name, + additional_info={"launcher": launcher}, + ), + suggestions=[ + "Use --registry docker.io/myorg to push image (nodes will pull in parallel)", + "Use --use-image to use a pre-built image from registry", + "Use --build-on-compute --registry to build on compute and push", + "Or set DOCKER_IMAGE_NAME in the model card env_vars (auto-detected for slurm_multi)", + ], + ) + self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]") self.rich_console.print("[bold blue]🔨 BUILD PHASE[/bold blue]") self.rich_console.print("[yellow](Build-only mode - no GPU detection)[/yellow]") self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") try: - # Step 1: Discover models + # Step 1: Discover models (reuse early discovery; retry only if it + # failed or returned nothing, so a real discovery error still surfaces + # as DiscoveryError rather than a swallowed Exception). self.rich_console.print("[bold cyan]🔍 Discovering models...[/bold cyan]") - discover_models = DiscoverModels(args=self.args) - models = discover_models.run() + if _discovered_models: + models = _discovered_models + else: + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() if not models: raise DiscoveryError( @@ -229,6 +333,11 @@ def execute( context=create_error_context( operation="discover_models", component="BuildOrchestrator", + additional_info=( + {"early_discovery_error": str(_discovery_error)} + if _discovery_error + else None + ), ), suggestions=[ "Check if models.json exists", @@ -356,6 +465,810 @@ def execute( ], ) from e + def _execute_with_prebuilt_image( + self, + use_image: str, + manifest_output: str = "build_manifest.json", + ) -> str: + """ + Generate manifest for a pre-built Docker image (skip Docker build). + + This is useful when using external images like: + - lmsysorg/sglang:v0.5.2rc1-rocm700-mi30x + - nvcr.io/nvidia/pytorch:24.01-py3 + + Args: + use_image: Pre-built Docker image name + manifest_output: Output file for build manifest + + Returns: + Path to generated build_manifest.json + """ + self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]") + self.rich_console.print("[bold blue]🔨 BUILD PHASE (Pre-built Image Mode)[/bold blue]") + self.rich_console.print(f"[cyan]Using pre-built image: {use_image}[/cyan]") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + try: + # Step 1: Discover models + self.rich_console.print("[bold cyan]🔍 Discovering models...[/bold cyan]") + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() + + if not models: + raise DiscoveryError( + "No models discovered", + context=create_error_context( + operation="discover_models", + component="BuildOrchestrator", + ), + suggestions=[ + "Check if models.json exists", + "Verify --tags parameter is correct", + ], + ) + + self.rich_console.print(f"[green]✓ Found {len(models)} models[/green]\n") + + # Step 2: Generate manifest with pre-built image + self.rich_console.print("[bold cyan]📄 Generating manifest for pre-built image...[/bold cyan]") + + manifest = { + # built_images and built_models MUST share the same key set so + # ContainerRunner.run_models_from_manifest() can join them via + # `built_models.get(image_name, {})`. Key both by model_name and + # write one built_images entry per model (all pointing at the + # same pre-built use_image) so multi-model --use-image runs work. + "built_images": {}, + "built_models": {}, + "context": self.context.ctx if hasattr(self.context, 'ctx') else {}, + "credentials_required": [], + "summary": { + "successful_builds": [], + "failed_builds": [], + "total_build_time": 0, + "successful_pushes": [], + "failed_pushes": [], + }, + } + + for model in models: + model_name = model.get("name", "unknown") + model_distributed = model.get("distributed", {}) + + # Merge DOCKER_IMAGE_NAME into env_vars for parallel pull in run phase + model_env_vars = model.get("env_vars", {}).copy() + model_env_vars["DOCKER_IMAGE_NAME"] = use_image + + # `local_image: True` routes through the local-image branch in + # ContainerRunner.run_models_from_manifest(), which uses + # build_info["docker_image"] as run_image (and pulls it if not + # present locally). Without this, the else branch would set + # run_image = image_name -- which is now keyed by model_name -- + # and _resolve_docker_image() would fail to find any such image. + manifest["built_images"][model_name] = { + "image_name": use_image, + "docker_image": use_image, + "local_image": True, + "dockerfile": "", + "build_time": 0, + "prebuilt": True, + } + + manifest["built_models"][model_name] = { + "name": model_name, + "image": use_image, + "docker_image": use_image, + "dockerfile": model.get("dockerfile", ""), + "scripts": model.get("scripts", ""), + "data": model.get("data", ""), + "n_gpus": model.get("n_gpus", "8"), + "owner": model.get("owner", ""), + "training_precision": model.get("training_precision", ""), + "multiple_results": model.get("multiple_results", ""), + "tags": model.get("tags", []), + "timeout": model.get("timeout", -1), + "args": model.get("args", ""), + "slurm": model.get("slurm", {}), + "distributed": model_distributed, + "env_vars": model_env_vars, + "prebuilt": True, + } + manifest["summary"]["successful_builds"].append(model_name) + + # Save manifest + with open(manifest_output, "w") as f: + json.dump(manifest, f, indent=2) + + # Save deployment config + self._save_deployment_config(manifest_output) + + # Merge model's distributed and slurm config into deployment_config + # This ensures launcher and slurm settings are in deployment_config even if not in additional-context + if models: + with open(manifest_output, "r") as f: + saved_manifest = json.load(f) + + if "deployment_config" not in saved_manifest: + saved_manifest["deployment_config"] = {} + + # Merge model's distributed config from the first model. + # If multiple models have differing distributed configs, warn — only the first wins here. + # Use json.dumps for the hash key so nested dicts (e.g. sglang_disagg / vllm_disagg) + # don't trigger TypeError: unhashable type: 'dict' from `tuple(sorted(items()))`. + if len(models) > 1: + distinct_distributed = { + json.dumps(m.get("distributed") or {}, sort_keys=True, default=str) + for m in models + } + if len(distinct_distributed) > 1: + self.rich_console.print( + "[yellow]Warning: discovered models have differing distributed configs; " + f"using {models[0].get('name', '')}'s config.[/yellow]" + ) + model_distributed = models[0].get("distributed", {}) + if model_distributed: + if "distributed" not in saved_manifest["deployment_config"]: + saved_manifest["deployment_config"]["distributed"] = {} + + # Copy launcher and other critical fields from model config + for key in ["launcher", "nnodes", "nproc_per_node", "backend", "port", "sglang_disagg", "vllm_disagg"]: + if key in model_distributed and key not in saved_manifest["deployment_config"]["distributed"]: + saved_manifest["deployment_config"]["distributed"][key] = model_distributed[key] + + # Merge model's slurm config into deployment_config.slurm from the first model. + # This enables run phase to auto-detect SLURM deployment without --additional-context. + # Warn when multiple models have differing slurm configs (only the first wins here). + # json.dumps key for the same unhashable-nested-dict reason as above. + if len(models) > 1: + distinct_slurm = { + json.dumps(m.get("slurm") or {}, sort_keys=True, default=str) + for m in models + } + if len(distinct_slurm) > 1: + self.rich_console.print( + "[yellow]Warning: discovered models have differing slurm configs; " + f"using {models[0].get('name', '')}'s config.[/yellow]" + ) + model_slurm = models[0].get("slurm", {}) + if model_slurm: + if "slurm" not in saved_manifest["deployment_config"]: + saved_manifest["deployment_config"]["slurm"] = {} + + # Copy slurm settings from model config (model card fills in + # values not explicitly set by --additional-context). + # Use _original_user_slurm_keys (captured before ConfigLoader + # applies defaults) so model card values override defaults + # but user's explicit CLI values still win. + for key in ["partition", "nodes", "gpus_per_node", "time", "exclusive", "reservation", "output_dir", "nodelist"]: + if key in model_slurm and key not in self._original_user_slurm_keys: + saved_manifest["deployment_config"]["slurm"][key] = model_slurm[key] + + with open(manifest_output, "w") as f: + json.dump(saved_manifest, f, indent=2) + + self.rich_console.print(f"[green]✓ Generated manifest: {manifest_output}[/green]") + self.rich_console.print(f" Pre-built image: {use_image}") + self.rich_console.print(f" Models: {len(models)}") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + return manifest_output + + except (DiscoveryError, BuildError): + raise + except Exception as e: + raise BuildError( + f"Failed to generate manifest for pre-built image: {e}", + context=create_error_context( + operation="prebuilt_manifest", + component="BuildOrchestrator", + ), + ) from e + + def _resolve_image_from_model_card(self) -> str: + """ + Resolve Docker image name from model card's DOCKER_IMAGE_NAME env var. + + This method discovers models and extracts the DOCKER_IMAGE_NAME from + env_vars. If multiple models have different images, uses the first + and prints a warning. + + Returns: + Docker image name from model card + + Raises: + ConfigurationError: If no DOCKER_IMAGE_NAME found in any model + """ + self.rich_console.print("[bold cyan]🔍 Auto-detecting image from model card...[/bold cyan]") + + # Discover models to get their env_vars + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() + + if not models: + raise ConfigurationError( + "No models discovered for image auto-detection", + context=create_error_context( + operation="resolve_image", + component="BuildOrchestrator", + ), + suggestions=[ + "Specify image name explicitly with --use-image ", + "Check if models.json exists", + "Verify --tags parameter is correct", + ], + ) + + # Collect DOCKER_IMAGE_NAME from all models + images_found = {} + for model in models: + model_name = model.get("name", "unknown") + env_vars = model.get("env_vars", {}) + docker_image = env_vars.get("DOCKER_IMAGE_NAME") + + if docker_image: + images_found[model_name] = docker_image + + if not images_found: + model_names = [m.get("name", "unknown") for m in models] + raise ConfigurationError( + "No DOCKER_IMAGE_NAME found in model card env_vars", + context=create_error_context( + operation="resolve_image", + component="BuildOrchestrator", + additional_info={"model_names": model_names}, + ), + suggestions=[ + "Add DOCKER_IMAGE_NAME to model's env_vars in models.json", + "Specify image name explicitly with --use-image ", + 'Example: "env_vars": {"DOCKER_IMAGE_NAME": "myimage:tag"}', + ], + ) + + # Use first model's image + first_model = list(images_found.keys())[0] + resolved_image = images_found[first_model] + + # Warn if multiple models have different images + unique_images = set(images_found.values()) + if len(unique_images) > 1: + self.rich_console.print( + f"[yellow]⚠️ Warning: Multiple models have different DOCKER_IMAGE_NAME values:[/yellow]" + ) + for model_name, image in images_found.items(): + self.rich_console.print(f" - {model_name}: {image}") + self.rich_console.print( + f"[yellow] Using image from '{first_model}': {resolved_image}[/yellow]\n" + ) + else: + self.rich_console.print(f"[green]✓ Auto-detected image: {resolved_image}[/green]\n") + + return resolved_image + + def _execute_build_on_compute( + self, + registry: Optional[str] = None, + clean_cache: bool = False, + manifest_output: str = "build_manifest.json", + batch_build_metadata: Optional[Dict] = None, + ) -> str: + """ + Execute Docker build on a SLURM compute node and push to registry. + + Build workflow: + 1. Build on 1 compute node only + 2. Push image to registry + 3. Store registry image name in manifest + 4. Run phase will pull image in parallel on all nodes + + Args: + registry: Registry to push images to (REQUIRED) + clean_cache: Whether to use --no-cache for Docker builds + manifest_output: Output file for build manifest + batch_build_metadata: Optional batch build metadata + + Returns: + Path to generated build_manifest.json + """ + import subprocess + import os + import glob + + self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]") + self.rich_console.print("[bold blue]🔨 BUILD PHASE (Compute Node Mode)[/bold blue]") + self.rich_console.print("[cyan]Building on 1 compute node, pushing to registry...[/cyan]") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + # registry is required for the build-on-compute flow (it must be pushed somewhere + # so the run phase on other nodes can pull it). The signature accepts Optional[str] + # to keep the call-site signature flexible, but we must reject None up front. + if not registry: + raise ConfigurationError( + "Registry is required for --build-on-compute (image must be pushed for run-phase pull).", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + "Pass --registry on the build CLI", + 'Or set "registry" in the model card / additional-context', + ], + ) + + # Normalize and validate --registry input shape. + # + # Downstream code derives the registry HOST from `registry.split("/")[0]` + # (used as the `docker login ` argument). Users routinely pass + # Dockerhub-shorthand like `rocm/pytorch-private` without the + # `docker.io/` prefix, and that would `docker login rocm` -> DNS NXDOMAIN. + # + # Auto-prepend `docker.io/` when the first path segment doesn't look + # like an FQDN/host:port. Then validate the resulting first segment is + # a plausible host (contains '.' or ':' or is 'localhost'), and reject + # otherwise with an actionable error. + _registry_invalid_msg = lambda r, fs: ConfigurationError( # noqa: E731 + f"Invalid --registry value: {r!r}. " + + ( + f"First segment '{fs}' is not a valid registry host." + if fs + else "Registry value is empty after whitespace/trailing-slash trim." + ), + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + additional_info={"registry": r}, + ), + suggestions=[ + 'Dockerhub: --registry docker.io/(/)', + 'GHCR: --registry ghcr.io/(/)', + 'Quay: --registry quay.io/(/)', + 'NGC: --registry nvcr.io/(/)', + 'Self-hosted: --registry (:)(/)', + 'Local: --registry localhost:5000(/)', + ], + ) + normalized = registry.strip().rstrip("/") + if not normalized: + raise _registry_invalid_msg(registry, "") + first_seg = normalized.split("/", 1)[0] + # Reject blatantly invalid characters BEFORE the auto-prepend step, + # because auto-prepend would overwrite first_seg to "docker.io" and + # mask path-side garbage. `@` and whitespace are never valid in a + # Dockerhub-style namespace/repo; subtler cases (uppercase letters, + # other illegal chars deeper in the path) get caught later by + # `docker push` itself. + if " " in normalized or "@" in normalized: + raise _registry_invalid_msg(registry, first_seg) + looks_like_host = ( + "." in first_seg or ":" in first_seg or first_seg == "localhost" + ) + if not looks_like_host: + # Treat as Dockerhub shorthand: prepend docker.io/. + self.rich_console.print( + f" [dim]Registry: {registry!r} has no host segment; " + f"auto-prefixing 'docker.io/' (treat as Dockerhub).[/dim]" + ) + normalized = f"docker.io/{normalized}" + first_seg = "docker.io" + if not first_seg: + raise _registry_invalid_msg(registry, first_seg) + if normalized != registry: + registry = normalized + + # Discover models first to get SLURM config from model card + self.rich_console.print("[bold cyan]🔍 Discovering models...[/bold cyan]") + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() + + if not models: + raise DiscoveryError( + "No models discovered for build-on-compute", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + "Check if models.json exists", + "Verify --tags parameter is correct", + ], + ) + + # SLURM config is derived from the first model card + --additional-context overrides. + # All models are built in a single sbatch job, so one SLURM config applies to all. + first_model = models[0] + model_slurm_config = first_model.get("slurm", {}) + context_slurm_config = self.additional_context.get("slurm", {}) + slurm_config = {**model_slurm_config, **context_slurm_config} + + self.rich_console.print(f"[green]✓ Found {len(models)} model(s)[/green]\n") + self.rich_console.print("[bold cyan]📋 SLURM Configuration (merged):[/bold cyan]") + if model_slurm_config: + self.rich_console.print(f" [dim]From model card:[/dim] {list(model_slurm_config.keys())}") + if context_slurm_config: + self.rich_console.print(f" [dim]From --additional-context (overrides):[/dim] {list(context_slurm_config.keys())}") + + # Validate required fields + partition = slurm_config.get("partition") + if not partition: + raise ConfigurationError( + "Missing required SLURM field: partition", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + 'Add "partition" to model card\'s slurm section', + 'Or specify via --additional-context \'{"slurm": {"partition": "gpu"}}\'', + ], + ) + + reservation = slurm_config.get("reservation", "") + time_limit = slurm_config.get("time", "02:00:00") + + self.rich_console.print(f" Partition: {partition}") + self.rich_console.print(f" Time limit: {time_limit}") + if reservation: + self.rich_console.print(f" Reservation: {reservation}") + self.rich_console.print("") + + # Validate registry credentials + self.rich_console.print("[bold cyan]🔐 Registry Configuration:[/bold cyan]") + self.rich_console.print(f" Registry: {registry}") + + # Check for credentials - either from environment or credential.json + dockerhub_user = os.environ.get("MAD_DOCKERHUB_USER", "") + dockerhub_password = os.environ.get("MAD_DOCKERHUB_PASSWORD", "") + + # Try to load from credential.json if env vars not set + credential_file = Path("credential.json") + if not dockerhub_user and credential_file.exists(): + try: + with open(credential_file) as f: + creds = json.load(f) + dockerhub_creds = creds.get("dockerhub", {}) + dockerhub_user = dockerhub_creds.get("username", "") + dockerhub_password = dockerhub_creds.get("password", "") + if dockerhub_user: + self.rich_console.print(f" Credentials: Found in credential.json") + except (json.JSONDecodeError, IOError) as e: + self.rich_console.print(f" [yellow]Warning: Could not read credential.json: {e}[/yellow]") + elif dockerhub_user: + self.rich_console.print(f" Credentials: Found in environment (MAD_DOCKERHUB_USER)") + + # Determine if registry requires authentication + public_registries = ["docker.io", "ghcr.io", "gcr.io", "quay.io", "nvcr.io"] + registry_lower = registry.lower() if registry else "" + + # For docker.io pushes, authentication is always required + # Per-registry guidance for the missing-credentials error message. + # Today only Docker Hub credentials (MAD_DOCKERHUB_USER/PASSWORD or credential.json) + # are wired into this code path, but the error suggestion should at least name the + # right token type for ghcr.io / quay.io / nvcr.io / gcr.io users. + _registry_hints = { + "docker.io": [ + "Set environment variables: MAD_DOCKERHUB_USER and MAD_DOCKERHUB_PASSWORD", + 'Or create credential.json: {"dockerhub": {"username": "...", "password": "..."}}', + "For Docker Hub, use a Personal Access Token (PAT) as password", + "Example: export MAD_DOCKERHUB_USER=myuser", + "Example: export MAD_DOCKERHUB_PASSWORD=dckr_pat_xxxxx", + ], + "ghcr.io": [ + "GitHub Container Registry: use a GitHub PAT with read:packages (and write:packages to push)", + "Set MAD_DOCKERHUB_USER=, MAD_DOCKERHUB_PASSWORD=", + ], + "gcr.io": [ + "Google Container Registry: use a service-account JSON key as password", + "Set MAD_DOCKERHUB_USER=_json_key, MAD_DOCKERHUB_PASSWORD=\"$(cat key.json)\"", + ], + "quay.io": [ + "Quay.io: use a robot account or encrypted password", + "Set MAD_DOCKERHUB_USER=, MAD_DOCKERHUB_PASSWORD=", + ], + "nvcr.io": [ + "NVIDIA NGC: use $oauthtoken as username and an NGC API key as password", + "Set MAD_DOCKERHUB_USER=\\$oauthtoken, MAD_DOCKERHUB_PASSWORD=", + ], + } + _matched_hints = next( + (hints for reg_key, hints in _registry_hints.items() if reg_key in registry_lower), + _registry_hints["docker.io"], + ) + if any(pub_reg in registry_lower for pub_reg in public_registries): + if not dockerhub_user or not dockerhub_password: + raise ConfigurationError( + f"Registry credentials required for pushing to {registry}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + additional_info={"registry": registry}, + ), + suggestions=_matched_hints, + ) + self.rich_console.print(f" Auth: Will login to registry before push") + else: + # Private/internal registry - may not need auth + self.rich_console.print(f" Auth: Private registry (auth may not be required)") + + self.rich_console.print("") + + # Check if we're inside an existing allocation + inside_allocation = os.environ.get("SLURM_JOB_ID") is not None + existing_job_id = os.environ.get("SLURM_JOB_ID", "") + + # Determine registry host for docker login (shared across all models) + registry_host = registry.split("/")[0] if "/" in registry else registry + registry_parts = registry.replace("docker.io/", "").split("/") + + # Collect per-model build data (Dockerfile, image names) for all discovered models + per_model_data: List[Dict] = [] + self.rich_console.print("[bold cyan]🐳 Docker Configuration:[/bold cyan]") + for model in models: + model_name = model.get("name", "unknown") + dockerfile = model.get("dockerfile", "") + dockerfile_path = "" + for pattern in [ + f"{dockerfile}.ubuntu.amd.Dockerfile", + f"{dockerfile}.Dockerfile", + f"{dockerfile}", + ]: + matches = glob.glob(pattern) + if matches: + dockerfile_path = matches[0] + break + if not dockerfile_path: + raise ConfigurationError( + f"Dockerfile not found for model {model_name}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + additional_info={"dockerfile": dockerfile}, + ), + suggestions=[ + f"Check if {dockerfile}.ubuntu.amd.Dockerfile exists", + "Verify the dockerfile path in models.json", + ], + ) + # Docker repository names must be lowercase; cover the edge case where the + # Dockerfile filename is just `Dockerfile` (no suffix to strip). + dockerfile_basename = ( + Path(dockerfile_path).name.replace(".Dockerfile", "").replace(".ubuntu.amd", "") + ) + local_image_name = f"ci-{model_name}_{dockerfile_basename}".lower() + if len(registry_parts) >= 2: + registry_image_name = f"{registry}:{model_name}" + else: + registry_image_name = f"{registry}/{model_name}:latest" + self.rich_console.print(f" {model_name}: {dockerfile_path} -> {registry_image_name}") + per_model_data.append({ + "model": model, + "model_name": model_name, + "dockerfile_path": dockerfile_path, + "local_image_name": local_image_name, + "registry_image_name": registry_image_name, + }) + self.rich_console.print("") + + # Shell-quote values that end up inside bash commands (defense-in-depth, + # consistent with _run_self_managed and the v2.0.3 shlex hardening pass). + registry_host_q = shlex.quote(str(registry_host)) + cwd_q = shlex.quote(str(Path.cwd().absolute())) + no_cache_flag = "--no-cache" if clean_cache else "" + + # Build one build+tag+push block per model, all in a single sbatch job. + build_steps = "" + for idx, pmd in enumerate(per_model_data, start=1): + # All values destined for the bash script are shell-quoted, including + # those embedded only in echo lines: a `"` or `$` in model_name or + # dockerfile_path would otherwise break the surrounding bash string + # or trigger expansion. The leading comment line is the only raw + # interpolation; `#` starts a shell comment so its contents are + # ignored by bash regardless of metacharacters. + local_q = shlex.quote(str(pmd["local_image_name"])) + reg_img_q = shlex.quote(str(pmd["registry_image_name"])) + df_q = shlex.quote(str(pmd["dockerfile_path"])) + model_q = shlex.quote(str(pmd["model_name"])) + build_steps += f""" +# --- Model {idx}/{len(per_model_data)} --- +echo "=== Building" {model_q} "===" +echo "Dockerfile:" {df_q} +echo "Local image:" {local_q} +docker build --network=host -t {local_q} {no_cache_flag} --pull -f {df_q} ./docker +BUILD_RC=$? +if [ $BUILD_RC -ne 0 ]; then + echo "❌ Docker build FAILED for" {model_q} "(exit $BUILD_RC)" + exit $BUILD_RC +fi +echo "✅ Build OK:" {model_q} +echo "Tagging:" {local_q} "->" {reg_img_q} +docker tag {local_q} {reg_img_q} +echo "Pushing:" {reg_img_q} +docker push {reg_img_q} +PUSH_RC=$? +if [ $PUSH_RC -ne 0 ]; then + echo "❌ Docker push FAILED for" {model_q} "(exit $PUSH_RC)" + exit $PUSH_RC +fi +echo "✅ Push OK:" {reg_img_q} +echo "" +""" + + # Build script content — runs on 1 compute node, builds+pushes all models + build_script_content = f"""#!/bin/bash +#SBATCH --job-name=madengine-build +#SBATCH --partition={partition} +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +#SBATCH --time={time_limit} +{f'#SBATCH --reservation={reservation}' if reservation else ''} +#SBATCH --output=madengine_build_%j.out +#SBATCH --error=madengine_build_%j.err + +echo "============================================================" +echo "=== MADENGINE BUILD ON COMPUTE NODE ===" +echo "============================================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Build Node: $(hostname)" +echo "Registry: {registry}" +echo "" + +# Change to submission directory +cd {cwd_q} + +# Step 0: Docker login for registry push +echo "=== Step 0: Docker Registry Authentication ===" +DOCKER_USER="${{MAD_DOCKERHUB_USER:-}}" +DOCKER_PASS="${{MAD_DOCKERHUB_PASSWORD:-}}" + +if [ -z "$DOCKER_USER" ] && [ -f "credential.json" ]; then + echo "Reading credentials from credential.json..." + DOCKER_USER=$(python3 -c "import json; print(json.load(open('credential.json')).get('dockerhub', {{}}).get('username', ''))" 2>/dev/null || echo "") + DOCKER_PASS=$(python3 -c "import json; print(json.load(open('credential.json')).get('dockerhub', {{}}).get('password', ''))" 2>/dev/null || echo "") +fi + +if [ -n "$DOCKER_USER" ] && [ -n "$DOCKER_PASS" ]; then + echo "Logging in as $DOCKER_USER..." + echo "$DOCKER_PASS" | docker login {registry_host_q} -u "$DOCKER_USER" --password-stdin + LOGIN_RC=$? + if [ $LOGIN_RC -ne 0 ]; then + echo "❌ Docker login FAILED (exit $LOGIN_RC)" + exit $LOGIN_RC + fi + echo "✅ Docker login SUCCESS" +else + echo "No credentials found - assuming public registry or pre-authenticated" +fi +echo "" + +# Steps 1-N: Build, tag, and push each model image +{build_steps} +echo "============================================================" +echo "✅ ALL BUILDS COMPLETE" +echo "============================================================" +echo "Build Node: $(hostname)" +echo "Run phase will pull images in parallel on all nodes." +echo "============================================================" + +exit 0 +""" + + build_script_path = Path("madengine_build_job.sh") + build_script_path.write_text(build_script_content) + build_script_path.chmod(0o755) + + if inside_allocation: + self.rich_console.print(f"[cyan]Running build via srun (inside allocation {existing_job_id})...[/cyan]") + cmd = ["srun", "-N1", "--ntasks=1", "bash", str(build_script_path)] + else: + self.rich_console.print("[cyan]Submitting build job via sbatch...[/cyan]") + cmd = ["sbatch", "--wait", str(build_script_path)] + + self.rich_console.print(f" Build script: {build_script_path}") + self.rich_console.print(f" Command: {' '.join(cmd)}") + self.rich_console.print("") + + try: + result = subprocess.run( + cmd, + capture_output=False, + text=True, + ) + + if result.returncode != 0: + raise BuildError( + f"Build on compute node failed with exit code {result.returncode}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + "Check the build log files (madengine_build_*.out/err)", + "Verify SLURM partition and reservation settings", + "Ensure Docker is available on compute nodes", + "Verify registry credentials are configured", + ], + ) + + # Generate manifest keyed by model_name — same shape as _execute_with_prebuilt_image + # so ContainerRunner.run_models_from_manifest() can join via built_models.get(key). + self.rich_console.print(f"\n[bold cyan]📄 Generating manifest...[/bold cyan]") + + built_images: Dict = {} + built_models: Dict = {} + for pmd in per_model_data: + mn = pmd["model_name"] + rim = pmd["registry_image_name"] + m = pmd["model"] + built_images[mn] = { + "image_name": rim, + "docker_image": rim, + "local_image": pmd["local_image_name"], + "dockerfile": pmd["dockerfile_path"], + "build_time": 0, + "built_on_compute": True, + "registry": registry, + } + built_models[mn] = { + "name": mn, + "image": rim, + "docker_image": rim, + "dockerfile": pmd["dockerfile_path"], + "scripts": m.get("scripts", ""), + "data": m.get("data", ""), + "n_gpus": m.get("n_gpus", "8"), + "tags": m.get("tags", []), + "slurm": slurm_config, + "distributed": m.get("distributed", {}), + "env_vars": {**m.get("env_vars", {}), "DOCKER_IMAGE_NAME": rim}, + "built_on_compute": True, + } + + manifest = { + "built_images": built_images, + "built_models": built_models, + "context": self.context.ctx if hasattr(self.context, "ctx") else {}, + "deployment_config": { + "slurm": slurm_config, + "distributed": first_model.get("distributed", {}), + }, + "credentials_required": [], + "summary": { + "successful_builds": list(built_models.keys()), + "failed_builds": [], + "total_build_time": 0, + "successful_pushes": [pmd["registry_image_name"] for pmd in per_model_data], + "failed_pushes": [], + }, + } + + with open(manifest_output, "w") as f: + json.dump(manifest, f, indent=2) + + self.rich_console.print(f"[green]✓ Build completed on compute node[/green]") + for pmd in per_model_data: + self.rich_console.print(f"[green]✓ Image pushed: {pmd['registry_image_name']}[/green]") + self.rich_console.print(f"[green]✓ Manifest: {manifest_output}[/green]") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + return manifest_output + + except subprocess.TimeoutExpired: + raise BuildError( + "Build on compute node timed out", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + ) + except (DiscoveryError, ConfigurationError, BuildError): + raise + except Exception as e: + raise BuildError( + f"Failed to build on compute node: {e}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + ) from e def _save_build_summary(self, manifest_file: str, build_summary: Dict): """Save build summary to manifest for display purposes.""" try: diff --git a/tests/e2e/test_build_workflows.py b/tests/e2e/test_build_workflows.py index 9cc74438..79422558 100644 --- a/tests/e2e/test_build_workflows.py +++ b/tests/e2e/test_build_workflows.py @@ -67,7 +67,9 @@ class TestCLIFeatures: """Test various CLI features and command-line argument behaviors.""" @pytest.mark.parametrize( - "clean_test_temp_files", [["perf_test.csv", "perf_test.html"]], indirect=True + "clean_test_temp_files", + [DEFAULT_CLEAN_FILES + ["perf_test.csv", "perf_test.html"]], + indirect=True, ) def test_output_commandline_argument_writes_csv_correctly( self, global_data, clean_test_temp_files @@ -101,7 +103,9 @@ def test_output_commandline_argument_writes_csv_correctly( @requires_gpu("skip_gpu_arch filtering requires GPU hardware to detect current architecture") @pytest.mark.parametrize( - "clean_test_temp_files", [["perf_test.csv", "perf_test.html"]], indirect=True + "clean_test_temp_files", + [DEFAULT_CLEAN_FILES + ["perf_test.csv", "perf_test.html"]], + indirect=True, ) def test_commandline_argument_skip_gpu_arch( self, global_data, clean_test_temp_files, dynamic_skip_gpu_arch_model_dir @@ -126,7 +130,9 @@ def test_commandline_argument_skip_gpu_arch( @requires_gpu("skip_gpu_arch filtering requires GPU hardware to detect current architecture") @pytest.mark.parametrize( - "clean_test_temp_files", [["perf_test.csv", "perf_test.html"]], indirect=True + "clean_test_temp_files", + [DEFAULT_CLEAN_FILES + ["perf_test.csv", "perf_test.html"]], + indirect=True, ) def test_commandline_argument_disable_skip_gpu_arch_fail( self, global_data, clean_test_temp_files, dynamic_skip_gpu_arch_model_dir @@ -150,7 +156,9 @@ def test_commandline_argument_disable_skip_gpu_arch_fail( pytest.fail("Disable skipping gpu arch for running model is failed.") @pytest.mark.parametrize( - "clean_test_temp_files", [["perf_test.csv", "perf_test.html"]], indirect=True + "clean_test_temp_files", + [DEFAULT_CLEAN_FILES + ["perf_test.csv", "perf_test.html"]], + indirect=True, ) def test_output_multi_results(self, global_data, clean_test_temp_files): """ diff --git a/tests/fixtures/utils.py b/tests/fixtures/utils.py index 53cd6938..fae8d901 100644 --- a/tests/fixtures/utils.py +++ b/tests/fixtures/utils.py @@ -364,8 +364,23 @@ def get_num_cpus() -> int: # E2E test helpers (run command, perf CSV, log path, timeout from log) # ============================================================================= -# Default list of perf output files to clean before/after e2e tests. -DEFAULT_CLEAN_FILES = ["perf.csv", "perf.html"] +# Default list of files to clean before/after e2e tests. +# Includes perf outputs AND build_manifest.json + perf_super/entry siblings, +# since `madengine run` will silently reuse a stale build_manifest.json from a +# previous test if it is left in cwd, causing later tests to execute the wrong +# image. +DEFAULT_CLEAN_FILES = [ + "perf.csv", + "perf.html", + "build_manifest.json", + "perf_super.json", + "perf_super.csv", + "perf_entry.csv", + "perf_entry.json", + "perf_entry_super.csv", + "perf_entry_super.json", + "perf_dummy.csv", +] def build_run_command(tags, extra_args="", output_file=None, additional_context=None): diff --git a/tests/unit/test_slurm_multi.py b/tests/unit/test_slurm_multi.py new file mode 100644 index 00000000..872450c3 --- /dev/null +++ b/tests/unit/test_slurm_multi.py @@ -0,0 +1,526 @@ +#!/usr/bin/env python3 +""" +Unit tests for the slurm_multi launcher. + +Locks in contract points so future refactors fail loudly if they regress: +1. `slurm_multi` is registered in `VALID_LAUNCHERS`. +2. `slurm-multi` (hyphen alias) normalizes to `slurm_multi`. +3. The wrapper SBATCH script generated by `_prepare_slurm_multi_script` + exports every `model_info.env_vars` declared by the model card + (mirrors the `pyt_sglang_disagg_qwen3-32b_short` entry from MAD-private PR #186). +4. The wrapper bash invocation is wrapped in `set +e` / `set -e` so a non-zero + exit from the model script does not skip the completion-marker write. +5. `_execute_with_prebuilt_image` produces a manifest where `built_images.keys() + == built_models.keys()` so `ContainerRunner.run_models_from_manifest()` joins + them correctly (also exercises multi-model + nested-dict distributed configs + to guard against the `tuple(sorted(items()))` unhashable regression). + +Copyright (c) Advanced Micro Devices, Inc. All rights reserved. +""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from madengine.deployment.common import VALID_LAUNCHERS, is_self_managed_launcher, normalize_launcher +from madengine.deployment.base import DeploymentConfig +from madengine.deployment.slurm import SlurmDeployment + + +# --------------------------------------------------------------------------- +# 1. Registry membership + +class TestSlurmMultiRegistration: + """slurm_multi is registered in the launcher allowlist.""" + + def test_slurm_multi_in_valid_launchers(self): + assert "slurm_multi" in VALID_LAUNCHERS + + @pytest.mark.parametrize("launcher,expected", [ + ("slurm_multi", True), + ("slurm-multi", True), # hyphen alias normalized via normalize_launcher + ("torchrun", False), + ("vllm", False), + ("sglang-disagg", False), + ("", False), + (None, False), + ]) + def test_is_self_managed_launcher(self, launcher, expected): + """is_self_managed_launcher must accept both aliases and reject all other launchers.""" + assert is_self_managed_launcher(launcher) is expected + + +# --------------------------------------------------------------------------- +# 2. Hyphen alias normalization + +class TestNormalizeSlurmMultiAliases: + """slurm-multi (hyphen) normalizes to slurm_multi.""" + + def test_canonical(self): + assert normalize_launcher("slurm_multi", "slurm") == "slurm_multi" + + def test_hyphen_alias(self): + assert normalize_launcher("slurm-multi", "slurm") == "slurm_multi" + + def test_unknown_falls_through_to_default(self): + # Sanity: unrelated value still returns docker for slurm + assert normalize_launcher("totally-bogus", "slurm") == "docker" + + +# --------------------------------------------------------------------------- +# 3. MAD-private #186 env_vars contract + +# Verbatim from MAD-private PR #186, model name pyt_sglang_disagg_qwen3-32b_short. +PR186_MODEL_ENTRY = { + "name": "pyt_sglang_disagg_qwen3-32b_short", + "url": "", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/sglang_disagg/run_xPyD_models.slurm", + "data": "huggingface", + "n_gpus": "8", + "owner": "mad.support@amd.com", + "training_precision": "", + "multiple_results": "perf_Qwen3-32B.csv", + "tags": [ + "pyt", "sglang", "sglang_disagg", "inference", + "qwen3", "disaggregated", "short", + ], + "timeout": -1, + "args": "--model_name Qwen3-32B --model_path /shared_inference/models_blog/Qwen3-32B --tp_size 8", + "slurm": { + "partition": "amd-rccl", + "nodes": 3, + "gpus_per_node": 8, + "time": "01:00:00", + "output_dir": "./slurm_output", + "exclusive": True, + "reservation": "", + }, + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8, + "backend": "nccl", + "port": 29500, + "sglang_disagg": { + "prefill_nodes": 1, + "decode_nodes": 1, + }, + }, + "env_vars": { + "DOCKER_IMAGE_NAME": "rocm/pytorch-private:sglang_disagg_mori_20260502", + "MODEL_NAME": "Qwen3-32B", + "MODEL_PATH": "/shared_inference/models_blog/Qwen3-32B", + "SGLANG_ENABLE_FLASHINFER": "1", + "SGLANG_ENABLE_RADIX_CACHE": "1", + "SGLANG_RADIX_CACHE_SIZE": "0.9", + "xP": "1", + "yD": "1", + "BENCHMARK_CONCURRENCY": "8", + "BENCHMARK_COMBINATIONS": "32/32", + }, +} + + +class TestSlurmMultiPrepareScript: + """`_prepare_slurm_multi_script` emits every model_info.env_vars as `export`.""" + + @pytest.fixture + def slurm_deployment(self, tmp_path: Path) -> SlurmDeployment: + """Build a SlurmDeployment whose manifest contains the MAD-private #186 entry.""" + # _prepare_slurm_multi_script resolves model_info["scripts"] against the + # manifest's directory and bails (returns False) if the script doesn't exist. + # Create a placeholder so the existence check passes; contents are irrelevant + # because prepare() never executes it. + script_rel = PR186_MODEL_ENTRY["scripts"] + script_abs = tmp_path / script_rel + script_abs.parent.mkdir(parents=True, exist_ok=True) + script_abs.write_text("#!/bin/bash\n# placeholder slurm_multi model script for unit test\n") + + # Use the model image-name as the manifest key (matches manifest convention). + image_key = "rocm/pytorch-private:sglang_disagg_mori_20260502" + + manifest = { + "built_images": { + image_key: { + "image_name": image_key, + "docker_image": image_key, + "registry_image": image_key, + }, + }, + "built_models": { + image_key: PR186_MODEL_ENTRY, + }, + "context": { + "docker_env_vars": {}, + "docker_mounts": {}, + "docker_build_arg": {}, + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "docker_gpus": "all", + }, + } + manifest_path = tmp_path / "build_manifest.json" + manifest_path.write_text(json.dumps(manifest)) + + # Additional context drives the slurm config: point output_dir at tmp_path + # so we can read the generated script back. + additional_context = { + "deploy": "slurm", + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "slurm": dict(PR186_MODEL_ENTRY["slurm"], output_dir=str(tmp_path / "slurm_results")), + "distributed": PR186_MODEL_ENTRY["distributed"], + } + + cfg = DeploymentConfig( + target="slurm", + manifest_file=str(manifest_path), + additional_context=additional_context, + ) + return SlurmDeployment(cfg) + + def test_prepare_dispatches_to_slurm_multi(self, slurm_deployment, tmp_path): + """prepare() must take the slurm_multi early-dispatch path and return True.""" + ok = slurm_deployment.prepare() + assert ok is True + # Wrapper script should now exist + assert slurm_deployment.script_path is not None + assert Path(slurm_deployment.script_path).exists() + + def test_wrapper_exports_all_model_env_vars(self, slurm_deployment): + """Every key in model_info.env_vars must appear as `export KEY="value"` in the wrapper.""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + + for key, value in PR186_MODEL_ENTRY["env_vars"].items(): + # xP/yD are overridden by distributed.sglang_disagg.{prefill,decode}_nodes + # in _prepare_slurm_multi_script; values match in this fixture so the + # assertion still holds, but the source of truth is the model card. + expected = f'export {key}="{value}"' + assert expected in script_text, f"missing export for {key!r}: expected {expected!r}" + + def test_wrapper_is_slurm_multi_flag_set(self, slurm_deployment): + """`_is_slurm_multi` flag must be set so deploy() can choose the bash branch.""" + slurm_deployment.prepare() + assert getattr(slurm_deployment, "_is_slurm_multi", False) is True + + def test_wrapper_sbatch_header_uses_slurm_config(self, slurm_deployment): + """SBATCH header reflects the model card's slurm block (partition, nodes, gpus, time, exclusive).""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + assert "#SBATCH --partition=amd-rccl" in script_text + assert "#SBATCH --nodes=3" in script_text + assert "#SBATCH --gpus-per-node=8" in script_text + assert "#SBATCH --time=01:00:00" in script_text + assert "#SBATCH --exclusive" in script_text + # reservation is "" in the fixture, so no --reservation line + assert "#SBATCH --reservation=" not in script_text + + def test_wrapper_disables_set_e_around_model_script(self, slurm_deployment): + """Regression for Copilot C6: `set -e` at the top of the wrapper would + terminate the script before SCRIPT_EXIT_CODE is captured and the + completion marker is written if the model script exits non-zero. + The wrapper must `set +e` immediately before the bash invocation, + capture $?, then re-enable `set -e` before continuing.""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + + # Find the index of the bash line and assert the + # surrounding set +e / SCRIPT_EXIT_CODE / set -e ordering. + lines = script_text.splitlines() + bash_line_idx = next( + (i for i, line in enumerate(lines) if line.strip().startswith("bash run_xPyD_models.slurm")), + None, + ) + assert bash_line_idx is not None, "wrapper must invoke the model script via bash" + + # The two preceding non-empty/non-comment lines must include `set +e` + prelude = [l.strip() for l in lines[max(0, bash_line_idx - 5):bash_line_idx] if l.strip() and not l.strip().startswith("#")] + assert "set +e" in prelude, ( + f"`set +e` must appear immediately before bash invocation, got prelude {prelude!r}" + ) + + # SCRIPT_EXIT_CODE must be captured immediately after, and -e re-enabled before exit + suffix = [l.strip() for l in lines[bash_line_idx + 1:bash_line_idx + 5] if l.strip() and not l.strip().startswith("#")] + assert suffix[0] == "SCRIPT_EXIT_CODE=$?", f"first non-comment line after bash must capture $?, got {suffix!r}" + assert "set -e" in suffix, f"`set -e` must be re-enabled after capture, got {suffix!r}" + + # And the final exit must use the captured exit code + assert "exit $SCRIPT_EXIT_CODE" in script_text + + +# --------------------------------------------------------------------------- +# 4. _execute_with_prebuilt_image manifest-shape contract (Copilot C2 + C3) + +class TestPrebuiltImageManifestShape: + """Regression tests for the manifest produced by `_execute_with_prebuilt_image`. + + Two invariants the rest of the codebase relies on: + - C2: `built_images.keys() == built_models.keys()` so + `ContainerRunner.run_models_from_manifest()` can join them via + `built_models.get(image_name, {})`. + - C3: discovering >1 model whose `distributed` block contains nested dicts + (e.g. `sglang_disagg`, `vllm_disagg`) must not raise + `TypeError: unhashable type: 'dict'` when comparing distinct configs. + """ + + @pytest.fixture + def fake_models_two_sglang_disagg(self): + """Two sglang_disagg model cards. `distributed` includes a nested dict + (sglang_disagg.{prefill_nodes, decode_nodes}) so `tuple(sorted(items()))` + on this would TypeError -- exactly what C3 protects against.""" + common_distributed = { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8, + "backend": "nccl", + "port": 29500, + "sglang_disagg": {"prefill_nodes": 1, "decode_nodes": 1}, + } + common_slurm = { + "partition": "amd-rccl", + "nodes": 3, + "gpus_per_node": 8, + "time": "01:00:00", + "exclusive": True, + "reservation": "", + } + return [ + { + "name": "pyt_fake_a", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/fake/run.sh", + "n_gpus": "8", + "owner": "test@amd.com", + "tags": ["pyt", "fake", "a"], + "args": "--model_a", + "slurm": common_slurm, + "distributed": common_distributed, + "env_vars": {"MODEL_NAME": "Fake-A"}, + }, + { + "name": "pyt_fake_b", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/fake/run.sh", + "n_gpus": "8", + "owner": "test@amd.com", + "tags": ["pyt", "fake", "b"], + "args": "--model_b", + "slurm": common_slurm, + # Differing nested-dict to trigger the warning + dedup path + "distributed": {**common_distributed, "sglang_disagg": {"prefill_nodes": 2, "decode_nodes": 1}}, + "env_vars": {"MODEL_NAME": "Fake-B"}, + }, + ] + + def _build_orchestrator_stub(self, tmp_path: Path): + """Construct a BuildOrchestrator instance with `__new__` so we skip the + real constructor (which initializes Context, ConfigLoader, credentials, + etc.). We populate only the attributes `_execute_with_prebuilt_image` + and `_save_deployment_config` actually read.""" + from madengine.orchestration.build_orchestrator import BuildOrchestrator + orch = BuildOrchestrator.__new__(BuildOrchestrator) + # Required attributes + orch.args = MagicMock() + orch.console = MagicMock() + orch.rich_console = MagicMock() + orch.context = MagicMock() + orch.context.ctx = {"docker_env_vars": {}, "docker_mounts": {}} + orch.additional_context = {} + orch._original_user_slurm_keys = set() + orch.credentials = {} + return orch + + def test_built_images_and_models_share_keys(self, tmp_path, fake_models_two_sglang_disagg): + """Copilot C2: built_images and built_models must use the same keys + (model_name) so ContainerRunner.run_models_from_manifest() finds + each model_info via `built_models.get(image_name, {})`.""" + orch = self._build_orchestrator_stub(tmp_path) + manifest_path = tmp_path / "build_manifest.json" + use_image = "rocm/pytorch-private:fake-tag" + + with patch("madengine.orchestration.build_orchestrator.DiscoverModels") as mock_dm: + mock_dm.return_value.run.return_value = fake_models_two_sglang_disagg + # _save_deployment_config writes a sidecar file we don't care about; + # patch it to a no-op so we don't depend on its filesystem behavior. + with patch.object(orch, "_save_deployment_config", return_value=None): + orch._execute_with_prebuilt_image(use_image=use_image, manifest_output=str(manifest_path)) + + manifest = json.loads(manifest_path.read_text()) + + assert set(manifest["built_images"].keys()) == set(manifest["built_models"].keys()), ( + "built_images and built_models must share the same key set" + ) + assert set(manifest["built_models"].keys()) == {"pyt_fake_a", "pyt_fake_b"} + + for model_name in ("pyt_fake_a", "pyt_fake_b"): + assert manifest["built_images"][model_name]["docker_image"] == use_image + assert manifest["built_images"][model_name]["prebuilt"] is True + assert manifest["built_models"][model_name]["env_vars"]["DOCKER_IMAGE_NAME"] == use_image + + def test_multi_model_nested_dict_distributed_does_not_raise( + self, tmp_path, fake_models_two_sglang_disagg + ): + """Copilot C3: `distinct_distributed` / `distinct_slurm` dedupe must not + TypeError when distributed/slurm blocks contain nested dicts. Before + the fix, `tuple(sorted((m.get('distributed') or {}).items()))` raised + `TypeError: unhashable type: 'dict'` because the dict values aren't + hashable when stuffed into a set.""" + orch = self._build_orchestrator_stub(tmp_path) + manifest_path = tmp_path / "build_manifest.json" + + with patch("madengine.orchestration.build_orchestrator.DiscoverModels") as mock_dm: + mock_dm.return_value.run.return_value = fake_models_two_sglang_disagg + with patch.object(orch, "_save_deployment_config", return_value=None): + # Must not raise TypeError + orch._execute_with_prebuilt_image( + use_image="rocm/pytorch-private:fake-tag", + manifest_output=str(manifest_path), + ) + + # Manifest should still be valid JSON with both models present + manifest = json.loads(manifest_path.read_text()) + assert len(manifest["built_models"]) == 2 + + +# --------------------------------------------------------------------------- +# 5. xP/yD skip-if-set: model card wins over distributed.sglang_disagg + +class TestXpYdSkipIfSet: + """xP/yD in model card env_vars must not be overwritten by distributed.sglang_disagg.""" + + @pytest.fixture + def slurm_deployment(self, tmp_path: Path) -> SlurmDeployment: + """SlurmDeployment with model card that has xP=2/yD=3 while sglang_disagg says 1/1.""" + model_info = { + **PR186_MODEL_ENTRY, + "env_vars": {**PR186_MODEL_ENTRY["env_vars"], "xP": "2", "yD": "3"}, + "distributed": { + **PR186_MODEL_ENTRY["distributed"], + "sglang_disagg": {"prefill_nodes": 1, "decode_nodes": 1}, + }, + } + script_abs = tmp_path / PR186_MODEL_ENTRY["scripts"] + script_abs.parent.mkdir(parents=True, exist_ok=True) + script_abs.write_text("#!/bin/bash\n") + + image_key = "rocm/pytorch-private:sglang_disagg_mori_20260502" + manifest = { + "built_images": {image_key: {"image_name": image_key, "docker_image": image_key}}, + "built_models": {image_key: model_info}, + "context": { + "docker_env_vars": {}, "docker_mounts": {}, "docker_build_arg": {}, + "gpu_vendor": "AMD", "guest_os": "UBUNTU", "docker_gpus": "all", + }, + } + manifest_path = tmp_path / "build_manifest.json" + manifest_path.write_text(json.dumps(manifest)) + + additional_context = { + "deploy": "slurm", + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "slurm": dict(PR186_MODEL_ENTRY["slurm"], output_dir=str(tmp_path / "slurm_results")), + "distributed": model_info["distributed"], + } + cfg = DeploymentConfig(target="slurm", manifest_file=str(manifest_path), + additional_context=additional_context) + return SlurmDeployment(cfg) + + def test_model_card_xp_yd_wins(self, slurm_deployment): + """When env_vars already contains xP/yD, distributed.sglang_disagg must not override them.""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + + assert 'export xP="2"' in script_text, "model card xP=2 must not be overwritten" + assert 'export yD="3"' in script_text, "model card yD=3 must not be overwritten" + assert 'export xP="1"' not in script_text, "distributed.sglang_disagg xP=1 must not win" + assert 'export yD="1"' not in script_text, "distributed.sglang_disagg yD=1 must not win" + + +# --------------------------------------------------------------------------- +# 6. _execute_build_on_compute manifest-shape contract (multi-model parity) + +class TestBuildOnComputeManifestShape: + """_execute_build_on_compute must produce the same key contract as _execute_with_prebuilt_image. + + built_images.keys() == built_models.keys() == set of model names, so + ContainerRunner.run_models_from_manifest() can join them. + """ + + def _build_orchestrator_stub(self, tmp_path: Path): + from madengine.orchestration.build_orchestrator import BuildOrchestrator + orch = BuildOrchestrator.__new__(BuildOrchestrator) + orch.args = MagicMock() + orch.console = MagicMock() + orch.rich_console = MagicMock() + orch.context = MagicMock() + orch.context.ctx = {} + orch.additional_context = {"slurm": {"partition": "gpu", "time": "01:00:00"}} + orch._original_user_slurm_keys = set() + orch.credentials = {} + return orch + + def test_two_model_manifest_keys_match(self, tmp_path): + """built_images and built_models must share the same key set (model names).""" + fake_models = [ + { + "name": "model_a", + "dockerfile": "docker/model_a", + "scripts": "scripts/model_a/run.sh", + "n_gpus": "8", + "tags": ["model_a"], + "slurm": {"partition": "gpu", "time": "01:00:00"}, + "distributed": {"launcher": "slurm_multi"}, + "env_vars": {}, + }, + { + "name": "model_b", + "dockerfile": "docker/model_b", + "scripts": "scripts/model_b/run.sh", + "n_gpus": "8", + "tags": ["model_b"], + "slurm": {"partition": "gpu", "time": "01:00:00"}, + "distributed": {"launcher": "slurm_multi"}, + "env_vars": {}, + }, + ] + # Write dummy Dockerfiles so glob finds them + for m in fake_models: + df = tmp_path / f"{m['dockerfile']}.ubuntu.amd.Dockerfile" + df.parent.mkdir(parents=True, exist_ok=True) + df.write_text("FROM scratch\n") + + manifest_path = tmp_path / "build_manifest.json" + orch = self._build_orchestrator_stub(tmp_path) + + import os + orig_cwd = os.getcwd() + try: + os.chdir(tmp_path) + with patch("madengine.orchestration.build_orchestrator.DiscoverModels") as mock_dm: + mock_dm.return_value.run.return_value = fake_models + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + # Use localhost:5000 (private-registry path) so the test + # doesn't hit the public-registry credential gate at + # build_orchestrator.py:954. The manifest-shape contract + # this test locks in is registry-agnostic. + orch._execute_build_on_compute( + registry="localhost:5000/myrepo", + manifest_output=str(manifest_path), + ) + finally: + os.chdir(orig_cwd) + + manifest = json.loads(manifest_path.read_text()) + assert set(manifest["built_images"].keys()) == set(manifest["built_models"].keys()), ( + "built_images and built_models must share the same key set" + ) + assert set(manifest["built_models"].keys()) == {"model_a", "model_b"} + for mn in ("model_a", "model_b"): + assert manifest["built_models"][mn]["built_on_compute"] is True + assert "DOCKER_IMAGE_NAME" in manifest["built_models"][mn]["env_vars"]