Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@

## Unreleased

### Fixed (PR #313 follow-up)

- The adaptive parallel-policy experiment follow-ups so benchmark/report rows
  now describe the plan that actually executed; adaptive planning reuses the
  runtime shard-partitioning path instead of duplicating profiling logic;
  stale adaptive Criterion directories are selected deterministically;
  malformed adaptive benchmark directory names now fail loudly instead of
  disappearing from baked reports; conflicting truthful adaptive rows are
  rejected before export; and the experimental selector seam stays out of the
  public `warp-core` kernel surface while the benchmark-facing adaptive
  routing entrypoints remain concrete and deterministic.

### Fixed (PR #312 follow-up)

- The docs-surface reduction follow-ups so the collision tour no longer points
Expand Down
10 changes: 8 additions & 2 deletions crates/warp-benches/benches/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ results. This README summarizes how to run them and read the output.
- static round-robin shard assignment + per-worker deltas
- static round-robin shard assignment + per-shard deltas
- dedicated one-worker-per-shard + one-delta-per-shard
- adaptive shard routing, which selects a fixed policy from workload shape
- Each case includes canonical delta merge after parallel execution, so the
study reflects full policy cost for the synthetic independent workload.
- The policy matrix runs across loads `100`, `1000`, and `10000`, with worker
counts `1`, `4`, and `8` where the policy uses a worker pool.
- The policy matrix runs across loads `100`, `1000`, and `10000`, with
concrete `1w`, `4w`, and `8w` executions for the fixed dynamic/static
policies.
- Adaptive rows carry the incoming worker hint and the fixed plan the
selector actually chose for that workload/hint pair, so the baked report
stays truthful when the heuristic collapses to `1w` or caps itself at
`4w`.
- Throughput “elements” = executed items in the synthetic independent workload.

