Skip to content

Commit 9f11139

Browse files
author
Mateusz
committed
fix(acp): prune idle pool slots and stabilize history prefix hash
Replace idle runtimes in the pool under lock so acquirers always get the canonical instance. Hash conversation prefixes via ChatMessage.to_dict() with a versioned envelope so metadata-only drift does not reset the agent. Document session default pooling, HistoryState.prefix_hash, and prompt/history branches. Add tests for metadata-stable hashing, parallel acquire after idle reap, and pool recycle after kill_all.

Made-with: Cursor
1 parent 23cd45d commit 9f11139

3 files changed

Lines changed: 148 additions & 15 deletions

File tree

src/connectors/acp_core/base_connector.py

Lines changed: 87 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,36 @@
5858
ACP_AGENT_THOUGHT_CHUNK = "agent_thought_chunk"
5959
ACP_CANCEL_METHODS = ("session/cancel", "session/stop", "session/end")
6060
ACP_GRACEFUL_CANCEL_TIMEOUT_SECONDS = 12.0
61+
# Increment when the canonicalization used for ACP history prefix hashes changes.
62+
HISTORY_PREFIX_HASH_VERSION = 2
63+
64+
65+
def _canonical_chat_message_for_history_hash(message: ChatMessage) -> dict[str, Any]:
    """Project ``message`` onto the dict that feeds the history prefix hash.

    The projection is delegated entirely to :meth:`ChatMessage.to_dict`, so
    only the fields that method emits can influence divergence detection.
    The intent is that fields outside the visible transcript (for example
    ``metadata``) do not spuriously invalidate the prefix hash.
    NOTE(review): this relies on ``to_dict`` omitting such fields -- confirm
    against ``ChatMessage``.
    """
    canonical_fields = message.to_dict()
    return canonical_fields
73+
74+
75+
def _hash_chat_messages_prefix_stable(
    messages: Sequence[ChatMessage],
    end_exclusive: int,
) -> str:
    """Return the SHA-256 hex digest of the conversation prefix.

    The prefix is ``messages[:end_exclusive]``.  Each message is reduced with
    ``_canonical_chat_message_for_history_hash`` and the list is wrapped in an
    envelope ``{"m": [...], "v": HISTORY_PREFIX_HASH_VERSION}`` that is dumped
    as compact JSON with sorted keys before hashing.

    A non-positive ``end_exclusive`` yields the digest of zero bytes; note
    that this case does not include the version envelope.
    """
    hasher = hashlib.sha256()
    if end_exclusive > 0:
        envelope = {
            "m": [
                _canonical_chat_message_for_history_hash(msg)
                for msg in messages[:end_exclusive]
            ],
            "v": HISTORY_PREFIX_HASH_VERSION,
        }
        # Sorted keys + fixed separators keep the serialized form deterministic.
        serialized = json.dumps(envelope, sort_keys=True, separators=(",", ":"))
        hasher.update(serialized.encode("utf-8"))
    return hasher.hexdigest()
6191

6292

