From 2b6b6f4a3c4b568a1ba2cb80cd352e4194595138 Mon Sep 17 00:00:00 2001 From: Diego Colombo <> Date: Thu, 21 May 2026 07:39:26 +0100 Subject: [PATCH 1/2] fix: upload CLI defaults to replay=True, add --no-replay opt-out The server /events endpoint accepts ?replay=true to bypass the 7-day in-memory idempotency cache. The upload CLI now passes this by default so re-uploading old sessions always lands correctly in Neo4j regardless of how recently the same events were previously sent. --no-replay restores the old cache-enforced deduplication for callers that want explicit deduplication (e.g. uploading a session that is currently being captured live). Files changed: - uploader.py: replay: bool = True param, params={"replay": True} on POST - cli.py: --no-replay flag, replay=not args.no_replay forwarding, _DETAILED_HELP IDEMPOTENCY section rewritten to reflect new default - tests/test_uploader.py: test default sends params={"replay": True} - tests/test_cli.py: --no-replay behavioural tests --- .../cli.py | 33 +++-- .../uploader.py | 11 +- .../tests/test_cli.py | 130 ++++++++++++++++++ .../tests/test_uploader.py | 21 +++ 4 files changed, 186 insertions(+), 9 deletions(-) diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py index f4a9380..3646e32 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py @@ -26,7 +26,7 @@ _COMPACT_HELP = """\ usage: context-intelligence-upload --path PATH --server-url URL --api-key KEY [--job-id ID] [--progress FILE] - [--event-delay-ms MS] + [--event-delay-ms MS] [--no-replay] Replay context-intelligence session data to a server. 
@@ -41,6 +41,8 @@ default: /tmp/context-intelligence-upload-{job_id}.json --event-delay-ms Milliseconds to sleep between events (default: 0) Use 50-200 to reduce Neo4j write pressure on the server + --no-replay Disable replay=true; re-enable server 7-day idempotency cache + default: off (every event is replayed unconditionally) """ # --------------------------------------------------------------------------- @@ -117,13 +119,19 @@ alphabetically before enqueuing them. 5. Emit sessions in BFS order. -IDEMPOTENCY GUARANTEE ---------------------- -The tool has NO built-in deduplication -- re-running will re-upload all sessions. -Idempotency is provided by the server using the ``idempotency_key`` field in every -POST payload. This key is a SHA-256 hash of the canonical event JSON, so the server -can safely skip already-ingested events by treating ``idempotency_key`` as a natural -key. This means it is safe to re-upload the same PATH multiple times. +IDEMPOTENCY +----------- +The upload CLI bypasses the server-side event deduplication cache by default. +Every event in every session is forwarded to the server unconditionally, and the +server processes it on every run. Neo4j idempotency is guaranteed by +``MERGE + SET n += row.props`` semantics: re-uploading the same session data +produces the same graph state. + +Use ``--no-replay`` to re-enable the server's 7-day in-memory deduplication +cache. With ``--no-replay``, events whose ``idempotency_key`` was seen within +the last 7 days are silently skipped. Only use this flag when running the +upload tool against a live session in progress where duplicate suppression is +intentional. 
WORKSPACE BEHAVIOUR ------------------- @@ -329,6 +337,14 @@ def _build_parser() -> argparse.ArgumentParser: dest="event_delay_ms", help="Milliseconds to sleep between events (default: 0; use 50-200 to reduce Neo4j pressure)", ) + parser.add_argument( + "--no-replay", + action="store_true", + default=False, + dest="no_replay", + help="Disable the default replay=True query parameter on POST /events; " + "re-enables the server's 7-day idempotency cache", + ) return parser @@ -399,6 +415,7 @@ def main() -> None: api_key=api_key, tracker=tracker, event_delay_s=args.event_delay_ms / 1000.0, + replay=not args.no_replay, ) # 7. Write result JSON to stdout diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py index b037df5..69de06a 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py @@ -102,6 +102,7 @@ def run_upload( api_key: str, tracker: ProgressTracker, event_delay_s: float = 0.0, + replay: bool = True, ) -> UploadResult: """Replay all events from *sessions* to the server. @@ -119,6 +120,13 @@ def run_upload( Seconds to sleep between each successful event POST. Defaults to ``0.0`` (no delay). Set to a positive value (e.g. ``0.05``) to throttle the upload rate and reduce Neo4j write pressure on the server. + replay: + When ``True`` (the default), every POST is sent with ``?replay=true`` so + the server bypasses its in-memory idempotency cache. This is the safe + default for re-uploading historical session data. Set to ``False`` to + re-enable the server's 7-day deduplication cache (the old behaviour); + only do this for live, in-progress sessions where duplicate suppression + is intentional. 
Returns ------- @@ -129,6 +137,7 @@ def run_upload( endpoint = f"{server_url}/events" timeout = httpx.Timeout(connect=5.0, read=30.0, write=30.0, pool=5.0) headers = {"Authorization": f"Bearer {api_key}"} + query_params: dict[str, Any] | None = {"replay": True} if replay else None total_events_uploaded = 0 total_sessions_uploaded = 0 @@ -181,7 +190,7 @@ def run_upload( payload = build_payload(event, workspace, data) try: - response = client.post(endpoint, json=payload) + response = client.post(endpoint, json=payload, params=query_params) except httpx.HTTPError as exc: tracker.mark_failed( session_id=session_id, diff --git a/modules/tool-context-intelligence-upload/tests/test_cli.py b/modules/tool-context-intelligence-upload/tests/test_cli.py index 7782b75..8708047 100644 --- a/modules/tool-context-intelligence-upload/tests/test_cli.py +++ b/modules/tool-context-intelligence-upload/tests/test_cli.py @@ -35,6 +35,47 @@ def test_minus_h_stdout_contains_required_strings(self, capsys): assert "--server-url" in captured.out assert "--api-key" in captured.out + def test_compact_help_usage_line_contains_no_replay(self, capsys): + """_COMPACT_HELP usage line must include [--no-replay].""" + from amplifier_module_tool_context_intelligence_upload.cli import _COMPACT_HELP + + assert "[--no-replay]" in _COMPACT_HELP + + def test_compact_help_flags_block_contains_no_replay_entry(self, capsys): + """_COMPACT_HELP flags block must contain --no-replay two-line entry.""" + from amplifier_module_tool_context_intelligence_upload.cli import _COMPACT_HELP + + assert "--no-replay" in _COMPACT_HELP + assert "idempotency" in _COMPACT_HELP.lower() + + +class TestNoReplayArgparse: + """The --no-replay flag must be defined with correct argparse properties.""" + + def test_no_replay_default_is_false(self): + from amplifier_module_tool_context_intelligence_upload.cli import _build_parser + + args = _build_parser().parse_args( + ["--path", "/tmp", "--server-url", "http://localhost", "--api-key", 
"k"] + ) + assert args.no_replay is False + + def test_no_replay_flag_sets_no_replay_true(self): + from amplifier_module_tool_context_intelligence_upload.cli import _build_parser + + args = _build_parser().parse_args( + [ + "--path", + "/tmp", + "--server-url", + "http://localhost", + "--api-key", + "k", + "--no-replay", + ] + ) + assert args.no_replay is True + # --------------------------------------------------------------------------- # --help detailed help @@ -303,6 +344,95 @@ def test_successful_upload(self, tmp_path, capsys): assert result["status"] == "completed" assert result["sessions_uploaded"] == 1 + def test_no_replay_flag_passes_replay_false_to_run_upload(self, tmp_path, capsys): + """When --no-replay is passed, run_upload is called with replay=False.""" + from amplifier_module_tool_context_intelligence_upload.cli import main + + fake_sessions = [(tmp_path, {"session_id": "s1"})] + mock_result = MagicMock() + mock_result.success = True + mock_result.to_dict.return_value = { + "status": "completed", + "sessions_uploaded": 1, + "events_uploaded": 0, + } + + with ( + patch( + "sys.argv", + [ + "context-intelligence-upload", + "--path", + str(tmp_path), + "--server-url", + "http://localhost", + "--api-key", + "key", + "--no-replay", + ], + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.discover_and_sort", + return_value=fake_sessions, + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.run_upload", + return_value=mock_result, + ) as mock_run_upload, + patch("amplifier_module_tool_context_intelligence_upload.cli.ProgressTracker"), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 0 + # run_upload was called with replay=False (forwarded from --no-replay) + _, kwargs = mock_run_upload.call_args + assert kwargs.get("replay") is False + + def test_default_passes_replay_true_to_run_upload(self, tmp_path, capsys): + """When --no-replay is NOT passed, run_upload is called with 
replay=True.""" + from amplifier_module_tool_context_intelligence_upload.cli import main + + fake_sessions = [(tmp_path, {"session_id": "s1"})] + mock_result = MagicMock() + mock_result.success = True + mock_result.to_dict.return_value = { + "status": "completed", + "sessions_uploaded": 1, + "events_uploaded": 0, + } + + with ( + patch( + "sys.argv", + [ + "context-intelligence-upload", + "--path", + str(tmp_path), + "--server-url", + "http://localhost", + "--api-key", + "key", + ], + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.discover_and_sort", + return_value=fake_sessions, + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.run_upload", + return_value=mock_result, + ) as mock_run_upload, + patch("amplifier_module_tool_context_intelligence_upload.cli.ProgressTracker"), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 0 + # run_upload was called with replay=True (the default) + _, kwargs = mock_run_upload.call_args + assert kwargs.get("replay") is True + # --------------------------------------------------------------------------- # main() — env var config resolution (resolve_config integration) diff --git a/modules/tool-context-intelligence-upload/tests/test_uploader.py b/modules/tool-context-intelligence-upload/tests/test_uploader.py index 027b93f..44b6736 100644 --- a/modules/tool-context-intelligence-upload/tests/test_uploader.py +++ b/modules/tool-context-intelligence-upload/tests/test_uploader.py @@ -336,6 +336,27 @@ def test_authorization_header_set(self, tmp_path: Path) -> None: headers = kwargs.get("headers", {}) assert headers.get("Authorization") == "Bearer sk-my-key" + def test_default_sends_replay_true_query_param(self, tmp_path: Path) -> None: + """By default, run_upload posts with params={'replay': True} on every call.""" + events = _make_events(1) + session_dir, metadata = _write_session(tmp_path, "abc", events) + sessions = [(session_dir, metadata)] + tracker = 
MagicMock() + + with patch("httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client_cls.return_value.__enter__.return_value = mock_client + mock_client.post.return_value = _mock_response(200) + + run_upload(sessions, "https://my-server.example.com", "api-key", tracker) + + # URL still ends with /events (params do not mutate the URL string) + url_called = mock_client.post.call_args[0][0] + assert url_called.endswith("/events") + # The replay query parameter is forwarded as the httpx `params` kwarg + call_kwargs = mock_client.post.call_args[1] + assert call_kwargs.get("params") == {"replay": True} + # --------------------------------------------------------------------------- # TestUploadEdgeCases From cd71d521ffd7d1b2183e1e855bc24723899c43eb Mon Sep 17 00:00:00 2001 From: Diego Colombo <> Date: Thu, 21 May 2026 13:20:49 +0100 Subject: [PATCH 2/2] fix: correct httpx bool serialization in replay query parameter httpx serialises Python bool True to the string "True" (capital T) via str(), but the server expects lowercase "true". This was causing the replay flag to be silently ignored on POST /events calls. Changes: - uploader.py: Changed {"replay": True} to {"replay": "true"} and updated the type hint from dict[str, Any] | None to dict[str, str] | None. - test_uploader.py: Updated the assertion to match the corrected string value and added an explanatory comment describing why the string form is required. - cli.py: Corrected the help text reference from replay=True to replay=true and reformatted [--no-replay] to its own continuation line for consistency. All 168 tests pass. Ruff clean. Pyright 0 errors. 
Generated with Amplifier Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- .../cli.py | 5 +++-- .../uploader.py | 2 +- .../tool-context-intelligence-upload/tests/test_uploader.py | 6 ++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py index 3646e32..25d033a 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py @@ -26,7 +26,8 @@ _COMPACT_HELP = """\ usage: context-intelligence-upload --path PATH --server-url URL --api-key KEY [--job-id ID] [--progress FILE] - [--event-delay-ms MS] [--no-replay] + [--event-delay-ms MS] + [--no-replay] Replay context-intelligence session data to a server. 
@@ -342,7 +343,7 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", default=False, dest="no_replay", - help="Disable the default replay=True query parameter on POST /events; " + help="Disable the default replay=true query parameter on POST /events; " "re-enables the server's 7-day idempotency cache", ) diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py index 69de06a..28fe4bc 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py @@ -137,7 +137,7 @@ def run_upload( endpoint = f"{server_url}/events" timeout = httpx.Timeout(connect=5.0, read=30.0, write=30.0, pool=5.0) headers = {"Authorization": f"Bearer {api_key}"} - query_params: dict[str, Any] | None = {"replay": True} if replay else None + query_params: dict[str, str] | None = {"replay": "true"} if replay else None total_events_uploaded = 0 total_sessions_uploaded = 0 diff --git a/modules/tool-context-intelligence-upload/tests/test_uploader.py b/modules/tool-context-intelligence-upload/tests/test_uploader.py index 44b6736..9ea4e78 100644 --- a/modules/tool-context-intelligence-upload/tests/test_uploader.py +++ b/modules/tool-context-intelligence-upload/tests/test_uploader.py @@ -337,7 +337,7 @@ def test_authorization_header_set(self, tmp_path: Path) -> None: assert headers.get("Authorization") == "Bearer sk-my-key" def test_default_sends_replay_true_query_param(self, tmp_path: Path) -> None: - """By default, run_upload posts with params={'replay': True} on every call.""" + """By default, run_upload posts with params={'replay': 'true'} on every call.""" events = _make_events(1) session_dir, metadata = _write_session(tmp_path, "abc", events) sessions = 
[(session_dir, metadata)] @@ -354,8 +354,10 @@ def test_default_sends_replay_true_query_param(self, tmp_path: Path) -> None: url_called = mock_client.post.call_args[0][0] assert url_called.endswith("/events") # The replay query parameter is forwarded as the httpx `params` kwarg + # Send the literal string "true" rather than a Python bool so the wire value + # is explicit and does not depend on the HTTP client's bool-to-str coercion. call_kwargs = mock_client.post.call_args[1] - assert call_kwargs.get("params") == {"replay": True} + assert call_kwargs.get("params") == {"replay": "true"} # ---------------------------------------------------------------------------