1 change: 1 addition & 0 deletions qdp/Cargo.toml
@@ -3,6 +3,7 @@ members = [
"qdp-core",
"qdp-kernels",
"qdp-python",
"qdp-runtime",
]
resolver = "2"

111 changes: 111 additions & 0 deletions qdp/docs/runtime/RUNTIME_V1.md
@@ -0,0 +1,111 @@
# QDP Runtime v1

## Summary

`qdp-runtime` is the v1 control plane and execution skeleton for state-partitioned
distributed execution on top of Mahout QDP.

The current v1 implementation focuses on:

- state-partitioned metadata
- worker registration and device inventory
- weighted and topology-aware placement
- partition task generation and lifecycle
- in-process execution loops
- gather and metric-reduction planning
- a minimal runtime object/output model

This is intentionally a minimal v1. It does not yet provide a full multi-node
transport layer or a persistent GPU object store.

## Current Object and Output Model

Runtime outputs are tracked as `RuntimeObjectRecord`s inside the coordinator.

Current object kinds:

- `EncodedPartition`
- `ReducedMetric`

Each runtime object records:

- `object_id`
- `job_id`
- `partition_id`
- `kind`
- `location`
- `handle`
- `ready`

In v1, the object model is metadata-first. It gives the runtime a stable way to
track outputs across task completion, gather planning, and reduce planning,
without requiring a full persistent object store yet.
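As a rough illustration, a metadata-first record can be sketched as a plain struct. The field types and derives below are assumptions for the sketch, not the actual `qdp-runtime` definitions:

```rust
// Hypothetical sketch of the v1 object model; the real definitions live in
// qdp-runtime and may use different field types.
#[derive(Debug, Clone, PartialEq)]
enum RuntimeObjectKind {
    EncodedPartition,
    ReducedMetric,
}

#[derive(Debug, Clone)]
struct RuntimeObjectRecord {
    object_id: String,
    job_id: String,
    partition_id: usize,
    kind: RuntimeObjectKind,
    location: String, // e.g. "node-a/gpu0" in this sketch
    handle: String,   // opaque storage handle; no bytes are stored here
    ready: bool,
}

fn main() {
    // Metadata-first: the record tracks where an output lives and whether it
    // is ready, not the output data itself.
    let record = RuntimeObjectRecord {
        object_id: "obj-0".to_string(),
        job_id: "smoke-job".to_string(),
        partition_id: 0,
        kind: RuntimeObjectKind::EncodedPartition,
        location: "node-a/gpu0".to_string(),
        handle: "storage-handle-0".to_string(),
        ready: true,
    };
    println!("{} -> {:?}", record.object_id, record.kind);
}
```

Because the record is pure metadata, the same shape can describe outputs during task completion, gather planning, and reduce planning without committing to a storage backend.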

## Manual End-to-End Path

There are two example entry points in `qdp-runtime/examples`:

- `local_runtime_smoke.rs`
- `local_runtime_benchmark.rs`

### Smoke Example

This runs:

1. worker registration
2. job planning
3. in-process task execution
4. object registration
5. gather plan construction

Run it with:

```bash
cd qdp
cargo run -p qdp-runtime --example local_runtime_smoke
```

### Minimal Benchmark

This prints basic timings for:

- planning
- execution
- partition/object counts

Run it with:

```bash
cd qdp
cargo run -p qdp-runtime --example local_runtime_benchmark
```

## Optional Local QDP Integration

When built with the `local-executor` feature, `qdp-runtime` can use
`LocalEncodeWorkerExecutor` to call the real `qdp-core` encode path from the
runtime.

Example command:

```bash
cd qdp
cargo test -p qdp-runtime --features local-executor
```
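The general wiring can be sketched with a small executor trait and a mock implementation. The trait name, method signature, and the commented `LocalEncodeWorkerExecutor` shape below are assumptions for illustration, not the real `qdp-runtime` API:

```rust
// Hedged sketch of a pluggable worker executor behind a feature flag.
// Names and signatures here are illustrative, not the qdp-runtime API.
trait WorkerExecutor {
    fn encode_partition(&self, partition_id: usize) -> Result<String, String>;
}

// Default in-process mock, used when the `local-executor` feature is off.
struct MockExecutor;

impl WorkerExecutor for MockExecutor {
    fn encode_partition(&self, partition_id: usize) -> Result<String, String> {
        // Return an opaque handle instead of running a real encode.
        Ok(format!("mock-handle-{}", partition_id))
    }
}

// With `--features local-executor`, a real encoder could be compiled in:
// #[cfg(feature = "local-executor")]
// struct LocalEncodeWorkerExecutor { /* wraps the qdp-core encode path */ }

fn main() {
    let executor: Box<dyn WorkerExecutor> = Box::new(MockExecutor);
    let handle = executor.encode_partition(3).expect("encode");
    println!("handle={}", handle);
}
```

Gating the real encode path behind a feature keeps the default build free of GPU dependencies while letting tests exercise the full `qdp-core` path on machines that have it.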

## What v1 Supports

- `PartitionLocalConsume` metadata
- `GatherFullState` planning
- `ReduceMetrics` planning
- retry and lease timeout skeleton
- NVLink-aware placement hints
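The retry and lease-timeout skeleton can be sketched roughly as follows; `TaskLease` and its fields are hypothetical names for illustration, not the actual `qdp-runtime` types:

```rust
use std::time::{Duration, Instant};

// Hypothetical lease-tracking sketch: a task holds a time-bounded lease, and
// an expired lease consumes one retry so the task can be re-placed.
struct TaskLease {
    granted_at: Instant,
    timeout: Duration,
    retries_left: u32,
}

impl TaskLease {
    fn is_expired(&self, now: Instant) -> bool {
        now.duration_since(self.granted_at) >= self.timeout
    }

    // On expiry, consume a retry and restart the lease clock.
    fn try_retry(&mut self, now: Instant) -> bool {
        if self.is_expired(now) && self.retries_left > 0 {
            self.retries_left -= 1;
            self.granted_at = now;
            true
        } else {
            false
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut lease = TaskLease {
        granted_at: start,
        timeout: Duration::from_millis(0),
        retries_left: 2,
    };
    let now = start + Duration::from_millis(1);
    assert!(lease.is_expired(now));
    assert!(lease.try_retry(now));
    println!("retries_left={}", lease.retries_left);
}
```

Once the retry budget reaches zero, a coordinator following this shape would mark the task failed rather than re-placing it.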

## What v1 Does Not Yet Support

- cross-node transport
- persistent GPU object store
- partition migration
- dynamic repartitioning
- full collective communication
- full benchmark suite integration
17 changes: 17 additions & 0 deletions qdp/qdp-runtime/Cargo.toml
@@ -0,0 +1,17 @@
[package]
name = "qdp-runtime"
version.workspace = true
edition.workspace = true

[dependencies]
qdp-core = { path = "../qdp-core" }
thiserror = { workspace = true }
nvtx = { version = "1.3", optional = true }

[features]
default = []
observability = ["nvtx"]
local-executor = []

[lib]
name = "qdp_runtime"
64 changes: 64 additions & 0 deletions qdp/qdp-runtime/examples/local_runtime_benchmark.rs
@@ -0,0 +1,64 @@
use std::time::Instant;

use qdp_runtime::{
    ConsumptionMode, Coordinator, DType, DeviceCapabilities, HostPlatform, InProcessWorker,
    PlacementPolicy, RuntimeJobSpec, WorkerRegistration,
};

fn worker(worker_id: &str, node_id: &str, device_id: usize) -> InProcessWorker {
    InProcessWorker::new(WorkerRegistration {
        worker_id: worker_id.to_string(),
        node_id: node_id.to_string(),
        devices: vec![DeviceCapabilities {
            node_id: node_id.to_string(),
            device_id,
            device_name: format!("mock-gpu-{}", device_id),
            total_memory_bytes: 48 * 1024 * 1024 * 1024,
            free_memory_bytes: 40 * 1024 * 1024 * 1024,
            max_safe_allocation_bytes: 40 * 1024 * 1024 * 1024,
            measured_encode_samples_per_sec: Some(3000.0),
            host_platform: HostPlatform::Linux,
            stability_factor: 1.0,
            peer_links: Vec::new(),
        }],
    })
    .expect("valid worker registration")
}

fn main() {
    let worker_a = worker("worker-a", "node-a", 0);
    let worker_b = worker("worker-b", "node-b", 0);

    let mut coordinator = Coordinator::new();
    coordinator
        .register_worker(&worker_a)
        .expect("register worker-a");
    coordinator
        .register_worker(&worker_b)
        .expect("register worker-b");

    let plan_start = Instant::now();
    coordinator
        .plan_job(RuntimeJobSpec {
            job_id: "bench-job".to_string(),
            state_id: "bench-state".to_string(),
            global_qubits: 16,
            dtype: DType::Complex64,
            consumption_mode: ConsumptionMode::GatherFullState,
            placement_policy: PlacementPolicy::Weighted,
        })
        .expect("plan job");
    let plan_elapsed = plan_start.elapsed();

    let run_start = Instant::now();
    let completed = coordinator
        .run_job_with_workers("bench-job", &[worker_a, worker_b])
        .expect("run job");
    let run_elapsed = run_start.elapsed();

    println!("job_status={:?}", completed.status);
    println!("planning_ms={:.3}", plan_elapsed.as_secs_f64() * 1000.0);
    println!("execution_ms={:.3}", run_elapsed.as_secs_f64() * 1000.0);
    println!("objects={}", coordinator.objects_for_job("bench-job").len());
    println!("partitions={}", completed.tasks.len());
}
72 changes: 72 additions & 0 deletions qdp/qdp-runtime/examples/local_runtime_smoke.rs
@@ -0,0 +1,72 @@
use qdp_runtime::{
    ConsumptionMode, Coordinator, DType, DeviceCapabilities, GatherTarget, HostPlatform,
    InProcessWorker, PlacementPolicy, RuntimeJobSpec, WorkerRegistration,
};

fn worker(worker_id: &str, node_id: &str, device_id: usize) -> InProcessWorker {
    InProcessWorker::new(WorkerRegistration {
        worker_id: worker_id.to_string(),
        node_id: node_id.to_string(),
        devices: vec![DeviceCapabilities {
            node_id: node_id.to_string(),
            device_id,
            device_name: format!("mock-gpu-{}", device_id),
            total_memory_bytes: 48 * 1024 * 1024 * 1024,
            free_memory_bytes: 40 * 1024 * 1024 * 1024,
            max_safe_allocation_bytes: 40 * 1024 * 1024 * 1024,
            measured_encode_samples_per_sec: Some(3000.0),
            host_platform: HostPlatform::Linux,
            stability_factor: 1.0,
            peer_links: Vec::new(),
        }],
    })
    .expect("valid worker registration")
}

fn main() {
    let worker_a = worker("worker-a", "node-a", 0);
    let worker_b = worker("worker-b", "node-b", 0);

    let mut coordinator = Coordinator::new();
    coordinator
        .register_worker(&worker_a)
        .expect("register worker-a");
    coordinator
        .register_worker(&worker_b)
        .expect("register worker-b");

    coordinator
        .plan_job(RuntimeJobSpec {
            job_id: "smoke-job".to_string(),
            state_id: "smoke-state".to_string(),
            global_qubits: 5,
            dtype: DType::Complex64,
            consumption_mode: ConsumptionMode::GatherFullState,
            placement_policy: PlacementPolicy::Weighted,
        })
        .expect("plan job");

    let completed = coordinator
        .run_job_with_workers("smoke-job", &[worker_a, worker_b])
        .expect("run job");

    println!("job_status={:?}", completed.status);
    println!("partition_count={}", completed.state.layout.partition_count);
    println!("objects={}", coordinator.objects_for_job("smoke-job").len());

    let gather = coordinator
        .build_gather_plan("smoke-job", GatherTarget::HostMemory)
        .expect("gather plan");
    println!("gather_segments={}", gather.segments.len());
    for segment in gather.segments {
        println!(
            "partition={} source={}/{} offset={} len={} handle={}",
            segment.partition_id,
            segment.source_node_id,
            segment.source_device_id,
            segment.destination_offset_amplitudes,
            segment.amplitude_len,
            segment.source_storage_handle
        );
    }
}