Commit 660ddd5

Author: Mateusz

fix: deterministic merge order for parallel CBOR chunk decoding

1 parent: 1abbaeb

1 file changed: 55 additions & 19 deletions

tests/unit/core/ports/test_usage_chunk_cbor_replay.py
@@ -13,8 +13,8 @@
 
 from __future__ import annotations
 
-import json
-from concurrent.futures import ThreadPoolExecutor, as_completed
+import json
+from concurrent.futures import Future, ThreadPoolExecutor
 from pathlib import Path
 from typing import Any, cast
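
The dropped `as_completed` import is the heart of the fix: it yields futures in completion order, which varies from run to run with thread scheduling and I/O latency. A minimal sketch of the before/after patterns, assuming an illustrative `_decode` worker (the `sleep` only simulates variable decode latency; none of these names are from the real test file):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def _decode(name: str) -> list[str]:
    """Stand-in for a CBOR decode worker with variable I/O latency."""
    time.sleep(random.uniform(0, 0.05))
    return [f"{name}-chunk"]

names = ["a", "b", "c"]

# Before: as_completed() yields futures as workers finish, so the
# merged list can change between runs.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {pool.submit(_decode, n): n for n in names}
    racy = [item for fut in as_completed(futures) for item in fut.result()]

# After: key each future by its input and resolve in input order.
# Future.result() blocks until that worker is done, so completion
# order no longer leaks into the merged list.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures_by_name = {n: pool.submit(_decode, n) for n in names}
    stable = [item for n in names for item in futures_by_name[n].result()]

assert stable == ["a-chunk", "b-chunk", "c-chunk"]
```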

@@ -63,10 +63,21 @@ def load_cbor_entries(capture_file: Path) -> list[dict[str, Any]]:
     return objects
 
 
-def _stop_chunks_for_capture_file(capture_file: Path) -> list[dict[str, Any]]:
+def _stop_chunks_for_capture_file(capture_file: Path) -> list[dict[str, Any]]:
     """Decode one capture file and extract stop+usage chunks (worker for parallel I/O)."""
 
-    return extract_stop_chunks_with_usage(load_cbor_entries(capture_file))
+    return extract_stop_chunks_with_usage(load_cbor_entries(capture_file))
+
+
+def _merge_chunks_by_capture_order(
+    capture_files: list[Path],
+    chunks_by_capture: dict[Path, list[dict[str, Any]]],
+) -> list[dict[str, Any]]:
+    """Merge decoded chunks following the original capture file order."""
+    merged: list[dict[str, Any]] = []
+    for capture_file in capture_files:
+        merged.extend(chunks_by_capture.get(capture_file, []))
+    return merged
 
 
 def extract_stop_chunks_with_usage(
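
One detail of the new helper worth calling out: capture files missing from the mapping are skipped rather than raising, because of the `.get(capture_file, [])` default. A quick sketch with hypothetical file names:

```python
from pathlib import Path

merged = _merge_chunks_by_capture_order(
    [Path("present.cbor"), Path("absent.cbor")],
    {Path("present.cbor"): [{"id": "p-1"}]},
)
assert merged == [{"id": "p-1"}]  # absent.cbor simply contributes nothing
```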
@@ -119,7 +130,7 @@ def simulate_connector_output(stop_chunk: dict[str, Any]) -> ProcessedResponse:
     )
 
 
-def verify_no_usage_leak(proc_resp: ProcessedResponse) -> tuple[bool, str]:
+def verify_no_usage_leak(proc_resp: ProcessedResponse) -> tuple[bool, str]:
     """Verify that StreamingContent correctly serializes without leaking usage.
 
     Returns (success, error_message)
@@ -164,10 +175,32 @@ def verify_no_usage_leak(proc_resp: ProcessedResponse) -> tuple[bool, str]:
 
             return True, ""
 
-    return False, "No SSE data line found in output"
-
-
-class TestStopChunkWithUsageProtection:
+    return False, "No SSE data line found in output"
+
+
+class TestCBORChunkMergeOrder:
+    """Tests for deterministic merge order across parallel decode results."""
+
+    def test_merge_chunks_preserves_capture_file_order(self) -> None:
+        """Merged chunks should follow capture_files order, not completion order."""
+        file_a = Path("a.cbor")
+        file_b = Path("b.cbor")
+        file_c = Path("c.cbor")
+
+        chunks_by_capture = {
+            file_b: [{"id": "b-1"}],
+            file_a: [{"id": "a-1"}, {"id": "a-2"}],
+            file_c: [{"id": "c-1"}],
+        }
+
+        merged = _merge_chunks_by_capture_order(
+            [file_a, file_b, file_c],
+            chunks_by_capture,
+        )
+        assert [chunk["id"] for chunk in merged] == ["a-1", "a-2", "b-1", "c-1"]
+
+
+class TestStopChunkWithUsageProtection:
     """Tests for StopChunkWithUsage stringification protection."""
 
     def test_str_raises_usage_chunk_leak_error(self) -> None:
@@ -215,21 +248,24 @@ def test_json_dumps_with_dict_conversion(self) -> None:
 
 
 @pytest.fixture(scope="session")
-def cbor_stop_chunks() -> list[dict[str, Any]]:
+def cbor_stop_chunks() -> list[dict[str, Any]]:
     """Load stop chunks from available CBOR captures (once per pytest worker)."""
     capture_files = get_cbor_capture_files()
     if not capture_files:
         pytest.skip("No CBOR capture files available for replay testing")
 
-    # Decode captures in parallel: bounded workers avoid thread overhead on Windows.
-    max_workers = min(8, max(1, len(capture_files)))
-    all_chunks: list[dict[str, Any]] = []
-    with ThreadPoolExecutor(max_workers=max_workers) as pool:
-        futures = {
-            pool.submit(_stop_chunks_for_capture_file, p): p for p in capture_files
-        }
-        for fut in as_completed(futures):
-            all_chunks.extend(fut.result())
+    # Decode captures in parallel: bounded workers avoid thread overhead on Windows.
+    max_workers = min(8, max(1, len(capture_files)))
+    chunks_by_capture: dict[Path, list[dict[str, Any]]] = {}
+    with ThreadPoolExecutor(max_workers=max_workers) as pool:
+        futures_by_capture: dict[Path, Future[list[dict[str, Any]]]] = {
+            p: pool.submit(_stop_chunks_for_capture_file, p) for p in capture_files
+        }
+
+        for capture_file in capture_files:
+            chunks_by_capture[capture_file] = futures_by_capture[capture_file].result()
+
+    all_chunks = _merge_chunks_by_capture_order(capture_files, chunks_by_capture)
 
     if not all_chunks:
         pytest.skip("No stop chunks with usage found in captures")

0 commit comments