Skip to content

Commit ecaa30b

Browse files
Andrey Golovanov and claude committed
Fix occurrence_count handling, per-direction BAC, and iterops rewrite
- Fix all metric modules to expand deduplicated flow_results by occurrence_count before statistical computation. Without this, each unique failure pattern was weighted equally regardless of how many MC iterations produced it. - Extract shared utilities into metrics/common.py: expand_flow_results, canonical_dc, baseline_demand_map, get_tm_baseline_and_failures. Removes duplication across latency.py, matrixdump.py, sps.py. - Add per-direction BAC (per_flow field on BacResult). Computes full BAC statistics per demand flow using the same _compute_bac_stats helper as the aggregate. Reveals directional asymmetry under failure. - Rewrite iterops.py: remove dead iteration_metrics extraction (field never existed in ngraph), replace with occurrence_count-aware iteration counts and timing from metadata. - Add mini DC-BB verification scenario (10 nodes, 2 planes, dual LH paths) with 252 hand-calculated assertions covering BAC, latency, alpha, iterops, and pair matrices. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ea43e33 commit ecaa30b

69 files changed

Lines changed: 82073 additions & 40780 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

metrics/bac.py

Lines changed: 155 additions & 75 deletions
Original file line number · Diff line number · Diff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from dataclasses import dataclass
3+
from dataclasses import dataclass, field
44
from pathlib import Path
55
from typing import Dict, List, Optional, Tuple
66

@@ -9,25 +9,26 @@
99
import pandas as pd
1010
import seaborn as sns
1111

12+
from .common import expand_flow_results
13+
1214

1315
@dataclass
1416
class BacResult:
1517
step_name: str
1618
mode: str # 'placement' or 'maxflow'
1719
series: pd.Series # delivered per iteration
1820
failure_ids: List[str]
19-
offered: float # offered demand (if known; else = series.max())
21+
offered: float # baseline delivered bandwidth
2022
quantiles_abs: Dict[float, float]
21-
quantiles_pct: Dict[float, float] # normalized by offered (0..1), if offered>0
22-
availability_at_pct_of_offer: Dict[float, float] # {90: 0.97, 99.9: 0.88, ...}
23-
auc_normalized: float # mean(min(delivered/offered,1.0))
24-
# Bandwidth threshold at probability p: smallest t s.t. P(delivered >= t) >= p
25-
# Absolute units (Gbps) and normalized by offered (0..1)
23+
quantiles_pct: Dict[float, float] # normalized by offered (0..1)
24+
availability_at_pct_of_offer: Dict[float, float] # {90: 0.97, ...}
25+
auc_normalized: float # mean(min(delivered/offered, 1.0))
2626
bw_at_probability_abs: Dict[float, float]
2727
bw_at_probability_pct: Dict[float, float]
28+
per_flow: Dict[str, "BacResult"] = field(default_factory=dict)
2829

2930
def to_jsonable(self) -> dict:
30-
return {
31+
d = {
3132
"step_name": self.step_name,
3233
"mode": self.mode,
3334
"series": list(map(float, self.series.values)),
@@ -46,6 +47,9 @@ def to_jsonable(self) -> dict:
4647
str(k): float(v) for k, v in self.bw_at_probability_pct.items()
4748
},
4849
}
50+
if self.per_flow:
51+
d["per_flow"] = {k: v.to_jsonable() for k, v in self.per_flow.items()}
52+
return d
4953

5054

5155
def _get_step(results: dict, name: str) -> dict:
@@ -63,92 +67,167 @@ def _detect_mode(results: dict, step_name: str, mode: str) -> str:
6367
return "placement"
6468

6569

