Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,11 @@ async def _execute_tool(
write_sig = self._write_signature(name, params) if is_write_action else None
executed_writes = request_state.setdefault("executed_writes", set())
write_decisions = request_state.setdefault("write_decisions", {})
if is_write_action and write_sig in executed_writes:
# ``manage_jobs`` is exempt: job creation is idempotent and guarded on
# job id + status inside the tool, so an earlier write in the same turn
# must never block a (re)create — that was the "already fulfilled" bug
# against brand-new job ids (issue #11).
if is_write_action and name != "manage_jobs" and write_sig in executed_writes:
return {
"error": (
"This exact action was already completed in this request; not repeating it."
Expand Down Expand Up @@ -978,6 +982,18 @@ async def _tool_manage_jobs(self, params: dict) -> dict:
job_id = params.get("job_id", "").strip()
if not job_id:
job_id = f"agent_{uuid.uuid4().hex[:8]}"
else:
# Block only when this explicit id is already live (active or
# paused). Done/cancelled ids may be recreated; auto-generated
# ids are unique by construction. (issue #11)
existing = await self.job_store.get_job(job_id)
if existing and existing["status"] in ("active", "paused"):
return {
"error": (
f"Job already exists and is {existing['status']}: {job_id}. "
"Cancel it first or use a different id."
)
}

channel = params.get("channel", "telegram")
description = params.get("description", "")
Expand Down
107 changes: 101 additions & 6 deletions tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,117 @@ async def test_distinct_writes_are_independent_after_success(agent, monkeypatch)

@pytest.mark.asyncio
async def test_identical_write_is_deduplicated(agent, monkeypatch) -> None:
"""An identical repeated write within a turn is still suppressed."""
"""An identical repeated write within a turn is still suppressed.

Uses send_email — a plain write — because manage_jobs is deliberately
exempt from the generic guard (it has its own id-based guard, see #11).
"""
from core.llm import LLMToolCall

async def _ok_send_email(params):
return {"ok": True}

monkeypatch.setattr(agent, "_request_approval", _approve)
monkeypatch.setattr(agent, "_tool_manage_jobs", _ok_manage_jobs)
monkeypatch.setattr(agent, "_tool_send_email", _ok_send_email)
agent.channels = {"telegram": object()}

args = {"account": "a", "to": "x@y.z", "subject": "s", "body": "b"}
state = agent._new_request_state()
call = _job_call("1", task="ping mum", run_at="2026-07-01T09:00:00")
first = await agent._execute_tool(call, "telegram", "u1", state)
first = await agent._execute_tool(
LLMToolCall(id="1", name="send_email", arguments=args), "telegram", "u1", state
)
repeat = await agent._execute_tool(
_job_call("2", task="ping mum", run_at="2026-07-01T09:00:00"),
LLMToolCall(id="2", name="send_email", arguments=dict(args)), "telegram", "u1", state
)
assert first.get("ok") is True
assert "already completed" in repeat.get("error", "")


# ---------------------------------------------------------------------------
# Job creation (#11): block only a live duplicate id, never a prior write
# ---------------------------------------------------------------------------


async def _no_sync(job_id): # scheduler.sync_job stub — no APScheduler in tests
return None


@pytest.mark.asyncio
async def test_brand_new_job_id_never_blocked(agent, monkeypatch) -> None:
"""A brand-new job id creates even after another job was made this turn."""
monkeypatch.setattr(agent, "_request_approval", _approve)
monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync)
agent.channels = {"telegram": object()}

state = agent._new_request_state()
await agent._execute_tool(
_job_call("1", job_id="setup", task="t", run_at="2026-07-01T09:00:00"),
"telegram",
"u1",
state,
)
new = await agent._execute_tool(
_job_call(
"2",
job_id="flight-monitor-lx1272",
task="watch flight",
run_at="2026-07-02T09:00:00",
),
"telegram",
"u1",
state,
)
assert new.get("ok") is True
assert new.get("job_id") == "flight-monitor-lx1272"


@pytest.mark.asyncio
async def test_recreate_active_job_id_blocked_by_id_not_generic_guard(agent, monkeypatch) -> None:
"""Recreating a live id is blocked with an id-based message, not 'already completed'."""
monkeypatch.setattr(agent, "_request_approval", _approve)
monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync)
agent.channels = {"telegram": object()}

state = agent._new_request_state()
first = await agent._execute_tool(
_job_call("1", job_id="flight-x", task="t", run_at="2026-07-01T09:00:00"),
"telegram",
"u1",
state,
)
second = await agent._execute_tool(
_job_call("2", job_id="flight-x", task="t", run_at="2026-07-01T09:00:00"),
"telegram",
"u1",
state,
)
assert first.get("ok") is True
assert "already completed" in repeat.get("error", "")
assert "already exists and is active" in second.get("error", "")
assert "already completed" not in second.get("error", "")


@pytest.mark.asyncio
async def test_cancelled_job_id_can_be_recreated(agent, monkeypatch) -> None:
"""A done/cancelled id is free to recreate (only live ids block)."""
monkeypatch.setattr(agent, "_request_approval", _approve)
monkeypatch.setattr(agent.scheduler, "sync_job", _no_sync)
agent.channels = {"telegram": object()}

state = agent._new_request_state()
await agent._execute_tool(
_job_call("1", job_id="flight-z", task="t", run_at="2026-07-01T09:00:00"),
"telegram",
"u1",
state,
)
await agent._tool_manage_jobs({"action": "cancel", "job_id": "flight-z"})
again = await agent._execute_tool(
_job_call("2", job_id="flight-z", task="t2", run_at="2026-07-03T09:00:00"),
"telegram",
"u1",
state,
)
assert again.get("ok") is True


@pytest.mark.asyncio
Expand Down
7 changes: 4 additions & 3 deletions tools/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ def cmd_create(args) -> None:
print(json.dumps({"error": "Must specify --cron or --once"}))
sys.exit(1)

# Check if job already exists
# Block only a live (active/paused) id; done/cancelled may be recreated. (issue #11)
existing = store.get_job_sync(args.id)
if existing:
print(json.dumps({"error": f"Job already exists: {args.id}. Use 'edit' to modify."}))
if existing and existing["status"] in ("active", "paused"):
msg = f"Job already exists and is {existing['status']}: {args.id}. Use 'edit' to modify."
print(json.dumps({"error": msg}))
sys.exit(1)

job = store.upsert_job_sync(
Expand Down
Loading