## Run
Expand Down
133 changes: 96 additions & 37 deletions crates/warp-benches/benches/parallel_baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::time::Duration;
use warp_core::parallel::{build_work_units, execute_work_queue, WorkerResult};
use warp_core::parallel::{
build_work_units, execute_parallel_with_adaptive_routing, execute_work_queue,
resolve_adaptive_shard_routing, WorkerResult,
};
use warp_core::{
execute_parallel, execute_parallel_with_policy, execute_serial, make_node_id, make_type_id,
make_warp_id, AtomPayload, AttachmentKey, AttachmentValue, ExecItem, GraphStore, GraphView,
Expand Down Expand Up @@ -62,16 +65,23 @@ fn merge_for_commit_path(deltas: Vec<TickDelta>) -> Vec<WarpOp> {
flat.into_iter().map(|(_, op)| op).collect()
}

/// Build `n` deterministic benchmark node ids named `bench/n0` … `bench/n{n-1}`.
///
/// Shared by the store builder and the adaptive-routing label probe so both
/// paths see identical node identities for a given `n`.
fn make_test_nodes(n: usize) -> Vec<NodeId> {
    (0..n)
        .map(|i| make_node_id(&format!("bench/n{i}")))
        .collect()
}

/// Create a test graph with N independent nodes.
fn make_test_store(n: usize) -> (GraphStore, Vec<NodeId>) {
let node_ty = make_type_id("bench/node");
let mut store = GraphStore::default();
let mut nodes = Vec::with_capacity(n);
let nodes = make_test_nodes(n);

for i in 0..n {
let id = make_node_id(&format!("bench/n{i}"));
for &id in &nodes {
store.insert_node(id, NodeRecord { ty: node_ty });
nodes.push(id);
}

(store, nodes)
Expand Down Expand Up @@ -317,6 +327,12 @@ fn bench_worker_scaling(c: &mut Criterion) {
// Policy matrix comparison
// =============================================================================

/// One row of the policy-matrix study: either a concrete fixed execution
/// policy benchmarked as-is, or the adaptive selector, whose row label is
/// derived from the plan the selector actually chooses for the workload.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PolicyMatrixCase {
    /// Benchmark the given fixed `ParallelExecutionPolicy` directly.
    Fixed(ParallelExecutionPolicy),
    /// Benchmark the adaptive shard-routing entrypoint; the selected
    /// policy/worker pair is resolved per workload/hint for the row label.
    Adaptive,
}

fn policy_label(policy: ParallelExecutionPolicy) -> &'static str {
match policy {
ParallelExecutionPolicy::DYNAMIC_PER_WORKER => "dynamic_per_worker",
Expand All @@ -332,6 +348,19 @@ fn worker_hint(workers: usize) -> NonZeroUsize {
NonZeroUsize::new(workers.max(1)).map_or(NonZeroUsize::MIN, |w| w)
}

/// Benchmark label for an adaptive row.
///
/// Encodes both the incoming worker hint and the fixed plan (policy plus
/// worker count) the selector actually chose, so the baked report stays
/// truthful when the heuristic picks fewer workers than hinted.
fn adaptive_case_label(
    hint: NonZeroUsize,
    selected_policy: ParallelExecutionPolicy,
    selected_workers: NonZeroUsize,
) -> String {
    let hint_workers = hint.get();
    let chosen_policy = policy_label(selected_policy);
    let chosen_workers = selected_workers.get();
    format!(
        "adaptive_shard_routing__hint_{}w__selected_{}_{}w",
        hint_workers, chosen_policy, chosen_workers,
    )
}

/// Compares shard assignment and delta accumulation strategies directly.
///
/// This includes canonical delta merge after parallel execution so the
Expand All @@ -344,41 +373,24 @@ fn bench_policy_matrix(c: &mut Criterion) {
.measurement_time(Duration::from_secs(5))
.sample_size(40);

let policies = [
ParallelExecutionPolicy::DYNAMIC_PER_WORKER,
ParallelExecutionPolicy::DYNAMIC_PER_SHARD,
ParallelExecutionPolicy::STATIC_PER_WORKER,
ParallelExecutionPolicy::STATIC_PER_SHARD,
ParallelExecutionPolicy::DEDICATED_PER_SHARD,
let cases = [
PolicyMatrixCase::Fixed(ParallelExecutionPolicy::DYNAMIC_PER_WORKER),
PolicyMatrixCase::Fixed(ParallelExecutionPolicy::DYNAMIC_PER_SHARD),
PolicyMatrixCase::Fixed(ParallelExecutionPolicy::STATIC_PER_WORKER),
PolicyMatrixCase::Fixed(ParallelExecutionPolicy::STATIC_PER_SHARD),
PolicyMatrixCase::Fixed(ParallelExecutionPolicy::DEDICATED_PER_SHARD),
PolicyMatrixCase::Adaptive,
];

for &n in &[100usize, 1_000, 10_000] {
group.throughput(Throughput::Elements(n as u64));
for policy in policies {
if policy == ParallelExecutionPolicy::DEDICATED_PER_SHARD {
group.bench_with_input(BenchmarkId::new(policy_label(policy), n), &n, |b, &n| {
b.iter_batched(
|| {
let (store, nodes) = make_test_store(n);
let items = make_exec_items(&nodes);
(store, items)
},
|(store, items)| {
let view = GraphView::new(&store);
let deltas =
execute_parallel_with_policy(view, &items, worker_hint(1), policy);
let merged = merge_for_commit_path(deltas);
criterion::black_box(merged)
},
BatchSize::SmallInput,
);
});
continue;
}

for &workers in &[1usize, 4, 8] {
for case in cases {
if case == PolicyMatrixCase::Fixed(ParallelExecutionPolicy::DEDICATED_PER_SHARD) {
group.bench_with_input(
BenchmarkId::new(format!("{}/{}w", policy_label(policy), workers), n),
BenchmarkId::new(
policy_label(ParallelExecutionPolicy::DEDICATED_PER_SHARD),
n,
),
&n,
|b, &n| {
b.iter_batched(
Expand All @@ -392,8 +404,8 @@ fn bench_policy_matrix(c: &mut Criterion) {
let deltas = execute_parallel_with_policy(
view,
&items,
worker_hint(workers),
policy,
worker_hint(1),
ParallelExecutionPolicy::DEDICATED_PER_SHARD,
);
let merged = merge_for_commit_path(deltas);
criterion::black_box(merged)
Expand All @@ -402,6 +414,53 @@ fn bench_policy_matrix(c: &mut Criterion) {
);
},
);
continue;
}

for &workers in &[1usize, 4, 8] {
let benchmark_label = match case {
PolicyMatrixCase::Fixed(policy) => {
format!("{}/{}w", policy_label(policy), workers)
}
PolicyMatrixCase::Adaptive => {
let nodes = make_test_nodes(n);
let items = make_exec_items(&nodes);
let hint = worker_hint(workers);
let (selected_policy, selected_workers) =
resolve_adaptive_shard_routing(&items, hint);
adaptive_case_label(hint, selected_policy, selected_workers)
}
};
group.bench_with_input(BenchmarkId::new(benchmark_label, n), &n, |b, &n| {
b.iter_batched(
|| {
let (store, nodes) = make_test_store(n);
let items = make_exec_items(&nodes);
(store, items)
},
|(store, items)| {
let view = GraphView::new(&store);
let deltas = match case {
PolicyMatrixCase::Fixed(policy) => execute_parallel_with_policy(
view,
&items,
worker_hint(workers),
policy,
),
PolicyMatrixCase::Adaptive => {
execute_parallel_with_adaptive_routing(
view,
&items,
worker_hint(workers),
)
}
};
let merged = merge_for_commit_path(deltas);
criterion::black_box(merged)
},
BatchSize::SmallInput,
);
});
}
}
}
Expand Down
Loading
Loading