66-
def compute_bac(results: dict, step_name: str, mode: str = "auto") -> BacResult:
67-
mode = _detect_mode(results, step_name, mode)
68-
# Validate baseline metadata and ordering
69-
step_meta = results.get("steps", {}).get(step_name, {}).get("metadata", {}) or {}
70-
if bool(step_meta.get("baseline")) is not True:
71-
raise ValueError(
72-
f"{step_name}.metadata.baseline must be true and baseline must be included"
73-
)
74-
data = _get_step(results, step_name)
75-
flow_results = data.get("flow_results", [])
76-
if not isinstance(flow_results, list) or not flow_results:
77-
raise ValueError(f"No flow_results for step: {step_name}")
78-
first = flow_results[0]
79-
if str(first.get("failure_id", "")) != "baseline":
80-
raise ValueError(
81-
f"{step_name} baseline must be first (flow_results[0].failure_id == 'baseline')"
82-
)
70+
def _sum_delivered(iteration: dict) -> float:
    """Sum placed bandwidth across all flows in one iteration result.

    Skips records with a missing endpoint or a self-loop
    (source == destination); ``placed`` defaults to 0.0 when absent.
    """
    total = 0.0
    for rec in iteration.get("flows", []) or []:
        src, dst = rec.get("source", ""), rec.get("destination", "")
        if src and dst and src != dst:
            total += float(rec.get("placed", 0.0))
    return total


# Probability points used for the delivered-bandwidth quantiles.
_QUANTILE_PROBS = (0.50, 0.90, 0.95, 0.99, 0.999, 0.9999)
# Percent-of-offered thresholds for availability / bandwidth-at-probability.
_AVAIL_THRESHOLDS = (90.0, 95.0, 99.0, 99.9, 99.99)


def _compute_bac_stats(
    series: pd.Series, offered: float
) -> Tuple[
    Dict[float, float],  # quantiles_abs
    Dict[float, float],  # quantiles_pct
    Dict[float, float],  # availability_at_pct_of_offer
    float,  # auc_normalized
    Dict[float, float],  # bw_at_probability_abs
    Dict[float, float],  # bw_at_probability_pct
]:
    """Compute all BAC statistics from a delivered-bandwidth series.

    This is the single source of truth for BAC math. Used for both
    aggregate and per-flow computation. ``offered`` is the baseline
    (no-failure) delivered bandwidth; normalized quantities are
    fractions of it, capped at 1.0.
    """
    has_data = len(series) > 0
    normalizable = offered > 0

    # Absolute quantiles ("lower" interpolation picks an observed value).
    q_abs: Dict[float, float] = {}
    for prob in _QUANTILE_PROBS:
        q_abs[prob] = float(series.quantile(prob, interpolation="lower"))

    # Quantiles as fractions of offered, capped at 1.0 to absorb
    # numerical noise or iterations delivering more than the baseline.
    q_pct: Dict[float, float] = {}
    if normalizable:
        q_pct = {
            prob: min(
                float(series.quantile(prob, interpolation="lower") / offered), 1.0
            )
            for prob in _QUANTILE_PROBS
        }

    # Availability: fraction of iterations delivering at least pct% of offered.
    avail: Dict[float, float] = {}
    if normalizable and has_data:
        n_iters = float(len(series))
        for pct in _AVAIL_THRESHOLDS:
            cutoff = (pct / 100.0) * offered
            avail[pct] = float((series >= cutoff).sum()) / n_iters  # pyright: ignore[reportOperatorIssue]

    # Inverse availability: bandwidth sustained with probability p is the
    # lower-tail quantile at 1 - p/100.
    bw_abs: Dict[float, float] = {}
    bw_pct: Dict[float, float] = {}
    for pct in _AVAIL_THRESHOLDS:
        tail = max(0.0, 1.0 - (pct / 100.0))
        try:
            level = float(series.quantile(tail, interpolation="lower"))
        except Exception:
            level = float("nan")
        bw_abs[pct] = level
        bw_pct[pct] = float(level / offered) if normalizable else float("nan")

    # Area under the normalized BAC curve: mean delivered fraction with
    # each iteration capped at 1.0.
    auc_norm = 1.0
    if normalizable and has_data:
        auc_norm = float((series.astype(float) / offered).clip(upper=1.0).mean())

    return q_abs, q_pct, avail, auc_norm, bw_abs, bw_pct
