From 79bd74e1bca90e8c92542aadb4497dec997cc092 Mon Sep 17 00:00:00 2001 From: colombod Date: Mon, 29 Jun 2026 18:42:02 +0000 Subject: [PATCH] fix(upload-cli): default to replay=true, add --no-replay opt-out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The upload CLI never passed ?replay=true, so the server's 7-day in-memory idempotency cache silently dropped session:start events on re-upload, leaving Session nodes without started_at. Default replay=true bypasses that cache (Neo4j idempotency still holds via MERGE + SET n += row.props); --no-replay restores the old dedup behaviour for live in-progress sessions. - uploader.py: run_upload() gains replay: bool = True; POSTs with params={"replay":"true"} when set (None when --no-replay). - cli.py: --no-replay flag (default off); usage + IDEMPOTENCY help rewritten. - tests: red-green coverage for the query param and the flag. Re-applied on current main (auth-aware cli.py/uploader.py) — supersedes the stale, conflicting #22. Unit-proven: 182 passed, ruff + pyright clean. 
🤖 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- .../cli.py | 35 ++++- .../uploader.py | 11 +- .../tests/test_cli.py | 135 ++++++++++++++++++ .../tests/test_uploader.py | 40 ++++++ 4 files changed, 213 insertions(+), 8 deletions(-) diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py index 28537964..03b6f4b1 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/cli.py @@ -27,6 +27,7 @@ usage: context-intelligence-upload --path PATH --server-url URL --api-key KEY [--job-id ID] [--progress FILE] [--event-delay-ms MS] + [--no-replay] Replay context-intelligence session data to a server. @@ -41,6 +42,8 @@ default: /tmp/context-intelligence-upload-{job_id}.json --event-delay-ms Milliseconds to sleep between events (default: 0) Use 50-200 to reduce Neo4j write pressure on the server + --no-replay Disable replay=true; re-enable server 7-day idempotency cache + default: off (every event is replayed unconditionally) """ # --------------------------------------------------------------------------- @@ -117,13 +120,19 @@ alphabetically before enqueuing them. 5. Emit sessions in BFS order. -IDEMPOTENCY GUARANTEE ---------------------- -The tool has NO built-in deduplication -- re-running will re-upload all sessions. -Idempotency is provided by the server using the ``idempotency_key`` field in every -POST payload. This key is a SHA-256 hash of the canonical event JSON, so the server -can safely skip already-ingested events by treating ``idempotency_key`` as a natural -key. This means it is safe to re-upload the same PATH multiple times. 
+IDEMPOTENCY +----------- +The upload CLI bypasses the server-side event deduplication cache by default. +Every event in every session is forwarded to the server unconditionally, and the +server processes it on every run. Neo4j idempotency is guaranteed by +``MERGE + SET n += row.props`` semantics: re-uploading the same session data +produces the same graph state. + +Use ``--no-replay`` to re-enable the server's 7-day in-memory deduplication +cache. With ``--no-replay``, events whose ``idempotency_key`` was seen within +the last 7 days are silently skipped. Only use this flag when running the +upload tool against a live session in progress where duplicate suppression is +intentional. WORKSPACE BEHAVIOUR ------------------- @@ -330,6 +339,17 @@ def _build_parser() -> argparse.ArgumentParser: help="Milliseconds to sleep between events (default: 0; use 50-200 to reduce Neo4j pressure)", ) + parser.add_argument( + "--no-replay", + action="store_true", + default=False, + dest="no_replay", + help=( + "Disable the default replay=true query parameter on POST /events; " + "re-enables the server's 7-day idempotency cache" + ), + ) + # Auth flags parser.add_argument( "--auth-mode", @@ -454,6 +474,7 @@ def main() -> None: tracker=tracker, event_delay_s=args.event_delay_ms / 1000.0, auth_strategy=auth_strategy, + replay=not args.no_replay, ) # 7. 
Write result JSON to stdout diff --git a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py index c7e47c60..9d9aaf31 100644 --- a/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py +++ b/modules/tool-context-intelligence-upload/amplifier_module_tool_context_intelligence_upload/uploader.py @@ -105,6 +105,7 @@ def run_upload( event_delay_s: float = 0.0, *, auth_strategy: AuthStrategy | None = None, + replay: bool = True, ) -> UploadResult: """Replay all events from *sessions* to the server. @@ -127,6 +128,13 @@ def run_upload( Optional :class:`~context_intelligence.auth.AuthStrategy` that produces the ``Authorization`` header. When ``None``, an ``ApiKeyAuth`` is derived from *api_key* for backward compatibility. + replay: + When ``True`` (the default), every POST is sent with ``?replay=true`` so + the server bypasses its in-memory idempotency cache. This is the safe + default for re-uploading historical session data. Set to ``False`` to + re-enable the server's 7-day deduplication cache (the old behaviour); + only do this for live, in-progress sessions where duplicate suppression + is intentional. 
Returns ------- @@ -142,6 +150,7 @@ def run_upload( endpoint = f"{server_url}/events" timeout = httpx.Timeout(connect=5.0, read=30.0, write=30.0, pool=5.0) headers = auth_strategy.headers() + query_params: dict[str, str] | None = {"replay": "true"} if replay else None total_events_uploaded = 0 total_sessions_uploaded = 0 @@ -194,7 +203,7 @@ def run_upload( payload = build_payload(event, workspace, data) try: - response = client.post(endpoint, json=payload) + response = client.post(endpoint, json=payload, params=query_params) except httpx.HTTPError as exc: tracker.mark_failed( session_id=session_id, diff --git a/modules/tool-context-intelligence-upload/tests/test_cli.py b/modules/tool-context-intelligence-upload/tests/test_cli.py index 7782b75d..2ff5e752 100644 --- a/modules/tool-context-intelligence-upload/tests/test_cli.py +++ b/modules/tool-context-intelligence-upload/tests/test_cli.py @@ -35,6 +35,52 @@ def test_minus_h_stdout_contains_required_strings(self, capsys): assert "--server-url" in captured.out assert "--api-key" in captured.out + def test_compact_help_usage_line_contains_no_replay(self, capsys): + """_COMPACT_HELP usage line must include [--no-replay].""" + from amplifier_module_tool_context_intelligence_upload.cli import _COMPACT_HELP + + assert "[--no-replay]" in _COMPACT_HELP + + def test_compact_help_flags_block_contains_no_replay_entry(self, capsys): + """_COMPACT_HELP flags block must contain --no-replay entry with idempotency mention.""" + from amplifier_module_tool_context_intelligence_upload.cli import _COMPACT_HELP + + assert "--no-replay" in _COMPACT_HELP + assert "idempotency" in _COMPACT_HELP.lower() + + +# --------------------------------------------------------------------------- +# --no-replay argparse +# --------------------------------------------------------------------------- + + +class TestNoReplayArgparse: + """The --no-replay flag must be defined with correct argparse properties.""" + + def test_no_replay_default_is_false(self): 
+ from amplifier_module_tool_context_intelligence_upload.cli import _build_parser + + args = _build_parser().parse_args( + ["--path", "/tmp", "--server-url", "http://localhost", "--api-key", "k"] + ) + assert args.no_replay is False + + def test_no_replay_flag_sets_no_replay_true(self): + from amplifier_module_tool_context_intelligence_upload.cli import _build_parser + + args = _build_parser().parse_args( + [ + "--path", + "/tmp", + "--server-url", + "http://localhost", + "--api-key", + "k", + "--no-replay", + ] + ) + assert args.no_replay is True + # --------------------------------------------------------------------------- # --help detailed help @@ -303,6 +349,95 @@ def test_successful_upload(self, tmp_path, capsys): assert result["status"] == "completed" assert result["sessions_uploaded"] == 1 + def test_no_replay_flag_passes_replay_false_to_run_upload(self, tmp_path, capsys): + """When --no-replay is passed, run_upload is called with replay=False.""" + from amplifier_module_tool_context_intelligence_upload.cli import main + + fake_sessions = [(tmp_path, {"session_id": "s1"})] + mock_result = MagicMock() + mock_result.success = True + mock_result.to_dict.return_value = { + "status": "completed", + "sessions_uploaded": 1, + "events_uploaded": 0, + } + + with ( + patch( + "sys.argv", + [ + "context-intelligence-upload", + "--path", + str(tmp_path), + "--server-url", + "http://localhost", + "--api-key", + "key", + "--no-replay", + ], + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.discover_and_sort", + return_value=fake_sessions, + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.run_upload", + return_value=mock_result, + ) as mock_run_upload, + patch("amplifier_module_tool_context_intelligence_upload.cli.ProgressTracker"), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 0 + # run_upload was called with replay=False (forwarded from --no-replay) + _, kwargs = 
mock_run_upload.call_args + assert kwargs.get("replay") is False + + def test_default_passes_replay_true_to_run_upload(self, tmp_path, capsys): + """When --no-replay is NOT passed, run_upload is called with replay=True.""" + from amplifier_module_tool_context_intelligence_upload.cli import main + + fake_sessions = [(tmp_path, {"session_id": "s1"})] + mock_result = MagicMock() + mock_result.success = True + mock_result.to_dict.return_value = { + "status": "completed", + "sessions_uploaded": 1, + "events_uploaded": 0, + } + + with ( + patch( + "sys.argv", + [ + "context-intelligence-upload", + "--path", + str(tmp_path), + "--server-url", + "http://localhost", + "--api-key", + "key", + ], + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.discover_and_sort", + return_value=fake_sessions, + ), + patch( + "amplifier_module_tool_context_intelligence_upload.cli.run_upload", + return_value=mock_result, + ) as mock_run_upload, + patch("amplifier_module_tool_context_intelligence_upload.cli.ProgressTracker"), + pytest.raises(SystemExit) as exc_info, + ): + main() + + assert exc_info.value.code == 0 + # run_upload was called with replay=True (the default) + _, kwargs = mock_run_upload.call_args + assert kwargs.get("replay") is True + # --------------------------------------------------------------------------- # main() — env var config resolution (resolve_config integration) diff --git a/modules/tool-context-intelligence-upload/tests/test_uploader.py b/modules/tool-context-intelligence-upload/tests/test_uploader.py index 027b93f7..eed1bd9f 100644 --- a/modules/tool-context-intelligence-upload/tests/test_uploader.py +++ b/modules/tool-context-intelligence-upload/tests/test_uploader.py @@ -336,6 +336,46 @@ def test_authorization_header_set(self, tmp_path: Path) -> None: headers = kwargs.get("headers", {}) assert headers.get("Authorization") == "Bearer sk-my-key" + def test_default_sends_replay_true_query_param(self, tmp_path: Path) -> None: + """By default, 
run_upload posts with params={'replay': 'true'} on every call.""" + events = _make_events(1) + session_dir, metadata = _write_session(tmp_path, "abc", events) + sessions = [(session_dir, metadata)] + tracker = MagicMock() + + with patch("httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client_cls.return_value.__enter__.return_value = mock_client + mock_client.post.return_value = _mock_response(200) + + run_upload(sessions, "https://my-server.example.com", "api-key", tracker) + + # URL still ends with /events (params do not mutate the URL string) + url_called = mock_client.post.call_args[0][0] + assert url_called.endswith("/events") + # The replay query parameter is forwarded as the httpx `params` kwarg. + # Assert the literal string "true" (what run_upload passes) so the wire value + # is pinned explicitly rather than relying on httpx's bool-to-string coercion. + call_kwargs = mock_client.post.call_args[1] + assert call_kwargs.get("params") == {"replay": "true"} + + def test_replay_false_sends_no_params(self, tmp_path: Path) -> None: + """When replay=False, run_upload posts with params=None (no ?replay= query string).""" + events = _make_events(1) + session_dir, metadata = _write_session(tmp_path, "abc", events) + sessions = [(session_dir, metadata)] + tracker = MagicMock() + + with patch("httpx.Client") as mock_client_cls: + mock_client = MagicMock() + mock_client_cls.return_value.__enter__.return_value = mock_client + mock_client.post.return_value = _mock_response(200) + + run_upload(sessions, "https://my-server.example.com", "api-key", tracker, replay=False) + + call_kwargs = mock_client.post.call_args[1] + assert call_kwargs.get("params") is None + # --------------------------------------------------------------------------- # TestUploadEdgeCases