From e7860024da36d234ef9e22b9a5b1cd0621710b50 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Thu, 25 Jun 2026 14:20:21 +0200 Subject: [PATCH 1/2] fix(jobs): block create only on live duplicate id, not prior writes Job creation was gated by the generic per-turn write-dedup guard, which refused brand-new job ids with 'already fulfilled'. Exempt manage_jobs from that guard and instead block a create only when the same job id is already active/paused, both in the agent tool and the jobs.py CLI. Done/cancelled ids and auto-generated ids are free to (re)create. Fixes #11 --- core/agent.py | 18 +++++++++++++++++- tools/jobs.py | 7 ++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/core/agent.py b/core/agent.py index ced0ec7..f5590b8 100644 --- a/core/agent.py +++ b/core/agent.py @@ -706,7 +706,11 @@ async def _execute_tool( write_sig = self._write_signature(name, params) if is_write_action else None executed_writes = request_state.setdefault("executed_writes", set()) write_decisions = request_state.setdefault("write_decisions", {}) - if is_write_action and write_sig in executed_writes: + # ``manage_jobs`` is exempt: job creation is idempotent and guarded on + # job id + status inside the tool, so an earlier write in the same turn + # must never block a (re)create — that was the "already fulfilled" bug + # against brand-new job ids (issue #11). + if is_write_action and name != "manage_jobs" and write_sig in executed_writes: return { "error": ( "This exact action was already completed in this request; not repeating it." @@ -978,6 +982,18 @@ async def _tool_manage_jobs(self, params: dict) -> dict: job_id = params.get("job_id", "").strip() if not job_id: job_id = f"agent_{uuid.uuid4().hex[:8]}" + else: + # Block only when this explicit id is already live (active or + # paused). Done/cancelled ids may be recreated; auto-generated + # ids are unique by construction. (issue #11) + existing = await self.job_store.get_job(job_id) + if existing and existing["status"] in ("active", "paused"): + return { + "error": ( + f"Job already exists and is {existing['status']}: {job_id}. " + "Cancel it first or use a different id." + ) + } channel = params.get("channel", "telegram") description = params.get("description", "") diff --git a/tools/jobs.py b/tools/jobs.py index 5f45564..ce14aba 100644 --- a/tools/jobs.py +++ b/tools/jobs.py @@ -116,10 +116,11 @@ def cmd_create(args) -> None: print(json.dumps({"error": "Must specify --cron or --once"})) sys.exit(1) - # Check if job already exists + # Block only a live (active/paused) id; done/cancelled may be recreated. (issue #11) existing = store.get_job_sync(args.id) - if existing: - print(json.dumps({"error": f"Job already exists: {args.id}. Use 'edit' to modify."})) + if existing and existing["status"] in ("active", "paused"): + msg = f"Job already exists and is {existing['status']}: {args.id}. Use 'edit' to modify." + print(json.dumps({"error": msg})) sys.exit(1) job = store.upsert_job_sync( From 51dbc72be2b0590feadbcd27cf1b59047d8d9b33 Mon Sep 17 00:00:00 2001 From: Matteo Merola Date: Thu, 25 Jun 2026 14:21:26 +0200 Subject: [PATCH 2/2] =?UTF-8?q?test(jobs):=20cover=20#11=20=E2=80=94=20bra?= =?UTF-8?q?nd-new=20id=20creates,=20live=20id=20blocks=20by=20id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Repoint the generic write-dedup test to send_email (manage_jobs is now exempt) and add cases: brand-new id never blocked after a prior create, recreating a live id is blocked with an id-based message (not the generic guard), and a cancelled id can be recreated. --- tests/test_tools.py | 107 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/tests/test_tools.py b/tests/test_tools.py index 82635ab..bd35e2a 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -222,22 +222,117 @@ async def test_distinct_writes_are_independent_after_success(agent, monkeypatch) @pytest.mark.asyncio async def test_identical_write_is_deduplicated(agent, monkeypatch) -> None: - """An identical repeated write within a turn is still suppressed.""" + """An identical repeated write within a turn is still suppressed. + + Uses send_email — a plain write — because manage_jobs is deliberately + exempt from the generic guard (it has its own id-based guard, see #11). + """ + from core.llm import LLMToolCall + + async def _ok_send_email(params): + return {"ok": True} + monkeypatch.setattr(agent, "_request_approval", _approve) - monkeypatch.setattr(agent, "_tool_manage_jobs", _ok_manage_jobs) + monkeypatch.setattr(agent, "_tool_send_email", _ok_send_email) agent.channels = {"telegram": object()} + args = {"account": "a", "to": "x@y.z", "subject": "s", "body": "b"} state = agent._new_request_state() - call = _job_call("1", task="ping mum", run_at="2026-07-01T09:00:00") - first = await agent._execute_tool(call, "telegram", "u1", state) + first = await agent._execute_tool( + LLMToolCall(id="1", name="send_email", arguments=args), "telegram", "u1", state + ) repeat = await agent._execute_tool( - _job_call("2", task="ping mum", run_at="2026-07-01T09:00:00"), + LLMToolCall(id="2", name="send_email", arguments=dict(args)), "telegram", "u1", state + ) + assert first.get("ok") is True + assert "already completed" in repeat.get("error", "") + + +# --------------------------------------------------------------------------- +# Job creation (#11): block only a live duplicate id, never a prior write +# --------------------------------------------------------------------------- + + +async def _no_sync(job_id): # scheduler.sync_job stub — no APScheduler in tests + return None + + +@pytest.mark.asyncio +async def test_brand_new_job_id_never_blocked(agent, monkeypatch) -> None: + """A brand-new job id creates even after another job was made this turn.""" + monkeypatch.setattr(agent, "_request_approval", _approve) + monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync) + agent.channels = {"telegram": object()} + + state = agent._new_request_state() + await agent._execute_tool( + _job_call("1", job_id="setup", task="t", run_at="2026-07-01T09:00:00"), + "telegram", + "u1", + state, + ) + new = await agent._execute_tool( + _job_call( + "2", + job_id="flight-monitor-lx1272", + task="watch flight", + run_at="2026-07-02T09:00:00", + ), + "telegram", + "u1", + state, + ) + assert new.get("ok") is True + assert new.get("job_id") == "flight-monitor-lx1272" + + +@pytest.mark.asyncio +async def test_recreate_active_job_id_blocked_by_id_not_generic_guard(agent, monkeypatch) -> None: + """Recreating a live id is blocked with an id-based message, not 'already completed'.""" + monkeypatch.setattr(agent, "_request_approval", _approve) + monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync) + agent.channels = {"telegram": object()} + + state = agent._new_request_state() + first = await agent._execute_tool( + _job_call("1", job_id="flight-x", task="t", run_at="2026-07-01T09:00:00"), + "telegram", + "u1", + state, + ) + second = await agent._execute_tool( + _job_call("2", job_id="flight-x", task="t", run_at="2026-07-01T09:00:00"), "telegram", "u1", state, ) assert first.get("ok") is True - assert "already completed" in repeat.get("error", "") + assert "already exists and is active" in second.get("error", "") + assert "already completed" not in second.get("error", "") + + +@pytest.mark.asyncio +async def test_cancelled_job_id_can_be_recreated(agent, monkeypatch) -> None: + """A done/cancelled id is free to recreate (only live ids block).""" + monkeypatch.setattr(agent, "_request_approval", _approve) + monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync) + agent.channels = {"telegram": object()} + + state = agent._new_request_state() + await agent._execute_tool( + _job_call("1", job_id="flight-z", task="t", run_at="2026-07-01T09:00:00"), + "telegram", + "u1", + state, + ) + await agent._tool_manage_jobs({"action": "cancel", "job_id": "flight-z"}) + again = await agent._execute_tool( + _job_call("2", job_id="flight-z", task="t2", run_at="2026-07-03T09:00:00"), + "telegram", + "u1", + state, + ) + assert again.get("ok") is True @pytest.mark.asyncio