135+
136+
137+
def _flow_label(flow_source: str) -> str:
    """Extract a readable directional label from a flow's source field.

    Flow source format: ``_src_<source_pattern>|<target_pattern>|<hash>``
    Returns label like ``abc1/rsw>xyz1/rsw``.
    """
    demand_id = flow_source.removeprefix("_src_").removeprefix("_snk_")
    src_pat, sep, remainder = demand_id.partition("|")
    if sep:
        # Patterns are anchored regexes; drop the ^/$ anchors for display.
        dst_pat = remainder.partition("|")[0]
        return f"{src_pat.strip('^$')}>{dst_pat.strip('^$')}"
    # No recognizable pattern: fall back to a truncated demand id.
    return demand_id[:30]
150+
151+
152+
def compute_bac(results: dict, step_name: str, mode: str = "auto") -> BacResult:
153+
mode = _detect_mode(results, step_name, mode)
154+
data = _get_step(results, step_name)
155+
156+
baseline = data.get("baseline")
157+
if not isinstance(baseline, dict):
158+
raise ValueError(f"{step_name}: data.baseline dict required")
159+
flow_results = data.get("flow_results", [])
160+
if not isinstance(flow_results, list) or not flow_results:
161+
raise ValueError(f"No flow_results for step: {step_name}")
162+
163+
# Baseline determines offered bandwidth
164+
offered = _sum_delivered(baseline)
165+
if not np.isfinite(offered) or offered <= 0:
166+
raise ValueError(f"{step_name}: baseline delivered must be finite and > 0")
167+
168+
# Expand deduplicated patterns by occurrence_count
169+
expanded = expand_flow_results(flow_results)
170+
171+
# ── Aggregate series ──
172+
delivered = [offered]
173+
fids: List[str] = ["baseline"]
174+
for idx, it in enumerate(expanded):
175+
delivered.append(_sum_delivered(it))
176+
fids.append(str(it.get("failure_id", f"it{idx}")))
177+
178+
s = pd.Series(delivered, dtype=float)
179+
s.index.name = "iteration"
180+
181+
q_abs, q_pct, avail, auc_norm, bw_abs, bw_pct = _compute_bac_stats(s, offered)
182+
183+
# ── Per-flow series ──
184+
# Build baseline per-flow map: source_field → (label, baseline_placed)
185+
flow_map: Dict[str, Tuple[str, float]] = {}
186+
for rec in baseline.get("flows", []) or []:
187+
src = rec.get("source", "")
188+
dst = rec.get("destination", "")
189+
if not src or not dst or src == dst:
190+
continue
191+
placed = float(rec.get("placed", 0.0))
192+
if placed <= 0:
193+
continue
194+
flow_map[src] = (_flow_label(src), placed)
195+
196+
per_flow: Dict[str, BacResult] = {}
197+
if len(flow_map) > 1:
198+
# Only compute per-flow when there are multiple flows to separate
199+
flow_series: Dict[str, List[float]] = {
200+
src: [bl_placed] for src, (_label, bl_placed) in flow_map.items()
201+
}
202+
203+
for it in expanded:
204+
it_flows = {f["source"]: f for f in it.get("flows", []) or []}
205+
for src, (_label, _bl_placed) in flow_map.items():
206+
if src in it_flows:
207+
flow_series[src].append(float(it_flows[src].get("placed", 0.0)))
208+
else:
209+
flow_series[src].append(0.0)
210+
211+
for src, (label, bl_placed) in flow_map.items():
212+
fs = pd.Series(flow_series[src], dtype=float)
213+
fs.index.name = "iteration"
214+
fq_abs, fq_pct, favail, fauc, fbw_abs, fbw_pct = _compute_bac_stats(
215+
fs, bl_placed
216+
)
217+
per_flow[label] = BacResult(
218+
step_name=step_name,
219+
mode=mode,
220+
series=fs,
221+
failure_ids=list(fids),
222+
offered=float(bl_placed),
223+
quantiles_abs=fq_abs,
224+
quantiles_pct=fq_pct,
225+
availability_at_pct_of_offer=favail,
226+
auc_normalized=fauc,
227+
bw_at_probability_abs=fbw_abs,
228+
bw_at_probability_pct=fbw_pct,
229+
)
230+
152231
return BacResult(
153232
step_name=step_name,
154233
mode=mode,
@@ -161,6 +240,7 @@ def compute_bac(results: dict, step_name: str, mode: str = "auto") -> BacResult:
161240
auc_normalized=auc_norm,
162241
bw_at_probability_abs=bw_abs,
163242
bw_at_probability_pct=bw_pct,
243+
per_flow=per_flow,
164244
)
165245

