diff --git a/training/deepspeed_finetune_demo/README.md b/training/deepspeed_finetune_demo/README.md new file mode 100644 index 000000000..d39d1bee6 --- /dev/null +++ b/training/deepspeed_finetune_demo/README.md @@ -0,0 +1,207 @@ +# DeepSpeed finetune examples +This finetune example is extracted and modified from [ZenFlow Llama-2 Fine-Tuning Example](https://github.com/deepspeedai/DeepSpeedExamples/tree/master/training/DeepSpeed-ZenFlow/finetuning) in [DeepSpeedExamples](https://github.com/deepspeedai/DeepSpeedExamples). The purpose is to demostrate how to use different DeepSpeed training features and compare their performance in a single place. + +Currently in DeepSpeedExamples, each technology has a dedicated directory to show how to use it. However, DeepSpeed's philosophy is to allow users to use different features with different configuration file with no code change needed. This project put this claim to the test. + +# How to use + +To run the example, simply run: +``` +./finetune.sh +``` + +For example, if we want to run Qwen2.5-3B model with ZeRO offload on 2 GPUs, we can run: +``` +./finetune.sh 2 Qwen2.5-3B configs/zo_config.json +``` + +## Key arguments + +| Argument | Description | Default | +|----------|-------------|---------| +| `--batch_size` | Training batch size per GPU | required | +| `--eval_batch_size` | Eval batch size per rank | 4 | +| `--eval_steps` | Run evaluation every N steps (0 disables) | 0 | +| `--max_steps` | Stop after N steps (-1 = full epoch) | -1 | +| `--checkpoint_steps` | Save a checkpoint every N steps (0 disables); keeps last 2 | 0 | +| `--wandb_name` | Wandb run name (optional) | None | +| `--num_train_epochs` | Number of training epochs | 3 | +| `--weight_decay` | Weight decay | 0.01 | +| `--warmup` | Warmup ratio | 0.01 | + +Note: Learning rate is controlled entirely by the DeepSpeed config JSON, not by command-line arguments. + +## Batch size +In DeepSpeed, batch size is decided by configuration file. However, to avoid modify the config file, this python script takes `--batch_size` parameter and use it to decide train batch size. Keep this in mind if you need to try different batch size. + +## Wandb support +An optional `--wandb_name` can be supplied to finetune_llama.py to generate wandb graph. But you need to modify `finetune.sh` manually to supply this argument. + +## Dataset support + +The training script uses a `DATASET_REGISTRY` to configure datasets. Registered datasets are loaded with proper field mapping and preprocessing automatically. + +| Dataset | Format | Use Case | Notes | +|---------|--------|----------|-------| +| `sahil2801/CodeAlpaca-20k` | Alpaca | Code instruction tuning | | +| `meta-math/MetaMathQA` | Alpaca | Math reasoning | `sample_rate=0.1` (39.5k of 395k) | +| `cais/mmlu` | MMLU MCQ | Knowledge tasks | Uses `auxiliary_train` split (~95k) | +| `tatsu-lab/alpaca` | Alpaca | General instruction tuning | Fallback default | +| `ise-uiuc/Magicoder-OSS-Instruct-75K` | Magicoder | Code instruction tuning | Auto-detected via `problem` column | + +**Registered datasets** are specified by `--dataset_name` directly. **Unregistered datasets** are auto-detected: if the dataset has a `problem` column, Magicoder format is used; otherwise Alpaca format is assumed. + +All formats use instruction-masked loss (only the response part contributes to loss). + +### Adding a new dataset + +Add an entry to `DATASET_REGISTRY` in `finetune_llama.py`: + +```python +"your-dataset/name": { + "split": "train", + "preprocessor": "alpaca", # or a custom preprocessor name + "field_map": { # maps source fields to Alpaca format + "instruction": "source_inst_field", + "input": None, # set to None if not present + "output": "source_output_field", + }, + "sample_rate": 0.1, # optional: downsample large datasets +}, +``` + +# Moonlight-16B-A3B with AutoEP + Muon + +This project supports fine-tuning [Moonlight-16B-A3B](https://huggingface.co/moonshotai/Moonlight-16B-A3B) (a 16B-parameter MoE model with 3B active parameters) using DeepSpeed AutoEP (automatic expert parallelism) and the Muon optimizer. + +## Quick start (8x A100 40GB) + +```bash +# 1. Train +deepspeed --num_gpus=8 finetune_llama.py \ + --model_name moonshotai/Moonlight-16B-A3B \ + --output_dir output_moonlight_muon \ + --batch_size 16 --max_length 512 \ + --deepspeed_config configs/z2_moonlight_autoep_muon.json \ + --dataset_name sahil2801/CodeAlpaca-20k \ + --num_train_epochs 1 + +# 2. Convert DeepSpeed checkpoint to HuggingFace format +python convert_ds_to_hf.py \ + --ds_checkpoint output_moonlight_muon/step_ \ + --original_model moonshotai/Moonlight-16B-A3B \ + --output_dir hf_model_muon \ + --ep_size 8 + +# 3. Generate HumanEval completions +python evaluate/humaneval/gen_humaneval.py \ + --model hf_model_muon \ + --output evalplus_results/muon \ + --instruction + +# 4. Evaluate +python -m evalplus.evaluate \ + --dataset humaneval \ + --samples evalplus_results/muon/samples.jsonl +``` + +## Checkpoint format + +With AutoEP, each rank holds a different expert shard. The training script saves checkpoints to `/step_/`: +- `0/model_weights.pt`: full state dict (non-expert params + local experts for rank 0) +- `1/model_weights.pt` ... `7/model_weights.pt`: expert shard params only + +Use `convert_ds_to_hf.py` to merge all shards back into a standard HuggingFace model. + +## HumanEval results + +| Model | HumanEval (base) | HumanEval+ | +|-------|-----------------|------------| +| Moonlight-16B-A3B (baseline) | 46.3% | 40.2% | +| + Muon fine-tune on CodeAlpaca-20k (1 epoch) | 54.9% | 47.0% | + +## AutoEP config + +AutoEP config goes inside the DeepSpeed JSON under `expert_parallel`: + +```json +{ + "expert_parallel": { + "enabled": true, + "autoep_size": 8, + "expert_w1": "gate_proj", + "expert_w2": "down_proj", + "expert_w3": "up_proj", + "route_scale": 2.446, + "load_balance_coeff": null + } +} +``` + +| Parameter | Description | +|-----------|-------------| +| `autoep_size` | Number of expert-parallel ranks (typically = num_gpus) | +| `expert_w1/w2/w3` | Names of the expert weight projections in the HF model | +| `route_scale` | Router output scaling factor (should match `routed_scaling_factor` in model config) | +| `load_balance_coeff` | Auxiliary load-balancing loss coefficient (`null` to disable) | + +# Benchmarking + +To run benchmark, run: +``` +./benchmark.sh +``` + +# Profiling + +To run profiling, run: +``` +./profile.sh +``` + +# Config files + +For quick start, some config files are added, you may also modify the config to fit your need. + +| Config File | Description | +|-------------|-------------| +| z2_config.json | ZeRO Stage 2 with AdamW | +| z3_config.json | ZeRO Stage 3 with AdamW | +| zo_config.json | ZeRO Offload, stage 2 | +| z3o_config.json | ZeRO Offload, stage 3 | +| zf_config.json | ZeRO Offload with ZenFlow | +| so_config.json | ZeRO Offload with SuperOffload | +| z2_muon.json | ZeRO 2 with Muon optimizer | +| z3_muon.json | ZeRO 3 with Muon optimizer | +| tp_config.json | ZeRO 2 with AutoTP | +| z2_moonlight_autoep_adam.json | Moonlight-16B-A3B with AutoEP + AdamW | +| z2_moonlight_autoep_muon.json | Moonlight-16B-A3B with AutoEP + Muon | + +## Muon optimizer config + +Muon is a hybrid optimizer: it applies Muon updates to 2D hidden weights and Adam to everything else. The config supports separate learning rates: + +```json +{ + "optimizer": { + "type": "Muon", + "params": { + "muon_lr": 1e-3, + "adam_lr": 2e-5, + "momentum": 0.95, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + } +} +``` + +| Parameter | Description | +|-----------|-------------| +| `muon_lr` | Learning rate for Muon (2D hidden weights) | +| `adam_lr` | Learning rate for Adam (embeddings, layer norms, lm_head, etc.) | +| `momentum` | Muon momentum factor | +| `betas` | Adam betas (for non-Muon parameters) | +| `eps` | Adam epsilon | +| `weight_decay` | Weight decay for both Muon and Adam parameters | diff --git a/training/deepspeed_finetune_demo/benchmark.sh b/training/deepspeed_finetune_demo/benchmark.sh new file mode 100755 index 000000000..6f4cf13a8 --- /dev/null +++ b/training/deepspeed_finetune_demo/benchmark.sh @@ -0,0 +1,5 @@ +NUM="${1:-2}" +MODEL="${2:-Qwen/Qwen2.5-0.5B}" +CONFIG="${3:-configs/z2_config.json}" +deepspeed --num_gpus=$NUM --bind_cores_to_rank finetune_llama.py --model_name $MODEL --output_dir output --batch_size 8 --deepspeed_config $CONFIG --num_train_epochs 1 --bench_start 4 + diff --git a/training/deepspeed_finetune_demo/configs/so_config.json b/training/deepspeed_finetune_demo/configs/so_config.json new file mode 100644 index 000000000..7137b99eb --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/so_config.json @@ -0,0 +1,27 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 3, + "sub_group_size": 100000000, + "offload_optimizer": { + "super_offload": true, + "convert_grad_on_cpu": true, + "device": "cpu", + "pin_memory": true + } + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} + diff --git a/training/deepspeed_finetune_demo/configs/tp_config.json b/training/deepspeed_finetune_demo/configs/tp_config.json new file mode 100644 index 000000000..25b44422d --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/tp_config.json @@ -0,0 +1,24 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 2 + }, + + "tensor_parallel":{ + "autotp_size": 4 + }, + + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z2_config.json b/training/deepspeed_finetune_demo/configs/z2_config.json new file mode 100644 index 000000000..7227b4ab4 --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z2_config.json @@ -0,0 +1,19 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 2 + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_adam.json b/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_adam.json new file mode 100644 index 000000000..14df7e84d --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_adam.json @@ -0,0 +1,30 @@ +{ + "train_batch_size": 16, + "bf16": { + "enabled": true + }, + "zero_optimization": { + "stage": 2 + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-06, + "betas": [0.9, 0.999], + "eps": 1e-08, + "weight_decay": 0.01 + } + }, + "expert_parallel": { + "enabled": true, + "autoep_size": 8, + "expert_w1": "gate_proj", + "expert_w2": "down_proj", + "expert_w3": "up_proj", + "route_scale": 2.446, + "load_balance_coeff": null + }, + "gradient_accumulation_steps": 2, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_muon.json b/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_muon.json new file mode 100644 index 000000000..706ce4b2e --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z2_moonlight_autoep_muon.json @@ -0,0 +1,36 @@ +{ + "train_batch_size": 16, + "bf16": { + "enabled": true + }, + "zero_optimization": { + "stage": 2 + }, + "optimizer": { + "type": "Muon", + "params": { + "muon_lr": 0.002, + "adam_lr": 2e-06, + "momentum": 0.95, + "ns_method": "gram", + "betas": [ + 0.9, + 0.999 + ], + "eps": 1e-08, + "weight_decay": 0.01 + } + }, + "expert_parallel": { + "enabled": true, + "autoep_size": 8, + "expert_w1": "gate_proj", + "expert_w2": "down_proj", + "expert_w3": "up_proj", + "route_scale": 2.446, + "load_balance_coeff": null + }, + "gradient_accumulation_steps": 2, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} \ No newline at end of file diff --git a/training/deepspeed_finetune_demo/configs/z2_muon.json b/training/deepspeed_finetune_demo/configs/z2_muon.json new file mode 100644 index 000000000..df01b7042 --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z2_muon.json @@ -0,0 +1,21 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 2 + }, + "optimizer": { + "type": "Muon", + "params": { + "muon_lr": 1e-3, + "adam_lr": 2e-5, + "momentum": 0.95, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z3_config.json b/training/deepspeed_finetune_demo/configs/z3_config.json new file mode 100644 index 000000000..d1d90c9d1 --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z3_config.json @@ -0,0 +1,19 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 3 + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z3_muon.json b/training/deepspeed_finetune_demo/configs/z3_muon.json new file mode 100644 index 000000000..06e15263a --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z3_muon.json @@ -0,0 +1,22 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 3, + "reduce_scatter": false + }, + "optimizer": { + "type": "Muon", + "params": { + "muon_lr": 1e-3, + "adam_lr": 2e-5, + "momentum": 0.95, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} diff --git a/training/deepspeed_finetune_demo/configs/z3o_config.json b/training/deepspeed_finetune_demo/configs/z3o_config.json new file mode 100644 index 000000000..3cc14e884 --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/z3o_config.json @@ -0,0 +1,24 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 3, + "offload_optimizer": { + "device": "cpu", + "pin_memory": true + } + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} + diff --git a/training/deepspeed_finetune_demo/configs/zf_config.json b/training/deepspeed_finetune_demo/configs/zf_config.json new file mode 100644 index 000000000..bf492923c --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/zf_config.json @@ -0,0 +1,30 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 2, + "offload_optimizer": { + "device": "cpu", + "pin_memory": true + }, + "zenflow": { + "topk_ratio": 0.1, + "update_interval": 4, + "full_warm_up_rounds": 0, + "overlap_step": true + } + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} + diff --git a/training/deepspeed_finetune_demo/configs/zo_config.json b/training/deepspeed_finetune_demo/configs/zo_config.json new file mode 100644 index 000000000..91a4228db --- /dev/null +++ b/training/deepspeed_finetune_demo/configs/zo_config.json @@ -0,0 +1,24 @@ +{ + "train_batch_size": 8, + "bf16": { "enabled": true }, + "zero_optimization": { + "stage": 2, + "offload_optimizer": { + "device": "cpu", + "pin_memory": true + } + }, + "optimizer": { + "type": "AdamW", + "params": { + "lr": 2e-5, + "betas": [0.9, 0.999], + "eps": 1e-8, + "weight_decay": 0.01 + } + }, + "gradient_accumulation_steps": 1, + "gradient_clipping": 1.0, + "zero_allow_untested_optimizer": true +} + diff --git a/training/deepspeed_finetune_demo/convert_ds_to_hf.py b/training/deepspeed_finetune_demo/convert_ds_to_hf.py new file mode 100644 index 000000000..72755583e --- /dev/null +++ b/training/deepspeed_finetune_demo/convert_ds_to_hf.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Convert DeepSpeed ZeRO-2 + AutoEP model weights back to HuggingFace format. + +With AutoEP (expert parallelism), each rank holds only its local expert shard. +The training script saves: + - Rank 0: full state dict (non-expert params + local experts 0..E_local-1) + - Rank 1..N-1: only expert shard params (w1, w2, w3 as 3D tensors) + +This script: + 1. Loads model_weights.pt from all EP ranks + 2. Takes non-expert params from rank 0 + 3. Concatenates expert shards (w1, w2, w3) across ranks + 4. Unpacks grouped 3D tensors to per-expert module_list format + 5. Remaps AutoEP router keys back to HF gate keys + 6. Saves as a standard HF model + +Usage: + python convert_ds_to_hf.py \ + --ds_checkpoint output_moonlight_muon \ + --original_model moonshotai/Moonlight-16B-A3B \ + --output_dir hf_model_muon \ + --ep_size 8 +""" + +import argparse +import os +import re + +import torch +from transformers import AutoModelForCausalLM, AutoTokenizer + + +def load_rank_state_dict(rank_dir): + """Load model weights from a single rank's directory.""" + weights_file = os.path.join(rank_dir, "model_weights.pt") + if not os.path.exists(weights_file): + raise FileNotFoundError(f"Model weights not found: {weights_file}") + return torch.load(weights_file, map_location="cpu", weights_only=False) + + +# Regex to detect AutoEP expert grouped tensor keys +# Pattern: model.layers.{L}.mlp.experts.w{1,2,3} +_EXPERT_W_RE = re.compile(r"^(model\.layers\.\d+\.mlp)\.experts\.(w[123])$") + +# Pattern for AutoEP router keys. AutoEP wraps the gate in a router module: +# model.layers.{L}.mlp.router.gate.weight -> model.layers.{L}.mlp.gate.weight +# model.layers.{L}.mlp.router.e_score_correction_bias -> model.layers.{L}.mlp.gate.e_score_correction_bias +_ROUTER_RE = re.compile(r"^(model\.layers\.\d+\.mlp)\.router\.(.+)$") + +# Mapping: AutoEP w names -> HF projection names +_W_TO_PROJ = {"w1": "gate_proj", "w2": "down_proj", "w3": "up_proj"} + + +def merge_and_unpack(rank0_sd, expert_shard_sds, ep_size): + """Merge rank 0's full state dict with expert shards from other ranks. + + Args: + rank0_sd: Full state dict from rank 0 (non-expert + expert shard 0) + expert_shard_sds: List of expert-only state dicts from ranks 1..N-1 + ep_size: Number of EP ranks + """ + hf_state_dict = {} + + for key, value in rank0_sd.items(): + m_expert = _EXPERT_W_RE.match(key) + m_router = _ROUTER_RE.match(key) + + if m_expert: + prefix = m_expert.group(1) # e.g., model.layers.2.mlp + w_name = m_expert.group(2) # e.g., w1 + + # Concatenate: rank 0's shard + shards from ranks 1..N-1 + shards = [value] + for rank_sd in expert_shard_sds: + shards.append(rank_sd[key]) + full_tensor = torch.cat(shards, dim=0) # [E_total, ...] + + # Unpack to per-expert module_list format + proj_name = _W_TO_PROJ[w_name] + num_experts = full_tensor.shape[0] + for e in range(num_experts): + hf_key = f"{prefix}.experts.{e}.{proj_name}.weight" + hf_state_dict[hf_key] = full_tensor[e] + elif m_router: + prefix = m_router.group(1) + rest = m_router.group(2) + # Remap AutoEP router keys back to HF gate module keys. + # AutoEP router has: gate (sub-module) and e_score_correction_bias (direct attr). + # HF model has: gate.weight and gate.e_score_correction_bias (both under gate). + # So: mlp.router.gate.weight -> mlp.gate.weight (strip "router.") + # mlp.router.e_score_correction_bias -> mlp.gate.e_score_correction_bias + if rest.startswith("gate."): + hf_key = f"{prefix}.{rest}" + else: + hf_key = f"{prefix}.gate.{rest}" + hf_state_dict[hf_key] = value + else: + # Non-expert, non-router param: take from rank 0 as-is + hf_state_dict[key] = value + + return hf_state_dict + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--ds_checkpoint", + type=str, + required=True, + help="Path to DeepSpeed checkpoint output directory", + ) + parser.add_argument( + "--original_model", type=str, required=True, help="Original HF model name/path" + ) + parser.add_argument( + "--output_dir", type=str, required=True, help="Output directory for HF model" + ) + parser.add_argument( + "--ep_size", + type=int, + default=8, + help="Expert parallelism size (number of ranks)", + ) + args = parser.parse_args() + + # Load rank 0 (full state dict) + rank0_dir = os.path.join(args.ds_checkpoint, "0") + if not os.path.isdir(rank0_dir): + raise FileNotFoundError(f"Rank 0 directory not found: {rank0_dir}") + print("Loading rank 0 (full state dict)...") + rank0_sd = load_rank_state_dict(rank0_dir) + print(f" Rank 0: {len(rank0_sd)} keys") + + # Load expert shards from ranks 1..N-1 + expert_shard_sds = [] + for rank in range(1, args.ep_size): + rank_dir = os.path.join(args.ds_checkpoint, str(rank)) + if not os.path.isdir(rank_dir): + raise FileNotFoundError(f"Rank {rank} directory not found: {rank_dir}") + print(f"Loading rank {rank} (expert shard only)...") + sd = load_rank_state_dict(rank_dir) + expert_shard_sds.append(sd) + print(f" Rank {rank}: {len(sd)} keys") + + # Merge and unpack + print("Merging expert shards and unpacking to HF format...") + hf_state_dict = merge_and_unpack(rank0_sd, expert_shard_sds, args.ep_size) + print(f"Merged state dict: {len(hf_state_dict)} keys") + + # Free loaded state dicts to save memory + del rank0_sd, expert_shard_sds + + # Load original model structure + print(f"Loading original model: {args.original_model}") + model = AutoModelForCausalLM.from_pretrained( + args.original_model, + torch_dtype=torch.bfloat16, + trust_remote_code=True, + ) + + # Load the merged state dict + missing, unexpected = model.load_state_dict(hf_state_dict, strict=False) + if missing: + print(f"WARNING: {len(missing)} missing keys:") + for k in missing[:20]: + print(f" Missing: {k}") + if len(missing) > 20: + print(f" ... and {len(missing) - 20} more") + if unexpected: + print(f"WARNING: {len(unexpected)} unexpected keys:") + for k in unexpected[:20]: + print(f" Unexpected: {k}") + if len(unexpected) > 20: + print(f" ... and {len(unexpected) - 20} more") + if not missing and not unexpected: + print("All keys matched perfectly!") + + # Save + os.makedirs(args.output_dir, exist_ok=True) + print(f"Saving HF model to: {args.output_dir}") + model.save_pretrained(args.output_dir) + + # Copy tokenizer + tokenizer = AutoTokenizer.from_pretrained( + args.original_model, trust_remote_code=True + ) + tokenizer.save_pretrained(args.output_dir) + + # Copy custom modeling code (save_pretrained only copies config, not modeling code) + import shutil + + code_dir = os.path.dirname(model.config.__class__.__module__.replace(".", "/")) + # Find the actual source directory from the module's file path + import importlib + + config_module = importlib.import_module(model.config.__class__.__module__) + src_dir = os.path.dirname(config_module.__file__) + for fname in os.listdir(src_dir): + if fname.endswith(".py") and not fname.startswith("__"): + src_file = os.path.join(src_dir, fname) + dst_file = os.path.join(args.output_dir, fname) + if not os.path.exists(dst_file): + shutil.copy2(src_file, dst_file) + print(f" Copied custom code: {fname}") + + print("Done!") + + +if __name__ == "__main__": + main() diff --git a/training/deepspeed_finetune_demo/evaluate/gsm8k/eval_gsm8k.py b/training/deepspeed_finetune_demo/evaluate/gsm8k/eval_gsm8k.py new file mode 100644 index 000000000..88b99f50b --- /dev/null +++ b/training/deepspeed_finetune_demo/evaluate/gsm8k/eval_gsm8k.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Evaluate GSM8K accuracy from generated samples.jsonl. + +Usage: + python evaluate/gsm8k/eval_gsm8k.py --samples eval_results/gsm8k_baseline/samples.jsonl +""" + +import argparse +import json + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--samples", type=str, required=True, help="Path to samples.jsonl") + args = parser.parse_args() + + total = 0 + correct = 0 + empty = 0 + + with open(args.samples) as f: + for line in f: + sample = json.loads(line) + total += 1 + predicted = sample.get("predicted_answer", "") + gold = sample["correct_answer"] + if not predicted: + empty += 1 + elif predicted == gold: + correct += 1 + + accuracy = correct / total * 100 if total > 0 else 0 + print(f"GSM8K Results:") + print(f" Total: {total}") + print(f" Correct: {correct}") + print(f" Empty: {empty}") + print(f" Accuracy: {accuracy:.2f}%") + print(f" pass@1: {accuracy:.2f}") + + +if __name__ == "__main__": + main() diff --git a/training/deepspeed_finetune_demo/evaluate/gsm8k/gen_gsm8k.py b/training/deepspeed_finetune_demo/evaluate/gsm8k/gen_gsm8k.py new file mode 100644 index 000000000..b2017a14d --- /dev/null +++ b/training/deepspeed_finetune_demo/evaluate/gsm8k/gen_gsm8k.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Generate GSM8K completions using vLLM (multi-GPU tensor parallel). +Outputs JSONL with task_id, question, correct_answer, predicted_answer, and raw_completion. + +Usage: + python evaluate/gsm8k/gen_gsm8k.py --model moonshotai/Moonlight-16B-A3B --output eval_results/gsm8k_baseline --tp 8 +""" + +import argparse +import json +import os +import re + +os.environ.setdefault("VLLM_DISABLE_CUSTOM_ALL_REDUCE", "1") +os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn") + +from datasets import load_dataset +from vllm import LLM, SamplingParams + + +def format_gsm8k_prompt(question): + return f"### Instruction:\n{question}\n\n### Response:\n" + + +def extract_answer(text): + match = re.search(r"####\s*(-?[\d,]+(?:\.\d+)?)", text) + if match: + return match.group(1).replace(",", "") + numbers = re.findall(r"-?\d+\.?\d*", text) + if numbers: + return numbers[-1].replace(",", "") + return "" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--model", type=str, required=True) + parser.add_argument("--output", type=str, required=True) + parser.add_argument("--split", type=str, default="test", help="GSM8K split to evaluate") + parser.add_argument("--tp", type=int, default=8, help="Tensor parallel size") + parser.add_argument("--max_new_tokens", type=int, default=1024) + parser.add_argument("--temperature", type=float, default=0.0) + parser.add_argument("--n_samples", type=int, default=1) + args = parser.parse_args() + + os.makedirs(args.output, exist_ok=True) + out_path = os.path.join(args.output, "samples.jsonl") + + print(f"Loading model: {args.model} (tp={args.tp})") + llm = LLM( + model=args.model, + tensor_parallel_size=args.tp, + dtype="bfloat16", + trust_remote_code=True, + max_model_len=2048, + gpu_memory_utilization=0.90, + enforce_eager=True, + disable_custom_all_reduce=True, + ) + + sampling_params = SamplingParams( + temperature=0, + max_tokens=args.max_new_tokens, + ) + + print(f"Loading GSM8K dataset (split={args.split})") + dataset = load_dataset("openai/gsm8k", "main", split=args.split, trust_remote_code=True) + print(f"Loaded {len(dataset)} examples") + + prompts = [] + meta = [] + for i, example in enumerate(dataset): + question = example["question"] + gold_answer = extract_answer(example["answer"]) + prompt = format_gsm8k_prompt(question) + for _ in range(args.n_samples): + prompts.append(prompt) + meta.append({ + "task_id": i, + "question": question, + "correct_answer": gold_answer, + }) + + print(f"Generating {len(prompts)} completions") + outputs = llm.generate(prompts, sampling_params, use_tqdm=True) + + with open(out_path, "w") as f_out: + for m, output in zip(meta, outputs): + completion = output.outputs[0].text.strip() + predicted = extract_answer(completion) + sample = { + "task_id": m["task_id"], + "question": m["question"], + "correct_answer": m["correct_answer"], + "predicted_answer": predicted, + "raw_completion": completion, + } + f_out.write(json.dumps(sample) + "\n") + + print(f"Done. {len(outputs)} samples in {out_path}") + + +if __name__ == "__main__": + main() diff --git a/training/deepspeed_finetune_demo/evaluate/mmlu/eval_mmlu.py b/training/deepspeed_finetune_demo/evaluate/mmlu/eval_mmlu.py new file mode 100644 index 000000000..1616bfe6f --- /dev/null +++ b/training/deepspeed_finetune_demo/evaluate/mmlu/eval_mmlu.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Evaluate MMLU accuracy from generated samples.jsonl. + +Usage: + python evaluate/mmlu/eval_mmlu.py --samples eval_results/mmlu_baseline/samples.jsonl +""" + +import argparse +import json + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--samples", type=str, required=True, help="Path to samples.jsonl") + args = parser.parse_args() + + total = 0 + correct = 0 + empty = 0 + + with open(args.samples) as f: + for line in f: + sample = json.loads(line) + total += 1 + predicted = sample.get("predicted_answer", "") + gold = sample["correct_answer"] + if not predicted: + empty += 1 + elif predicted == gold: + correct += 1 + + accuracy = correct / total * 100 if total > 0 else 0 + print(f"MMLU Results:") + print(f" Total: {total}") + print(f" Correct: {correct}") + print(f" Empty: {empty}") + print(f" Accuracy: {accuracy:.2f}%") + print(f" pass@1: {accuracy:.2f}") + + +if __name__ == "__main__": + main() diff --git a/training/deepspeed_finetune_demo/evaluate/mmlu/gen_mmlu.py b/training/deepspeed_finetune_demo/evaluate/mmlu/gen_mmlu.py new file mode 100644 index 000000000..92ec783f6 --- /dev/null +++ b/training/deepspeed_finetune_demo/evaluate/mmlu/gen_mmlu.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +"""Generate MMLU completions using vLLM (multi-GPU tensor parallel). +Outputs JSONL with task_id, question, choices, correct_answer, and predicted_answer. + +Usage: + python evaluate/mmlu/gen_mmlu.py --model moonshotai/Moonlight-16B-A3B --output eval_results/mmlu_baseline --tp 8 +""" + +import argparse +import json +import os + +os.environ.setdefault("VLLM_DISABLE_CUSTOM_ALL_REDUCE", "1") +os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn") + +from datasets import load_dataset +from vllm import LLM, SamplingParams + +LABELS = "ABCDEFGHIJ" + + +def format_mmlu_prompt(question, choices): + prompt = f"### Instruction:\n{question}\n" + for i, choice in enumerate(choices): + prompt += f"{LABELS[i]}. {choice}\n" + prompt += "\nAnswer with the letter of the correct choice.\n\n### Response:\n" + return prompt + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--model", type=str, required=True) + parser.add_argument("--output", type=str, required=True) + parser.add_argument("--subset", type=str, default="all", help="MMLU subset name") + parser.add_argument("--split", type=str, default="test", help="MMLU split to evaluate") + parser.add_argument("--tp", type=int, default=8, help="Tensor parallel size") + parser.add_argument("--max_new_tokens", type=int, default=10) + parser.add_argument("--temperature", type=float, default=0.0) + parser.add_argument("--n_samples", type=int, default=1) + args = parser.parse_args() + + os.makedirs(args.output, exist_ok=True) + out_path = os.path.join(args.output, "samples.jsonl") + + print(f"Loading model: {args.model} (tp={args.tp})") + llm = LLM( + model=args.model, + tensor_parallel_size=args.tp, + dtype="bfloat16", + trust_remote_code=True, + max_model_len=2048, + gpu_memory_utilization=0.90, + enforce_eager=True, + disable_custom_all_reduce=True, + ) + + sampling_params = SamplingParams( + temperature=0, + max_tokens=args.max_new_tokens, + ) + + print(f"Loading MMLU dataset (subset={args.subset}, split={args.split})") + dataset = load_dataset("cais/mmlu", args.subset, split=args.split, trust_remote_code=True) + print(f"Loaded {len(dataset)} examples") + + prompts = [] + meta = [] + for i, example in enumerate(dataset): + question = example["question"] + choices = example["choices"] + answer = example["answer"] + if isinstance(answer, int): + answer = LABELS[answer] + prompt = format_mmlu_prompt(question, choices) + for _ in range(args.n_samples): + prompts.append(prompt) + meta.append({ + "task_id": i, + "question": question, + "choices": choices, + "correct_answer": answer, + }) + + print(f"Generating {len(prompts)} completions") + outputs = llm.generate(prompts, sampling_params, use_tqdm=True) + + with open(out_path, "w") as f_out: + for m, output in zip(meta, outputs): + completion = output.outputs[0].text.strip() + predicted = "" + for ch in completion: + if ch.upper() in LABELS[: len(m["choices"])]: + predicted = ch.upper() + break + sample = { + "task_id": m["task_id"], + "question": m["question"], + "choices": m["choices"], + "correct_answer": m["correct_answer"], + "predicted_answer": predicted, + "raw_completion": completion, + } + f_out.write(json.dumps(sample) + "\n") + + print(f"Done. {len(outputs)} samples in {out_path}") + + +if __name__ == "__main__": + main() diff --git a/training/deepspeed_finetune_demo/finetune.sh b/training/deepspeed_finetune_demo/finetune.sh new file mode 100755 index 000000000..6ac744ae3 --- /dev/null +++ b/training/deepspeed_finetune_demo/finetune.sh @@ -0,0 +1,6 @@ +NUM="${1:-2}" +MODEL="${2:-Qwen/Qwen2.5-0.5B}" +DATASET="${5:-tatsu-lab/alpaca}" +CONFIG="${3:-configs/z2_config.json}" +BATCH="${4:-8}" +deepspeed --num_gpus=$NUM --bind_cores_to_rank finetune_llama.py --model_name $MODEL --output_dir output --batch_size $BATCH --deepspeed_config $CONFIG --num_train_epochs 1 --eval_steps 100 --wandb_name $CONFIG-$BATCH --dataset_name $DATASET diff --git a/training/deepspeed_finetune_demo/finetune_llama.py b/training/deepspeed_finetune_demo/finetune_llama.py new file mode 100644 index 000000000..e3e826320 --- /dev/null +++ b/training/deepspeed_finetune_demo/finetune_llama.py @@ -0,0 +1,529 @@ +import torch +import time +import deepspeed +import argparse +from datasets import load_dataset +from torch.utils.data import DataLoader, DistributedSampler +from transformers import AutoModelForCausalLM, AutoTokenizer, default_data_collator +from transformers.integrations.deepspeed import HfDeepSpeedConfig +import json +import random +import numpy as np +from deepspeed import comm as dist +import logging + +import os + +os.environ["TOKENIZERS_PARALLELISM"] = "false" + +import wandb + + +def set_seed(seed): + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed_all(seed) + + +DATASET_REGISTRY = { + "sahil2801/CodeAlpaca-20k": { + "split": "train", + "preprocessor": "alpaca", + "field_map": { + "instruction": "instruction", + "input": "input", + "output": "output", + }, + }, + "meta-math/MetaMathQA": { + "split": "train", + "preprocessor": "alpaca", + "field_map": { + "instruction": "query", + "input": None, + "output": "response", + }, + "sample_rate": 0.1, + }, + "cais/mmlu": { + "subset": "all", + "split": "auxiliary_train", + "preprocessor": "mmlu", + "field_map": None, + }, +} + + +def load_and_prepare_dataset(dataset_name): + if dataset_name not in DATASET_REGISTRY: + raise ValueError( + f"Dataset '{dataset_name}' not in DATASET_REGISTRY. " + f"Available: {list(DATASET_REGISTRY.keys())}" + ) + config = DATASET_REGISTRY[dataset_name] + + load_kwargs = {"path": dataset_name} + if config.get("subset"): + load_kwargs["name"] = config["subset"] + dataset = load_dataset(**load_kwargs) + raw_dataset = dataset[config["split"]] + + sample_rate = config.get("sample_rate") + if sample_rate is not None and sample_rate < 1.0: + n = len(raw_dataset) + keep = max(1, int(n * sample_rate)) + raw_dataset = raw_dataset.shuffle(seed=42).select(range(keep)) + print(f"Downsampled {dataset_name}: {n} -> {keep} (rate={sample_rate})") + + field_map = config.get("field_map") + preprocessor = config["preprocessor"] + + if preprocessor == "alpaca" and field_map: + rename_map = {} + for target_field, source_field in field_map.items(): + if source_field is not None and source_field != target_field: + rename_map[source_field] = target_field + if rename_map: + raw_dataset = raw_dataset.rename_columns(rename_map) + + if preprocessor == "alpaca" and "input" not in raw_dataset.column_names: + raw_dataset = raw_dataset.add_column("input", [""] * len(raw_dataset)) + + return raw_dataset, preprocessor + + +def preprocess_alpaca(example, tokenizer, max_length=2048): + # Build instruction part (will be masked from loss) + instruction = f"### Instruction:\n{example['instruction']}\n\n" + if example.get("input", ""): + instruction += f"### Input:\n{example['input']}\n\n" + instruction += "### Response:\n" + response = example["output"] + + full_prompt = instruction + response + tokenized = tokenizer( + full_prompt, truncation=True, max_length=max_length, padding="max_length" + ) + + # Find instruction length to mask it from loss + # Use full_prompt tokenization to get accurate instruction boundary after truncation + instruction_ids = tokenizer(instruction, add_special_tokens=False)["input_ids"] + instruction_len = len(instruction_ids) + + # Ensure at least one token is unmasked to avoid NaN loss + # If instruction is longer than max_length, only mask padding tokens + seq_len = sum(1 for t in tokenized["input_ids"] if t != tokenizer.pad_token_id) + if instruction_len >= seq_len: + instruction_len = max(0, seq_len - 1) # Keep at least the last non-pad token + + # Mask instruction and padding tokens in labels (set to -100, ignored by CrossEntropyLoss) + labels = tokenized["input_ids"].copy() + for i in range(len(labels)): + if i < instruction_len or labels[i] == tokenizer.pad_token_id: + labels[i] = -100 + tokenized["labels"] = labels + return tokenized + + +def preprocess_magicoder(example, tokenizer, max_length=2048): + # Magicoder uses 'problem' / 'solution' fields + instruction = f"### Instruction:\n{example['problem']}\n\n### Response:\n" + response = example["solution"] + + full_prompt = instruction + response + tokenized = tokenizer( + full_prompt, truncation=True, max_length=max_length, padding="max_length" + ) + + instruction_ids = tokenizer(instruction, add_special_tokens=False)["input_ids"] + instruction_len = len(instruction_ids) + + seq_len = sum(1 for t in tokenized["input_ids"] if t != tokenizer.pad_token_id) + if instruction_len >= seq_len: + instruction_len = max(0, seq_len - 1) + + labels = tokenized["input_ids"].copy() + for i in range(len(labels)): + if i < instruction_len or labels[i] == tokenizer.pad_token_id: + labels[i] = -100 + tokenized["labels"] = labels + return tokenized + + +def preprocess_mmlu(example, tokenizer, max_length=2048): + choices = example["choices"] + labels = "ABCDEFGHIJ" + instruction = f"### Instruction:\n{example['question']}\n" + for i, choice in enumerate(choices): + instruction += f"{labels[i]}. {choice}\n" + instruction += "\nAnswer with the letter of the correct choice.\n\n### Response:\n" + answer_letter = example["answer"] + if isinstance(answer_letter, int): + answer_letter = labels[answer_letter] + response = answer_letter + + full_prompt = instruction + response + tokenized = tokenizer( + full_prompt, truncation=True, max_length=max_length, padding="max_length" + ) + + instruction_ids = tokenizer(instruction, add_special_tokens=False)["input_ids"] + instruction_len = len(instruction_ids) + + seq_len = sum(1 for t in tokenized["input_ids"] if t != tokenizer.pad_token_id) + if instruction_len >= seq_len: + instruction_len = max(0, seq_len - 1) + + labels_out = tokenized["input_ids"].copy() + for i in range(len(labels_out)): + if i < instruction_len or labels_out[i] == tokenizer.pad_token_id: + labels_out[i] = -100 + tokenized["labels"] = labels_out + return tokenized + + +PREPROCESSORS = { + "alpaca": preprocess_alpaca, + "magicoder": preprocess_magicoder, + "mmlu": preprocess_mmlu, +} + + +def evaluate(model_engine, eval_dataloader): + import torch + from tqdm import tqdm + from deepspeed import comm as dist + + model_engine.eval() + torch.cuda.empty_cache() + losses = [] + rank = dist.get_rank() if dist.is_initialized() else 0 + + with torch.no_grad(): + if rank == 0: + enum = tqdm(eval_dataloader, desc="Evaluating", leave=False) + else: + enum = eval_dataloader + for batch in enum: + batch = {k: v.to(model_engine.device) for k, v in batch.items()} + outputs = model_engine(**batch) + loss = outputs.loss + losses.append(loss.item()) + del outputs + model_engine.train() + + if len(losses) == 0: + return None + avg_loss = sum(losses) / len(losses) + return avg_loss + +def print_r(rank, arg): + if rank == dist.get_rank(): + print(arg) + + +def _save_weights(model_engine, tokenizer, output_dir, step, keep_last=2): + """Save model weights for the given step; remove old checkpoints beyond keep_last.""" + import shutil + rank = dist.get_rank() + ckpt_dir = os.path.join(output_dir, f"step_{step}") + rank_dir = os.path.join(ckpt_dir, str(rank)) + os.makedirs(rank_dir, exist_ok=True) + state_dict = model_engine.module.state_dict() + if rank == 0: + torch.save(state_dict, os.path.join(rank_dir, "model_weights.pt")) + tokenizer.save_pretrained(rank_dir) + else: + expert_dict = {k: v for k, v in state_dict.items() if v.ndim == 3} + torch.save(expert_dict, os.path.join(rank_dir, "model_weights.pt")) + dist.barrier() + # Remove old checkpoints beyond keep_last (rank 0 only to avoid races) + if rank == 0: + ckpts = sorted( + [d for d in os.listdir(output_dir) if d.startswith("step_")], + key=lambda d: int(d.split("_")[1]), + ) + for old_ckpt in ckpts[:-keep_last]: + shutil.rmtree(os.path.join(output_dir, old_ckpt), ignore_errors=True) + dist.barrier() + print_r(0, f"Saved checkpoint to {ckpt_dir}") + + +def main(args): + logging.basicConfig(level=logging.INFO, filename="pytorch_log.txt") + set_seed(args.seed) + + # override batch size in ds_config + with open(args.deepspeed_config, "r") as f: + ds_config = json.load(f) + ds_config["train_batch_size"] = args.batch_size + delattr(args, "deepspeed_config") + # make sure models are properly loaded in zero3 + dschf = HfDeepSpeedConfig(ds_config) + + tokenizer = AutoTokenizer.from_pretrained(args.model_name, trust_remote_code=True) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + try: + import flash_attn + attn_impl = "flash_attention_2" + except ImportError: + attn_impl = None + model = AutoModelForCausalLM.from_pretrained( + args.model_name, + torch_dtype=torch.bfloat16, + trust_remote_code=True, + attn_implementation=attn_impl, + ) + model.config.use_cache = False + model.gradient_checkpointing_enable() + + """ + # the code below allows you to train only part of the parameters + # we haven't parameterize this part yet, so uncomment down below and modify the code manually + + # Freeze all parameters except gate parameters BEFORE DeepSpeed initialization + # This needs to be done before passing to DeepSpeed + for name, param in model.named_parameters(): + if 'gate' in name.lower() and not 'gate_proj' in name.lower(): + param.requires_grad = True + print(f"Unfrozen parameter: {name}") + else: + param.requires_grad = False + + # Enable input gradient requirements to ensure gradient flow + # This is needed when using gradient checkpointing with partially frozen models + model.enable_input_require_grads() + """ + + # Load dataset and split into train/eval + if args.dataset_name in DATASET_REGISTRY: + raw_dataset, preprocessor_name = load_and_prepare_dataset(args.dataset_name) + preprocess_fn = PREPROCESSORS[preprocessor_name] + else: + dataset = load_dataset(args.dataset_name) + raw_dataset = dataset["train"] + if "problem" in raw_dataset.column_names: + preprocessor_name = "magicoder" + else: + preprocessor_name = "alpaca" + preprocess_fn = PREPROCESSORS[preprocessor_name] + + split_dataset = raw_dataset.train_test_split(test_size=0.05, seed=args.seed) + train_dataset = split_dataset["train"] + eval_dataset = split_dataset["test"] + + keep_cols = {"input_ids", "attention_mask", "labels"} + tokenized_train_dataset = train_dataset.map( + lambda x: preprocess_fn(x, tokenizer, max_length=args.max_length), + batched=False, + remove_columns=[c for c in train_dataset.column_names if c not in keep_cols], + ) + tokenized_eval_dataset = eval_dataset.map( + lambda x: preprocess_fn(x, tokenizer, max_length=args.max_length), + batched=False, + remove_columns=[c for c in eval_dataset.column_names if c not in keep_cols], + ) + + eval_dataloader = DataLoader( + tokenized_eval_dataset, + batch_size=args.eval_batch_size, + collate_fn=default_data_collator, + shuffle=False, + drop_last=True, + ) + + model_engine, optimizer, train_dataloader, lr_scheduler = deepspeed.initialize( + args=args, + model=model, + model_parameters=model.parameters(), + training_data=tokenized_train_dataset, + collate_fn=default_data_collator, + config=ds_config, + ) + + train_sampler = DistributedSampler( + tokenized_train_dataset, + shuffle=True, + seed=args.seed, + ) + per_device_batch = model_engine.train_micro_batch_size_per_gpu() + train_dataloader = DataLoader( + tokenized_train_dataset, + batch_size=per_device_batch, + sampler=train_sampler, + collate_fn=default_data_collator, + drop_last=True, + ) + + model_engine.train() + global_step = 0 + total_time = 0 + total_count = 0 + + # skip unnecessary evaluation and checkpoint saving + save_checkpoint_p = True + if args.bench_start >= 0 and args.bench_steps > 0: + save_checkpoint_p = False + if args.profile_start >= 0: + save_checkpoint_p = False + + if args.profile_start >= 0: + prof = torch.profiler.profile( + activities=[ + torch.profiler.ProfilerActivity.CPU, + torch.profiler.ProfilerActivity.CUDA, + ], + record_shapes=True, + profile_memory=True, + ) + else: + prof = None + + # setup logging + if args.wandb_name != None and dist.get_rank() == 0: + wandb.init(project="deepspeed_finetune_demo", name=args.wandb_name) + + global_samples = 0 + for epoch in range(args.num_train_epochs): + print_r(0, f"Starting epoch {epoch + 1}/{args.num_train_epochs}") + train_dataloader.sampler.set_epoch(epoch) + + for step, batch in enumerate(train_dataloader): + if prof != None and global_step == args.profile_start: + prof.start() + if prof != None and global_step - args.profile_start == args.profile_steps: + prof.stop() + # print profile + if dist.get_rank() == 0: + prof.export_chrome_trace("trace.json") + print( + prof.key_averages().table( + sort_by="self_cuda_time_total", row_limit=10 + ) + ) + step_start_time = time.time() + batch = {k: v.to(model_engine.device) for k, v in batch.items()} + outputs = model_engine(**batch) + loss = outputs.loss + + model_engine.backward(loss) + model_engine.step() + global_samples += model_engine.train_batch_size() + + step_time = time.time() - step_start_time + if args.bench_start >= 0 and args.bench_steps > 0: + if global_step >= args.bench_start: + total_time += step_time + total_count += 1 + if global_step >= args.bench_start + args.bench_steps - 1: + break + + if dist.get_rank() == 0 and args.wandb_name is not None: + wandb.log({"global_samples": global_samples, "train-loss": loss}) + if global_step % 1 == 0: # Print every step + msg = f"Step {global_step}, Loss: {loss.item():.4f}, Time: {step_time * 1000:.0f}ms" + print_r(0, msg) + if dist.get_rank() == 0: + logging.info(msg) + + # Evaluation after every eval_steps + if ( + args.eval_steps > 0 + and global_step % args.eval_steps == 0 + and save_checkpoint_p + ): + eval_loss = evaluate(model_engine, eval_dataloader) + if dist.get_rank() == 0: + if eval_loss is not None: + eval_loss_val = float(eval_loss) + if args.wandb_name != None: + wandb.log( + { + "global_samples": global_samples, + "eval-loss": eval_loss_val, + } + ) + eval_msg = f"[Eval @ step {global_step}] Eval Loss: {eval_loss_val:.4f}" + print(eval_msg, flush=True) + logging.info(eval_msg) + else: + eval_msg = f"[Eval @ step {global_step}] Eval Loss unavailable (no eval batches processed)" + print(eval_msg, flush=True) + logging.info(eval_msg) + if ( + args.checkpoint_steps > 0 + and global_step > 0 + and global_step % args.checkpoint_steps == 0 + and save_checkpoint_p + ): + _save_weights(model_engine, tokenizer, args.output_dir, global_step) + global_step += 1 + if prof != None: + prof.step() + if args.max_steps > 0 and global_step >= args.max_steps: + break + + if args.bench_start >= 0 and args.bench_steps > 0: + if global_step >= args.bench_start + args.bench_steps - 1: + break + if args.max_steps > 0 and global_step >= args.max_steps: + break + + if args.bench_start >= 0 and args.bench_steps > 0: + print_r(0, f"Average iteration time = {total_time / total_count}") + + if save_checkpoint_p: + _save_weights(model_engine, tokenizer, args.output_dir, global_step) + + print_r(0, "Training complete!") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--model_name", type=str, required=True) + parser.add_argument("--dataset_name", type=str, default="tatsu-lab/alpaca") + parser.add_argument( + "--local_rank", + type=int, + default=-1, + help="local rank passed from distributed launcher", + ) + + parser.add_argument("--batch_size", type=int, required=True) + parser.add_argument("--profile_start", type=int, default=-1) + parser.add_argument("--profile_steps", type=int, default=4) + parser.add_argument("--weight_decay", type=float, default=0.01) + parser.add_argument("--warmup", type=float, default=0.01) + parser.add_argument("--num_train_epochs", type=int, default=3) + parser.add_argument("--output_dir", type=str, required=True) + parser.add_argument("--seed", type=int, default=42) + parser.add_argument("--bench_start", type=int, default=-1) + parser.add_argument("--bench_steps", type=int, default=100) + parser.add_argument( + "--eval_steps", + type=int, + default=0, + help="Run evaluation every N steps (0 disables)", + ) + parser.add_argument( + "--max_length", type=int, default=2048, help="Max sequence length" + ) + parser.add_argument("--wandb_name", type=str, default=None) + parser.add_argument( + "--max_steps", type=int, default=-1, help="Stop after N steps (-1 = full epoch)" + ) + parser.add_argument( + "--checkpoint_steps", type=int, default=0, + help="Save a checkpoint every N steps (0 disables); keeps last 2", + ) + parser.add_argument( + "--eval_batch_size", type=int, default=4, help="Eval batch size per rank" + ) + parser = deepspeed.add_config_arguments(parser) + args = parser.parse_args() + + main(args) diff --git a/training/deepspeed_finetune_demo/profile.sh b/training/deepspeed_finetune_demo/profile.sh new file mode 100755 index 000000000..091d0e10d --- /dev/null +++ b/training/deepspeed_finetune_demo/profile.sh @@ -0,0 +1,5 @@ +NUM="${1:-2}" +MODEL="${2:-Qwen/Qwen2.5-0.5B}" +CONFIG="${3:-configs/z2_config.json}" +deepspeed --num_gpus=$NUM --bind_cores_to_rank finetune_llama.py --model_name $MODEL --output_dir output --batch_size 8 --deepspeed_config $CONFIG --num_train_epochs 1 --bench_start 32 --profile_start 10 --profile_steps 20 + diff --git a/training/deepspeed_finetune_demo/requirements.txt b/training/deepspeed_finetune_demo/requirements.txt new file mode 100644 index 000000000..02794f4da --- /dev/null +++ b/training/deepspeed_finetune_demo/requirements.txt @@ -0,0 +1,9 @@ +torch>=2.9.1 +deepspeed>=0.16.0 +datasets>=4.5.0 +transformers>=4.57.3 +numpy>=1.21.0 +evalplus>=0.3.1 +flash-attn>=2.8.3 +tiktoken +sentencepiece diff --git a/training/deepspeed_finetune_demo/run_and_evaluate.sh b/training/deepspeed_finetune_demo/run_and_evaluate.sh new file mode 100755 index 000000000..ca1967be9 --- /dev/null +++ b/training/deepspeed_finetune_demo/run_and_evaluate.sh @@ -0,0 +1,188 @@ +#!/bin/bash +# Fine-tune + evaluate: supports MBPP, MMLU, and GSM8K benchmarks +# Usage: bash run_and_evaluate.sh [model_name] [ds_config] [eval_steps] [wandb_name] +# +# Environment variables: +# BENCHMARK - one of: mbpp, mmlu, gsm8k (default: mbpp) +# DATASET - HuggingFace dataset name (default: auto-selected per benchmark) +# TP - tensor parallel size (default: 8) +# SKIP_TRAIN - set to 1 to skip training and go straight to eval +# +# Examples: +# bash run_and_evaluate.sh moonshotai/Moonlight-16B-A3B configs/z2_config.json 100 my_run +# BENCHMARK=mmlu bash run_and_evaluate.sh moonshotai/Moonlight-16B-A3B configs/z2_config.json 100 +# BENCHMARK=gsm8k bash run_and_evaluate.sh moonshotai/Moonlight-16B-A3B configs/z2_config.json 100 +set -euo pipefail + +MODEL=${1:-moonshotai/Moonlight-16B-A3B} +DS_CONFIG=${2:-configs/z2_config.json} +EVAL_STEPS=${3:-100} +WANDB_NAME=${4:-moonlight_finetune} +TP=${TP:-8} +BENCHMARK=${BENCHMARK:-mbpp} + +case "$BENCHMARK" in + mbpp) + DATASET=${DATASET:-sahil2801/CodeAlpaca-20k} + ;; + mmlu) + DATASET=${DATASET:-cais/mmlu} + ;; + gsm8k) + DATASET=${DATASET:-meta-math/MetaMathQA} + ;; + *) + echo "ERROR: Unknown BENCHMARK '$BENCHMARK'. Use one of: mbpp, mmlu, gsm8k" + exit 1 + ;; +esac + +# Derive a safe directory name from model (replace / with _) +MODEL_SLUG=$(echo "$MODEL" | tr '/' '_') +CONFIG_SLUG=$(basename "$DS_CONFIG" .json) +DATASET_SLUG=$(echo "$DATASET" | tr '/' '_' | tr '[:upper:]' '[:lower:]') + +export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True +export VLLM_DISABLE_CUSTOM_ALL_REDUCE=1 +export VLLM_WORKER_MULTIPROC_METHOD=spawn +PYTHON=${PYTHON:-$(which python3)} +WORKDIR=$(cd "$(dirname "$0")" && pwd) +LOGDIR=$WORKDIR/experiment_logs +OUTPUT_DIR=$WORKDIR/output_${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG} +HF_DIR=$WORKDIR/hf_model_${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG} +BASELINE_DIR=$WORKDIR/eval_results/${BENCHMARK}_baseline_${MODEL_SLUG} +EVAL_DIR=$WORKDIR/eval_results/${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG} + +mkdir -p $LOGDIR $BASELINE_DIR $EVAL_DIR + +cd $WORKDIR + +# ---------- helper: run evaluation by benchmark type ---------- +run_eval() { + local model_path=$1 + local output_dir=$2 + local log_tag=$3 + local gen_log=$LOGDIR/${log_tag}_gen.log + local eval_log=$LOGDIR/${log_tag}_eval.log + + rm -rf "$output_dir" + + case "$BENCHMARK" in + mbpp) + $PYTHON evaluate/humaneval/gen_vllm.py \ + --model "$model_path" \ + --output "$output_dir" \ + --dataset mbpp \ + --tp "$TP" \ + --instruction \ + 2>&1 | tee "$gen_log" + + $PYTHON -m evalplus.evaluate \ + --dataset mbpp \ + --samples "$output_dir/samples.jsonl" \ + 2>&1 | tee "$eval_log" + ;; + + mmlu) + $PYTHON evaluate/mmlu/gen_mmlu.py \ + --model "$model_path" \ + --output "$output_dir" \ + --tp "$TP" \ + 2>&1 | tee "$gen_log" + + $PYTHON evaluate/mmlu/eval_mmlu.py \ + --samples "$output_dir/samples.jsonl" \ + 2>&1 | tee "$eval_log" + ;; + + gsm8k) + $PYTHON evaluate/gsm8k/gen_gsm8k.py \ + --model "$model_path" \ + --output "$output_dir" \ + --tp "$TP" \ + 2>&1 | tee "$gen_log" + + $PYTHON evaluate/gsm8k/eval_gsm8k.py \ + --samples "$output_dir/samples.jsonl" \ + 2>&1 | tee "$eval_log" + ;; + esac +} + +# ---------- STEP 0: BASELINE EVALUATION ---------- +echo "===== STEP 0: BASELINE EVALUATION (pre-finetune) =====" +echo "Model: ${MODEL}" +echo "Config: ${DS_CONFIG}" +echo "Dataset: ${DATASET}" +echo "Benchmark: ${BENCHMARK}" +echo "Eval steps: ${EVAL_STEPS}" +echo "W&B name: ${WANDB_NAME:-}" +echo "TP: ${TP}" +echo "Start: $(date)" + +run_eval "$MODEL" "$BASELINE_DIR" "baseline_${BENCHMARK}_${MODEL_SLUG}" +echo "Baseline eval done: $(date)" + +if [ "${SKIP_TRAIN:-0}" = "1" ]; then + echo "SKIP_TRAIN=1: skipping training and convert, jumping to post-finetune eval" +else + +# ---------- STEP 1: TRAINING ---------- +echo "===== STEP 1: TRAINING =====" +echo "Start: $(date)" +deepspeed --num_gpus=8 finetune_llama.py \ + --model_name $MODEL \ + --output_dir $OUTPUT_DIR \ + --batch_size 16 --max_length 512 \ + --deepspeed_config $DS_CONFIG \ + --dataset_name $DATASET \ + --num_train_epochs 1 \ + --eval_steps $EVAL_STEPS \ + ${WANDB_NAME:+--wandb_name "$WANDB_NAME"} \ + 2>&1 | tee $LOGDIR/${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG}_train.log +echo "Training done: $(date)" + +# ---------- STEP 2: CONVERT ---------- +CKPT_DIR=$(ls -d $OUTPUT_DIR/step_* 2>/dev/null | sort -t_ -k2 -n | tail -1) +if [ -z "$CKPT_DIR" ]; then + CKPT_DIR=$OUTPUT_DIR +fi +echo "Using checkpoint: $CKPT_DIR" +$PYTHON convert_ds_to_hf.py \ + --ds_checkpoint $CKPT_DIR \ + --original_model $MODEL \ + --output_dir $HF_DIR \ + --ep_size 8 \ + 2>&1 | tee $LOGDIR/${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG}_convert.log +echo "Convert done: $(date)" + +# Verify .py files were copied +if [ ! -f "$HF_DIR/modeling_deepseek.py" ]; then + echo "WARNING: modeling_deepseek.py not found in $HF_DIR" +else + echo "Verified: custom code files present in HF model dir" +fi + +# Delete DS checkpoint to save disk +echo "Removing DS checkpoint to save disk..." +rm -rf $OUTPUT_DIR +echo "DS checkpoint removed" + +fi # end SKIP_TRAIN + +# ---------- STEP 3: GENERATE + EVALUATE (post-finetune) ---------- +echo "===== STEP 3: GENERATE + EVALUATE (post-finetune) =====" +echo "Start: $(date)" +run_eval "$HF_DIR" "$EVAL_DIR" "${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG}" +echo "Evaluate done: $(date)" + +# ---------- RESULTS SUMMARY ---------- +echo "===== ALL DONE =====" +echo "" +echo "========== RESULTS SUMMARY ==========" +echo "--- Baseline (pre-finetune) ---" +grep -E "pass@1|Base|Plus|Accuracy" $LOGDIR/baseline_${BENCHMARK}_${MODEL_SLUG}_eval.log || true +echo "" +echo "--- Finetuned (post-finetune) ---" +grep -E "pass@1|Base|Plus|Accuracy" $LOGDIR/${BENCHMARK}_${MODEL_SLUG}_${CONFIG_SLUG}_eval.log || true +echo "====================================="