6393
class _RuntimeCancellable:
@@ -175,6 +205,15 @@ def _create_runtime(
175205

176206
@staticmethod
177207
def _resolve_client_session_id(request: ConnectorChatCompletionsRequest) -> str:
208+
"""Resolve the logical client session used to key ACP subprocess pools.
209+
210+
When neither ``ConnectorRequestContext.session_id`` nor
211+
``request.request.session_id`` is set, callers share the pool key
212+
``"default"`` (one ACP runtime per ``(project_dir, model)`` for all
213+
such traffic). Upstream layers should set a stable session id when
214+
isolation between clients or tabs is required.
215+
"""
216+
178217
sid: str | None = None
179218
if request.context is not None and request.context.session_id:
180219
sid = request.context.session_id
@@ -190,12 +229,7 @@ def _resolve_client_session_id(request: ConnectorChatCompletionsRequest) -> str:
190229
def _hash_messages_prefix(
191230
messages: Sequence[ChatMessage], end_exclusive: int
192231
) -> str:
193-
if end_exclusive <= 0:
194-
return hashlib.sha256(b"").hexdigest()
195-
slice_msgs = messages[:end_exclusive]
196-
payload = [m.model_dump(mode="json", exclude_none=True) for m in slice_msgs]
197-
canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
198-
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
232+
return _hash_chat_messages_prefix_stable(messages, end_exclusive)
199233

200234
async def _acquire_runtime(
201235
self, request: ConnectorChatCompletionsRequest
@@ -218,8 +252,7 @@ async def _acquire_runtime(
218252
)
219253
self._runtimes[runtime_key] = runtime
220254

221-
await self._reap_idle_runtime(runtime)
222-
return runtime
255+
return await self._reap_idle_runtime(runtime_key, runtime)
223256

224257
def _resolve_project_dir_override(
225258
self, request: ConnectorChatCompletionsRequest
@@ -259,19 +292,46 @@ def _resolve_project_dir_for_request(
259292

260293
return self._default_project_dir
261294

262-
async def _reap_idle_runtime(self, runtime: ACPProcessRuntime) -> None:
295+
async def _reap_idle_runtime(
    self,
    runtime_key: tuple[str, str, str],
    runtime: ACPProcessRuntime,
) -> ACPProcessRuntime:
    """Kill an idle subprocess and return the runtime registered for ``runtime_key``.

    The runtime is returned unchanged (nothing is killed) when any of these hold:

    * idle reaping is disabled (``self._idle_timeout <= 0``);
    * the runtime has no request lock, or the lock is currently held
      (a request is in flight);
    * no subprocess is attached (``runtime.process is None``);
    * ``runtime.last_activity`` was never set (``<= 0``);
    * less than ``self._idle_timeout`` seconds of monotonic time have passed
      since ``runtime.last_activity``.

    Otherwise the runtime is killed and, under ``self._runtime_pool_lock``,
    the pool entry is replaced with a fresh :class:`ACPProcessRuntime` so the
    dead object is not kept around and concurrent acquirers all resolve to the
    instance currently registered for ``runtime_key``.

    Args:
        runtime_key: Pool key under which ``runtime`` was looked up.
        runtime: The runtime the caller obtained from the pool.

    Returns:
        The runtime the caller should use: ``runtime`` itself when it was not
        reaped, otherwise the instance registered for ``runtime_key`` after
        the reap.
    """
    if self._idle_timeout <= 0:
        return runtime
    if runtime.request_lock is None or runtime.request_lock.locked():
        return runtime
    if runtime.process is None:
        return runtime
    if runtime.last_activity <= 0:
        return runtime
    if (time.monotonic() - runtime.last_activity) < self._idle_timeout:
        return runtime

    await self._kill_runtime(runtime)

    async with self._runtime_pool_lock:
        current = self._runtimes.get(runtime_key)
        # Another acquirer already swapped in a replacement while we were
        # killing: converge on that instance.
        if current is not None and current is not runtime:
            return current
        # Either we still own the slot, or the slot was emptied while we were
        # killing.  In both cases register a fresh runtime; returning the
        # killed, unregistered ``runtime`` would let the caller spawn a
        # subprocess the pool no longer tracks.
        replacement = self._create_runtime(
            runtime.project_dir,
            runtime.model,
            runtime.client_session_id,
        )
        self._runtimes[runtime_key] = replacement
        return replacement
334+
275335
async def _spawn_process(self, runtime: ACPProcessRuntime) -> None:
276336
assert runtime.process_lock is not None
277337
async with runtime.process_lock:
@@ -800,6 +860,13 @@ async def _prepare_prompt_request_locked(
800860
runtime: ACPProcessRuntime,
801861
request: ConnectorChatCompletionsRequest,
802862
) -> tuple[int, str]:
863+
"""Build ``session/prompt`` text and JSON-RPC id under ``runtime.request_lock``.
864+
865+
History is tracked with :class:`HistoryState` so we can send a compact
866+
tail transcript on append-only turns, resend the full transcript after
867+
detected divergence, or send only the last user line on idempotent retries.
868+
"""
869+
803870
await self._spawn_process(runtime)
804871
await self._initialize_runtime(runtime)
805872

@@ -811,6 +878,7 @@ async def _prepare_prompt_request_locked(
811878
new_history_state: HistoryState
812879
user_message: str
813880

881+
# First prompt for this subprocess: full Markdown transcript + state seed.
814882
if state is None:
815883
user_message = ACPTranscriptSerializer.serialize(messages)
816884
new_history_state = HistoryState(
@@ -825,6 +893,7 @@ async def _prepare_prompt_request_locked(
825893
or self._hash_messages_prefix(messages, n) != prefix_hash
826894
)
827895

896+
# Prefix edit, branch switch, or truncated history vs. what ACP saw.
828897
if diverged:
829898
if logger.isEnabledFor(logging.INFO):
830899
logger.info(
@@ -842,9 +911,11 @@ async def _prepare_prompt_request_locked(
842911
message_count=len(messages),
843912
prefix_hash=self._hash_messages_prefix(messages, len(messages)),
844913
)
914+
# Same message list as last successful prompt (e.g. client retry).
845915
elif len(messages) == n:
846916
user_message = self._extract_user_message_as_string(messages)
847917
new_history_state = state
918+
# Append-only: agent already saw messages[:n]; ship incremental context.
848919
else:
849920
user_message = ACPTranscriptSerializer.serialize_tail(messages, n)
850921
if not user_message.strip():
@@ -1035,6 +1106,8 @@ async def chat_completions( # type: ignore[override]
10351106
if runtime.request_lock is None:
10361107
raise BackendError(message="ACP runtime is missing request lock")
10371108

1109+
# Streaming holds the per-runtime lock for the full SSE response so idle reap
1110+
# cannot swap the pool entry until the stream completes (see ``_reap_idle_runtime``).
10381111
if bool(getattr(request.request, "stream", False)):
10391112
await runtime.request_lock.acquire()
10401113
try:

src/connectors/acp_core/types.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,11 @@ class AcpStreamPiece:
8484

8585
@dataclass(frozen=True, slots=True)
8686
class HistoryState:
87-
"""Tracks how much of ``processed_messages`` has been applied to the ACP agent."""
87+
"""Tracks how much of ``processed_messages`` has been applied to the ACP agent.
88+
89+
``prefix_hash`` is an opaque SHA-256 hex digest of the acknowledged prefix; it is
90+
computed only inside the ACP base connector and must not be interpreted elsewhere.
91+
"""
8892

8993
message_count: int
9094
prefix_hash: str

tests/unit/connectors/acp_core/test_base_connector.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections.abc import AsyncGenerator
45
from pathlib import Path
56
from typing import Any
@@ -323,6 +324,61 @@ def test_hash_messages_prefix_detects_edit(connector: DummyAcpConnector) -> None
323324
assert h1 != h2
324325

325326

327+
def test_hash_messages_prefix_stable_ignores_metadata_only_changes(
    connector: DummyAcpConnector,
) -> None:
    """Prefixes that differ only in ``metadata`` must produce the same hash."""
    with_meta_a = [ChatMessage(role="user", content="hi", metadata={"a": 1})]
    with_meta_b = [ChatMessage(role="user", content="hi", metadata={"b": 2})]

    digest_a = connector._hash_messages_prefix(with_meta_a, 1)
    digest_b = connector._hash_messages_prefix(with_meta_b, 1)

    assert digest_a == digest_b
335+
336+
337+
@pytest.mark.asyncio
async def test_parallel_acquire_after_idle_reap_uses_same_pool_runtime(
    connector: DummyAcpConnector,
) -> None:
    """Two concurrent acquirers racing an idle reap must share one fresh runtime."""
    project_dir = Path("/tmp/dummy")
    session = "sess-par"
    connector._default_project_dir = project_dir
    connector._idle_timeout = 5.0
    pool_key = connector._build_runtime_key(project_dir, "model", session)

    # Seed the pool with a runtime that looks alive but has been idle far
    # longer than the 5.0s timeout (clock is frozen at 100.0 below).
    idle_runtime = connector._create_runtime(project_dir, "model", session)
    fake_process = MagicMock()
    fake_process.poll.return_value = None
    idle_runtime.process = fake_process
    idle_runtime.last_activity = 1.0
    async with connector._runtime_pool_lock:
        connector._runtimes[pool_key] = idle_runtime

    frozen_clock = patch(
        "src.connectors.acp_core.base_connector.time.monotonic",
        return_value=100.0,
    )
    stubbed_terminate = patch.object(connector, "_terminate_process", AsyncMock())
    with frozen_clock, stubbed_terminate:
        first, second = await asyncio.gather(
            connector._acquire_runtime(_make_request(session_id=session)),
            connector._acquire_runtime(_make_request(session_id=session)),
        )

    assert first is second
    assert first is not idle_runtime
    assert connector._runtimes[pool_key] is first
367+
368+
369+
@pytest.mark.asyncio
async def test_kill_all_runtimes_next_acquire_creates_new_runtime_object(
    connector: DummyAcpConnector,
) -> None:
    """After ``_kill_all_runtimes`` the pool is empty and acquire builds a new slot."""
    connector._default_project_dir = Path("/tmp/dummy")

    original_runtime = await connector._acquire_runtime(
        _make_request(session_id="recycle")
    )
    await connector._kill_all_runtimes()
    assert len(connector._runtimes) == 0

    recycled_runtime = await connector._acquire_runtime(
        _make_request(session_id="recycle")
    )
    assert recycled_runtime is not original_runtime
    # A brand-new runtime must not carry over any acknowledged history.
    assert recycled_runtime.history_state is None
380+
381+
326382
@pytest.mark.asyncio
327383
async def test_non_streaming_chat_completions_preserve_reasoning_content(
328384
connector: DummyAcpConnector,

0 commit comments

Comments
 (0)