166246

metrics/common.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Shared utilities for metrics modules."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Dict, List, Tuple
6+
7+
8+
def expand_flow_results(flow_results: list[dict]) -> list[dict]:
    """Expand deduplicated flow_results by occurrence_count.

    ngraph deduplicates identical failure patterns during Monte Carlo
    simulation. Each entry's ``occurrence_count`` indicates how many
    iterations produced that exact pattern. This function repeats each
    entry accordingly so that downstream statistical operations weight
    each iteration equally.

    Entries without ``occurrence_count`` default to 1 (backward
    compatible); malformed or non-positive counts are also treated as 1
    rather than aborting the whole analysis.

    Note: repeated entries are *shared references* to the same dict —
    callers must treat them as read-only.
    """
    expanded: list[dict] = []
    for it in flow_results:
        try:
            count = max(1, int(it.get("occurrence_count", 1)))
        except (TypeError, ValueError):
            count = 1  # malformed count: weight the pattern once
        expanded.extend([it] * count)
    return expanded
25+
26+
27+
def canonical_dc(endpoint: str) -> str:
    """Normalize endpoint to canonical DC-level path ``metro/dc``.

    Examples::

        'metro1/dc1' → 'metro1/dc1'
        'metro1/dc1/dc/dc' → 'metro1/dc1'
        'metro1/dc1/rack/node' → 'metro1/dc1'
    """
    if not endpoint:
        return endpoint
    metro, sep, rest = endpoint.partition("/")
    if not sep:
        # Single-component path: nothing to truncate.
        return endpoint
    dc = rest.split("/", 1)[0]
    return f"{metro}/{dc}"
42+
43+
44+
def baseline_demand_map(
    results: dict, step_name: str = "tm_placement"
) -> Dict[Tuple[str, str], float]:
    """Extract per-pair baseline demand from a placement step.

    Returns mapping ``(canonical_src, canonical_dst) -> demand``;
    pairs with zero or negative demand are excluded. Returns an empty
    mapping when the step has no baseline dict.
    """
    step = results.get("steps", {}).get(step_name, {}) or {}
    data = step.get("data", {}) or {}
    base = data.get("baseline")
    if not isinstance(base, dict):
        return {}

    demands: Dict[Tuple[str, str], float] = {}
    for rec in base.get("flows", []) or []:
        src = canonical_dc(rec.get("source", ""))
        dst = canonical_dc(rec.get("destination", ""))
        if not src or not dst or src == dst:
            continue
        try:
            value = float(rec.get("demand", 0.0))
        except Exception:
            # Unparseable demand: treat as absent.
            value = 0.0
        if value > 0.0:
            demands[(src, dst)] = value
    return demands
71+
72+
73+
def get_tm_baseline_and_failures(results: dict) -> Tuple[dict, List[dict]]:
    """Extract baseline dict and expanded failure list from tm_placement.

    The returned failure list is expanded by ``occurrence_count`` so each
    Monte Carlo iteration is represented as a separate entry.

    Raises:
        ValueError: if the baseline dict is missing or flow_results is
            not a list.
    """
    tm_step = results.get("steps", {}).get("tm_placement", {}) or {}
    tm_data = tm_step.get("data", {}) or {}

    baseline = tm_data.get("baseline")
    if not isinstance(baseline, dict):
        raise ValueError("tm_placement.data.baseline dict required")

    flow_results = tm_data.get("flow_results", []) or []
    if not isinstance(flow_results, list):
        raise ValueError("tm_placement.data.flow_results must be a list")

    return baseline, expand_flow_results(flow_results)

0 commit comments

Comments
 (0)