From f61389728e013dbaa378771be8d83c13afce6d48 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 26 May 2026 16:20:11 +0200 Subject: [PATCH 01/10] Add goal option to phase task --- ddev/src/ddev/ai/agent/build.py | 55 +++- ddev/src/ddev/ai/callbacks/callbacks.py | 36 +++ ddev/src/ddev/ai/phases/agentic_phase.py | 92 +++++- ddev/src/ddev/ai/phases/config.py | 22 +- ddev/src/ddev/ai/phases/goal.py | 291 +++++++++++++++++ ddev/src/ddev/ai/tools/registry.py | 47 ++- ddev/tests/ai/agent/test_build.py | 58 +++- ddev/tests/ai/callbacks/test_callbacks.py | 46 +++ ddev/tests/ai/phases/conftest.py | 2 + ddev/tests/ai/phases/test_agentic_phase.py | 104 ++++++ ddev/tests/ai/phases/test_config.py | 54 ++++ ddev/tests/ai/phases/test_goal.py | 359 +++++++++++++++++++++ ddev/tests/ai/tools/test_registry.py | 26 +- 13 files changed, 1162 insertions(+), 30 deletions(-) create mode 100644 ddev/src/ddev/ai/phases/goal.py create mode 100644 ddev/tests/ai/phases/test_goal.py diff --git a/ddev/src/ddev/ai/agent/build.py b/ddev/src/ddev/ai/agent/build.py index 28a56e620bbbc..7653d3fe30410 100644 --- a/ddev/src/ddev/ai/agent/build.py +++ b/ddev/src/ddev/ai/agent/build.py @@ -11,7 +11,7 @@ from ddev.ai.agent.anthropic_client import AnthropicAgent from ddev.ai.agent.base import BaseAgent from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry +from ddev.ai.tools.registry import ToolRegistry, filter_read_only if TYPE_CHECKING: from ddev.ai.phases.config import AgentConfig @@ -24,6 +24,10 @@ [str, str, SubagentBuilder | None, Path | None], # system_prompt, owner_id, subagent_builder, log_dir tuple[BaseAgent[Any], ToolRegistry], ] +GoalAgentBuilder = Callable[ + [str], # owner_id + tuple[BaseAgent[Any], ToolRegistry], +] def _resolve_client(agent_clients: dict[str, Any], provider: str) -> Any: @@ -159,3 +163,52 @@ def builder(system_prompt: str, owner_id: str, tool_names: list[str]) -> tuple[B ) return builder + + +def build_goal_agent( + parent_agent_config: AgentConfig, + agent_clients: dict[str, Any], + file_registry: FileRegistry, + owner_id: str, +) -> tuple[BaseAgent[Any], ToolRegistry]: + """Build the reviewer agent + its ToolRegistry. + + Uses the same provider as the parent agent. Model and max_tokens are left at + provider defaults — the parent's overrides are intentionally not forwarded. + Tools are filtered to the read-only subset of the parent's tool list. + """ + from ddev.ai.phases.config import AgentConfig + from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT + + read_only_tool_names = filter_read_only(parent_agent_config.tools) + goal_agent_config = AgentConfig( + provider=parent_agent_config.provider, + tools=read_only_tool_names, + ) + + return _build_agent_and_registry( + agent_config=goal_agent_config, + agent_clients=agent_clients, + system_prompt=GOAL_REVIEWER_SYSTEM_PROMPT, + owner_id=owner_id, + tool_names=read_only_tool_names, + file_registry=file_registry, + ) + + +def make_goal_agent_builder( + parent_agent_config: AgentConfig, + agent_clients: dict[str, Any], + file_registry: FileRegistry, +) -> GoalAgentBuilder: + """Return a closure that builds a (reviewer_agent, reviewer_registry) tuple.""" + + def builder(owner_id: str) -> tuple[BaseAgent[Any], ToolRegistry]: + return build_goal_agent( + parent_agent_config=parent_agent_config, + agent_clients=agent_clients, + file_registry=file_registry, + owner_id=owner_id, + ) + + return builder diff --git a/ddev/src/ddev/ai/callbacks/callbacks.py b/ddev/src/ddev/ai/callbacks/callbacks.py index d0be2229a27c7..727681537f69e 100644 --- a/ddev/src/ddev/ai/callbacks/callbacks.py +++ b/ddev/src/ddev/ai/callbacks/callbacks.py @@ -72,6 +72,18 @@ class OnPhaseFinishCallback(Protocol): async def __call__(self, phase_id: str) -> None: ... +class OnBeforeGoalCheckCallback(Protocol): + """Called immediately before each reviewer agent run for a task with a goal.""" + + async def __call__(self, task_name: str, attempt: int) -> None: ... + + +class OnAfterGoalCheckCallback(Protocol): + """Called after each reviewer agent run, with the parsed verdict.""" + + async def __call__(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: ... + + # --------------------------------------------------------------------------- # CallbackSet and Callbacks # --------------------------------------------------------------------------- @@ -103,6 +115,8 @@ def __init__(self) -> None: self._on_before_agent_send: list[OnBeforeAgentSendCallback] = [] self._on_phase_start: list[OnPhaseStartCallback] = [] self._on_phase_finish: list[OnPhaseFinishCallback] = [] + self._on_before_goal_check: list[OnBeforeGoalCheckCallback] = [] + self._on_after_goal_check: list[OnAfterGoalCheckCallback] = [] async def _fire(self, handlers: list[Any], *args: Any) -> None: for handler in handlers: @@ -174,6 +188,20 @@ def on_phase_finish(self, func: OnPhaseFinishCallback) -> OnPhaseFinishCallback: async def fire_phase_finish(self, phase_id: str) -> None: await self._fire(self._on_phase_finish, phase_id) + def on_before_goal_check(self, func: OnBeforeGoalCheckCallback) -> OnBeforeGoalCheckCallback: + self._on_before_goal_check.append(func) + return func + + async def fire_before_goal_check(self, task_name: str, attempt: int) -> None: + await self._fire(self._on_before_goal_check, task_name, attempt) + + def on_after_goal_check(self, func: OnAfterGoalCheckCallback) -> OnAfterGoalCheckCallback: + self._on_after_goal_check.append(func) + return func + + async def fire_after_goal_check(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: + await self._fire(self._on_after_goal_check, task_name, attempt, valid, reason) + class Callbacks: """Container of CallbackSet instances. Dispatches each fire_* to all contained sets.""" @@ -216,3 +244,11 @@ async def fire_phase_start(self, phase_id: str) -> None: async def fire_phase_finish(self, phase_id: str) -> None: for s in self._sets: await s.fire_phase_finish(phase_id) + + async def fire_before_goal_check(self, task_name: str, attempt: int) -> None: + for s in self._sets: + await s.fire_before_goal_check(task_name, attempt) + + async def fire_after_goal_check(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: + for s in self._sets: + await s.fire_after_goal_check(task_name, attempt, valid, reason) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 948bcfb3e7728..4210c1e477eae 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -2,22 +2,34 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + import logging from collections.abc import Callable from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any -from ddev.ai.agent.base import BaseAgent -from ddev.ai.agent.build import AgentBuilder, SubagentBuilder, make_agent_builder, make_subagent_builder +from ddev.ai.agent.build import ( + make_agent_builder, + make_goal_agent_builder, + make_subagent_builder, +) from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.base import Phase, PhaseOutcome from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, render_goal_text, run_goal_loop from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import TOOL_MANIFEST +if TYPE_CHECKING: + from ddev.ai.agent.base import BaseAgent + from ddev.ai.agent.build import AgentBuilder, GoalAgentBuilder, SubagentBuilder + from ddev.ai.phases.goal import GoalLoopOutcome + from ddev.ai.react.types import ReActResult + def render_task_prompt( task: TaskConfig, @@ -61,6 +73,7 @@ def __init__( config_dir: Path, file_registry: FileRegistry, subagent_builder: SubagentBuilder | None = None, + goal_agent_builder: GoalAgentBuilder | None = None, callbacks: Callbacks | None = None, logger: logging.Logger | None = None, ) -> None: @@ -78,6 +91,8 @@ def __init__( ) self._agent_builder = agent_builder self._subagent_builder = subagent_builder + self._goal_agent_builder = goal_agent_builder + self._goal_attempt_log: list[dict[str, Any]] = [] self._subagent_log_dir = ( checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None ) @@ -124,6 +139,15 @@ def extra_init_kwargs( # type: ignore[override] file_registry=file_registry, ) + any_goal = any((t.goal is not None or t.goal_path is not None) for t in phase_config.tasks) + goal_agent_builder = None + if any_goal: + goal_agent_builder = make_goal_agent_builder( + parent_agent_config=agent_config, + agent_clients=agent_clients, + file_registry=file_registry, + ) + return { "agent_builder": make_agent_builder( agent_config=agent_config, @@ -131,6 +155,7 @@ def extra_init_kwargs( # type: ignore[override] file_registry=file_registry, ), "subagent_builder": subagent_builder, + "goal_agent_builder": goal_agent_builder, } def before_react(self) -> None: @@ -139,6 +164,17 @@ def before_react(self) -> None: def after_react(self) -> None: """Called once after all tasks complete. Override for phase-specific teardown.""" + async def _compact_if_needed( + self, + process: ReActProcess, + last_result: ReActResult | None, + ) -> tuple[int, int]: + if last_result is None or last_result.context_usage is None: + return 0, 0 + if last_result.context_usage.context_pct < self._config.context_compact_threshold_pct: + return 0, 0 + return await process.compact() + async def run_tasks( self, process: ReActProcess, @@ -150,17 +186,50 @@ async def run_tasks( Default implementation iterates through config.tasks sequentially. """ total_input = total_output = 0 - last_result = None + last_result: ReActResult | None = None + goal_attempt_log: list[dict[str, Any]] = [] + for task in self._config.tasks: - if last_result is not None and last_result.context_usage is not None: - if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: - compact_in, compact_out = await process.compact() - total_input += compact_in - total_output += compact_out + cin, cout = await self._compact_if_needed(process, last_result) + total_input += cin + total_output += cout + + has_goal = task.goal is not None or task.goal_path is not None prompt = render_task_prompt(task, self._config_dir, context, self._resolver) + if has_goal: + prompt = prompt + GOAL_TASK_SUFFIX + last_result = await process.start(prompt) total_input += last_result.total_input_tokens total_output += last_result.total_output_tokens + + if has_goal: + assert self._goal_agent_builder is not None + goal_text = render_goal_text(task, self._config_dir, context, self._resolver) + outcome: GoalLoopOutcome = await run_goal_loop( + task=task, + goal_text=goal_text, + rendered_task_prompt=prompt, + worker_process=process, + initial_result=last_result, + goal_agent_builder=self._goal_agent_builder, + callbacks=self._callbacks, + phase_id=self._phase_id, + log_root=self._checkpoint_manager.root, + compact_if_needed=lambda r: self._compact_if_needed(process, r), + ) + last_result = outcome.final_result + total_input += outcome.total_input_tokens + total_output += outcome.total_output_tokens + goal_attempt_log.append( + { + "task": task.name, + "attempts": outcome.attempts, + "final_valid": True, + } + ) + + self._goal_attempt_log = goal_attempt_log return total_input, total_output def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[BaseAgent[Any], ReActProcess]: @@ -207,8 +276,13 @@ async def execute(self, context: dict[str, Any]) -> PhaseOutcome: memory_text, mem_in, mem_out = await self._run_memory_step(agent, context) + extra: dict[str, Any] = {} + if self._goal_attempt_log: + extra["goal_validations"] = self._goal_attempt_log + return PhaseOutcome( memory_text=memory_text, total_input_tokens=total_input + mem_in, total_output_tokens=total_output + mem_out, + extra_checkpoint=extra, ) diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index 5c2564e19066b..e1641243a8f12 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -56,13 +56,27 @@ class TaskConfig(BaseModel): name: str prompt_path: Path | None = None prompt: str | None = None + goal: str | None = None + goal_path: Path | None = None + max_goal_attempts: int = 5 @model_validator(mode="after") - def exactly_one_source(self) -> TaskConfig: + def exactly_one_prompt_source(self) -> TaskConfig: if (self.prompt_path is None) == (self.prompt is None): raise ValueError("Exactly one of 'prompt_path' or 'prompt' must be set") return self + @model_validator(mode="after") + def goal_consistency(self) -> TaskConfig: + if self.goal is not None and self.goal_path is not None: + raise ValueError("At most one of 'goal' or 'goal_path' may be set") + has_goal = self.goal is not None or self.goal_path is not None + if not has_goal and "max_goal_attempts" in self.model_fields_set: + raise ValueError("'max_goal_attempts' may only be set when 'goal' or 'goal_path' is set") + if has_goal and self.max_goal_attempts < 1: + raise ValueError("'max_goal_attempts' must be at least 1") + return self + class CheckpointConfig(BaseModel): """Optional extra instructions for the memory step. If omitted, only a summary is written.""" @@ -177,6 +191,12 @@ def _validate_files(self, config_dir: Path) -> None: raise FlowConfigError( f"Phase {phase_id!r} task {i} ({task.name!r}): prompt_path not found: {resolved}" ) + if task.goal_path is not None: + resolved = config_dir / task.goal_path + if not resolved.exists(): + raise FlowConfigError( + f"Phase {phase_id!r} task {i} ({task.name!r}): goal_path not found: {resolved}" + ) if phase.checkpoint is not None and phase.checkpoint.memory_prompt_path is not None: resolved = config_dir / phase.checkpoint.memory_prompt_path diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py new file mode 100644 index 0000000000000..843710a1b1a7c --- /dev/null +++ b/ddev/src/ddev/ai/phases/goal.py @@ -0,0 +1,291 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from __future__ import annotations + +import json +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from ddev.ai.callbacks.callbacks import Callbacks +from ddev.ai.phases.config import TaskConfig +from ddev.ai.phases.template import render_inline, render_prompt +from ddev.ai.react.process import ReActProcess +from ddev.ai.react.types import ReActResult +from ddev.ai.tools.agents.agent_logger import AgentLogger + +if TYPE_CHECKING: + from ddev.ai.agent.build import GoalAgentBuilder + +GOAL_REVIEWER_SYSTEM_PROMPT = """\ +You are a strict, independent reviewer. Your only job is to verify whether a +goal was met by another agent. You do not fix anything; you only report. + +You will receive a user message with three sections: +1. "Original task" — the prompt the worker agent was given. +2. "Goal to verify" — the specific criterion you must check. +3. "Worker summary" — the worker's own description of what it did, including + any files it created or modified, and any intentional decisions about scope. + +How to work: +- Read the relevant files yourself with the tools provided. Do not trust the + worker summary blindly — verify it. +- If the worker summary explains that an apparent gap is intentional and the + explanation is plausible and consistent with the task, accept the work as + valid. +- Be specific in your reasoning. Vague rejections are useless to the worker. + +Output contract: +- Reply with ONLY a JSON object as your final response, with no surrounding prose + and no markdown code fences. +- Schema: {"valid": , "reason": }. +- "reason" must be specific and actionable when "valid" is false. +- "reason" may be an empty string when "valid" is true. +""" + +GOAL_TASK_SUFFIX = """ + +--- +Before you finish, write a brief summary of what you did. Include: +- the files you created or modified (with absolute paths), +- any intentional decisions about scope (e.g. items deliberately excluded and why), +- anything a reviewer would need to verify your work. +Your work will be checked by an independent reviewer using only this summary +and the files you produced. +""" + +GOAL_RETRY_PROMPT_TEMPLATE = """\ +A reviewer checked your work against this goal and reported it failed: + +Goal: {goal} + +Reviewer reason: {reason} + +Address only the issue above. If you believe the reviewer is wrong, explain +why clearly in your final summary (do not silently ignore it). +""" + +GOAL_PARSE_RETRY_PROMPT = ( + "Your previous reply was not a valid JSON object matching the schema " + '{"valid": , "reason": }. Reply with only that JSON object, ' + "with no surrounding prose and no markdown code fences." +) + + +class GoalValidationError(Exception): + """Base class for goal-validation failures.""" + + +class GoalParseError(GoalValidationError): + """Reviewer failed to return valid JSON after the parse-retry.""" + + +class GoalAttemptsExhausted(GoalValidationError): + """Reviewer rejected the work on every attempt up to max_goal_attempts.""" + + +@dataclass(frozen=True) +class GoalCheckResult: + valid: bool + reason: str + input_tokens: int + output_tokens: int + + +@dataclass(frozen=True) +class GoalLoopOutcome: + final_result: ReActResult + attempts: int + total_input_tokens: int + total_output_tokens: int + + +def render_goal_text( + task: TaskConfig, + config_dir: Path, + context: dict[str, Any], + resolver: Callable[[str], str] | None, +) -> str: + """Render the goal — from file if goal_path is set, inline otherwise.""" + if task.goal_path is not None: + return render_prompt(config_dir / task.goal_path, context, resolver) + assert task.goal is not None # caller checks + return render_inline(task.goal, context, resolver) + + +def build_reviewer_user_message( + *, + rendered_task_prompt: str, + goal_text: str, + worker_summary: str, +) -> str: + return ( + f"## Original task\n{rendered_task_prompt}\n\n" + f"## Goal to verify\n{goal_text}\n\n" + f"## Worker summary\n{worker_summary}\n" + ) + + +def parse_reviewer_output(text: str) -> tuple[bool, str] | None: + """Return (valid, reason) if text parses; None if it does not.""" + stripped = text.strip() + if stripped.startswith("```"): + stripped = stripped.strip("`") + if stripped.lower().startswith("json"): + stripped = stripped[4:] + stripped = stripped.strip() + try: + obj = json.loads(stripped) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(obj, dict): + return None + valid = obj.get("valid") + reason = obj.get("reason", "") + if not isinstance(valid, bool) or not isinstance(reason, str): + return None + return valid, reason + + +async def _run_reviewer_once( + reviewer_process: ReActProcess, + user_message: str, +) -> GoalCheckResult: + """Send ``user_message`` to the reviewer and parse its JSON output. + + On parse failure, ask the reviewer once more for valid JSON. If that + second response still does not parse, raise GoalParseError. + """ + result = await reviewer_process.start(user_message) + in_tokens = result.total_input_tokens + out_tokens = result.total_output_tokens + + parsed = parse_reviewer_output(result.final_response.text or "") + if parsed is None: + retry_result = await reviewer_process.start(GOAL_PARSE_RETRY_PROMPT) + in_tokens += retry_result.total_input_tokens + out_tokens += retry_result.total_output_tokens + parsed = parse_reviewer_output(retry_result.final_response.text or "") + if parsed is None: + raise GoalParseError( + "Reviewer did not return valid JSON after one parse-retry. " + f"Last raw output: {retry_result.final_response.text!r}" + ) + + valid, reason = parsed + return GoalCheckResult(valid=valid, reason=reason, input_tokens=in_tokens, output_tokens=out_tokens) + + +async def run_goal_loop( + *, + task: TaskConfig, + goal_text: str, + rendered_task_prompt: str, + worker_process: ReActProcess, + initial_result: ReActResult, + goal_agent_builder: GoalAgentBuilder, + callbacks: Callbacks, + phase_id: str, + log_root: Path, + compact_if_needed: Callable[[ReActResult], Awaitable[tuple[int, int]]], +) -> GoalLoopOutcome: + """Drive the reviewer + worker-retry loop for a single task with a goal. + + Reviewer activity is logged to ``/goal_agent//.jsonl`` + via AgentLogger. The reviewer's ReActProcess uses only the logger's callbacks — + the phase callbacks see only the bracketing before/after_goal_check events. + """ + log_dir = log_root / "goal_agent" / phase_id + log_path = log_dir / f"{task.name}.jsonl" + try: + log_dir.mkdir(parents=True, exist_ok=True) + except OSError as e: + raise OSError(f"Goal reviewer log directory could not be created at {log_dir}: {e}") from e + + reviewer_owner_id = f"{phase_id}.goal.{task.name}" + reviewer_agent, reviewer_registry = goal_agent_builder(reviewer_owner_id) + + try: + agent_logger = AgentLogger(log_path) + except OSError as e: + raise OSError(f"Goal reviewer log could not be opened at {log_path}: {e}") from e + + try: + agent_logger.log_start( + system_prompt=GOAL_REVIEWER_SYSTEM_PROMPT, + prompt=f"", + tools=[d["name"] for d in reviewer_registry.definitions], + ) + + reviewer_process = ReActProcess( + agent=reviewer_agent, + tool_registry=reviewer_registry, + callbacks=agent_logger.build_callbacks(), + ) + + total_in = total_out = 0 + attempts = 0 + worker_result = initial_result + + try: + while True: + attempts += 1 + await callbacks.fire_before_goal_check(task.name, attempts) + + user_message = build_reviewer_user_message( + rendered_task_prompt=rendered_task_prompt, + goal_text=goal_text, + worker_summary=worker_result.final_response.text or "(no summary provided)", + ) + + if attempts > 1: + reviewer_process.reset() + + check = await _run_reviewer_once(reviewer_process, user_message) + total_in += check.input_tokens + total_out += check.output_tokens + + await callbacks.fire_after_goal_check(task.name, attempts, check.valid, check.reason) + + if check.valid: + agent_logger.log_finish( + success=True, + attempts=attempts, + total_input_tokens=total_in, + total_output_tokens=total_out, + ) + return GoalLoopOutcome( + final_result=worker_result, + attempts=attempts, + total_input_tokens=total_in, + total_output_tokens=total_out, + ) + + if attempts >= task.max_goal_attempts: + raise GoalAttemptsExhausted( + f"Task {task.name!r} failed goal validation after " + f"{attempts} attempts. Last reviewer reason: {check.reason}" + ) + + compact_in, compact_out = await compact_if_needed(worker_result) + total_in += compact_in + total_out += compact_out + + retry_prompt = GOAL_RETRY_PROMPT_TEMPLATE.format(goal=goal_text, reason=check.reason) + worker_result = await worker_process.start(retry_prompt) + total_in += worker_result.total_input_tokens + total_out += worker_result.total_output_tokens + except GoalValidationError as e: + agent_logger.log_finish( + success=False, + attempts=attempts, + total_input_tokens=total_in, + total_output_tokens=total_out, + error=f"{type(e).__name__}: {e}", + ) + raise + finally: + agent_logger.close() diff --git a/ddev/src/ddev/ai/tools/registry.py b/ddev/src/ddev/ai/tools/registry.py index 48b99e8f065da..2ff25522fea4b 100644 --- a/ddev/src/ddev/ai/tools/registry.py +++ b/ddev/src/ddev/ai/tools/registry.py @@ -71,36 +71,39 @@ class ToolSpec: ``factory`` receives the already-imported class and the shared ToolContext and returns a constructed tool instance. ``requires_subagent_builder`` marks agentic tools that need subagent wiring. + ``read_only`` marks tools that only inspect state and never mutate it. """ module: str cls: str factory: Callable[[type, ToolContext], ToolProtocol] = _plain_factory requires_subagent_builder: bool = False + read_only: bool = False TOOL_MANIFEST: dict[str, ToolSpec] = { - "read_file": ToolSpec("fs.read_file", "ReadFileTool", factory=_file_registry_factory), - "create_file": ToolSpec("fs.create_file", "CreateFileTool", factory=_file_registry_factory), - "edit_file": ToolSpec("fs.edit_file", "EditFileTool", factory=_file_registry_factory), - "append_file": ToolSpec("fs.append_file", "AppendFileTool", factory=_file_registry_factory), - "grep": ToolSpec("shell.grep", "GrepTool", factory=_file_policy_factory), - "list_files": ToolSpec("shell.list_files", "ListFilesTool"), - "mkdir": ToolSpec("fs.mkdir", "MkdirTool", factory=_file_policy_factory), - "http_get": ToolSpec("http.http_get", "HttpGetTool"), - "ddev_create": ToolSpec("shell.ddev.create", "DdevCreateTool"), - "ddev_test": ToolSpec("shell.ddev.ddev_test", "DdevTestTool"), - "ddev_env_show": ToolSpec("shell.ddev.env_show", "DdevEnvShowTool"), - "ddev_env_start": ToolSpec("shell.ddev.env_start", "DdevEnvStartTool"), - "ddev_env_stop": ToolSpec("shell.ddev.env_stop", "DdevEnvStopTool"), - "ddev_env_test": ToolSpec("shell.ddev.env_test", "DdevEnvTestTool"), - "ddev_release_changelog": ToolSpec("shell.ddev.release_changelog", "DdevReleaseChangelogTool"), - "ddev_validate": ToolSpec("shell.ddev.validate", "DdevValidateTool"), + "read_file": ToolSpec("fs.read_file", "ReadFileTool", factory=_file_registry_factory, read_only=True), + "create_file": ToolSpec("fs.create_file", "CreateFileTool", factory=_file_registry_factory, read_only=False), + "edit_file": ToolSpec("fs.edit_file", "EditFileTool", factory=_file_registry_factory, read_only=False), + "append_file": ToolSpec("fs.append_file", "AppendFileTool", factory=_file_registry_factory, read_only=False), + "grep": ToolSpec("shell.grep", "GrepTool", factory=_file_policy_factory, read_only=True), + "list_files": ToolSpec("shell.list_files", "ListFilesTool", read_only=True), + "mkdir": ToolSpec("fs.mkdir", "MkdirTool", factory=_file_policy_factory, read_only=False), + "http_get": ToolSpec("http.http_get", "HttpGetTool", read_only=True), + "ddev_create": ToolSpec("shell.ddev.create", "DdevCreateTool", read_only=False), + "ddev_test": ToolSpec("shell.ddev.ddev_test", "DdevTestTool", read_only=False), + "ddev_env_show": ToolSpec("shell.ddev.env_show", "DdevEnvShowTool", read_only=False), + "ddev_env_start": ToolSpec("shell.ddev.env_start", "DdevEnvStartTool", read_only=False), + "ddev_env_stop": ToolSpec("shell.ddev.env_stop", "DdevEnvStopTool", read_only=False), + "ddev_env_test": ToolSpec("shell.ddev.env_test", "DdevEnvTestTool", read_only=False), + "ddev_release_changelog": ToolSpec("shell.ddev.release_changelog", "DdevReleaseChangelogTool", read_only=False), + "ddev_validate": ToolSpec("shell.ddev.validate", "DdevValidateTool", read_only=False), "spawn_subagent": ToolSpec( "agents.spawn_subagent", "SpawnSubagentTool", factory=_spawn_subagent_factory, requires_subagent_builder=True, + read_only=False, ), } @@ -159,3 +162,15 @@ async def run(self, name: str, raw: dict[str, object]) -> ToolResult: if tool is None: return ToolResult(success=False, error=f"Unknown tool: {name!r}") return await tool.run(raw) + + +def filter_read_only(tool_names: list[str]) -> list[str]: + """Return only the names whose ToolSpec has read_only=True. Unknown names raise.""" + out: list[str] = [] + for name in tool_names: + spec = TOOL_MANIFEST.get(name) + if spec is None: + raise ValueError(f"Unknown tool name: {name!r}") + if spec.read_only: + out.append(name) + return out diff --git a/ddev/tests/ai/agent/test_build.py b/ddev/tests/ai/agent/test_build.py index 2ee6a42762bf6..f996da6ee0da2 100644 --- a/ddev/tests/ai/agent/test_build.py +++ b/ddev/tests/ai/agent/test_build.py @@ -6,9 +6,16 @@ import pytest -from ddev.ai.agent.anthropic_client import AnthropicAgent -from ddev.ai.agent.build import build_agent, build_subagent, make_agent_builder, make_subagent_builder +from ddev.ai.agent.anthropic_client import DEFAULT_MAX_TOKENS, DEFAULT_MODEL, AnthropicAgent +from ddev.ai.agent.build import ( + build_agent, + build_subagent, + make_agent_builder, + make_goal_agent_builder, + make_subagent_builder, +) from ddev.ai.phases.config import AgentConfig +from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import ToolRegistry @@ -128,3 +135,50 @@ def test_make_subagent_builder(file_registry, clients): agent, registry = builder("sys", "sub-1", []) assert isinstance(agent, AnthropicAgent) assert agent.name == "sub-1" + + +# --------------------------------------------------------------------------- +# make_goal_agent_builder +# --------------------------------------------------------------------------- + + +def test_goal_agent_builder_unknown_provider_raises(file_registry, clients): + config = AgentConfig.model_construct(provider="bad_provider", tools=[]) + builder = make_goal_agent_builder(config, clients, file_registry) + with pytest.raises(ValueError, match="Unknown agent provider: 'bad_provider'"): + builder("phase.goal.task") + + +def test_make_goal_agent_builder_filters_read_only_tools(file_registry, clients): + config = AgentConfig( + provider="anthropic", + tools=["read_file", "edit_file", "grep", "create_file"], + ) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, registry = builder("phase.goal.task") + + assert isinstance(agent, AnthropicAgent) + assert agent.name == "phase.goal.task" + tool_names = {d["name"] for d in registry.definitions} + assert tool_names == {"read_file", "grep"} + + +def test_make_goal_agent_builder_uses_default_model(file_registry, clients): + config = AgentConfig( + provider="anthropic", + tools=["read_file"], + model="claude-opus-4-7", + max_tokens=999, + ) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, _ = builder("phase.goal.task") + + assert agent._model == DEFAULT_MODEL + assert agent._max_tokens == DEFAULT_MAX_TOKENS + + +def test_make_goal_agent_builder_uses_reviewer_system_prompt(file_registry, clients): + config = AgentConfig(provider="anthropic", tools=["read_file"]) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, _ = builder("phase.goal.task") + assert agent._system_prompt == GOAL_REVIEWER_SYSTEM_PROMPT diff --git a/ddev/tests/ai/callbacks/test_callbacks.py b/ddev/tests/ai/callbacks/test_callbacks.py index 79a4ae151fb8e..9c61a6bc12dbb 100644 --- a/ddev/tests/ai/callbacks/test_callbacks.py +++ b/ddev/tests/ai/callbacks/test_callbacks.py @@ -449,6 +449,52 @@ async def good(iteration: int) -> None: assert fired == [True] +# --------------------------------------------------------------------------- +# on_before_goal_check and on_after_goal_check +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("event", ["before", "after"], ids=["before", "after"]) +async def test_goal_check_callbacks_register_fire_and_swallow_exceptions(event): + cb = CallbackSet() + fired: list = [] + + decorator = cb.on_before_goal_check if event == "before" else cb.on_after_goal_check + + @decorator + async def bad(*args): + raise RuntimeError("boom") + + @decorator + async def good(*args): + fired.append(args) + + if event == "before": + await cb.fire_before_goal_check("task-x", 3) + assert fired == [("task-x", 3)] + else: + await cb.fire_after_goal_check("task-x", 3, False, "missing y") + assert fired == [("task-x", 3, False, "missing y")] + + +async def test_callbacks_dispatches_goal_check_to_all_sets(): + s1, s2 = CallbackSet(), CallbackSet() + fired: list = [] + + @s1.on_before_goal_check + async def h1(name, attempt): + fired.append(("s1", name, attempt)) + + @s2.on_after_goal_check + async def h2(name, attempt, valid, reason): + fired.append(("s2", name, attempt, valid, reason)) + + cb = Callbacks([s1, s2]) + await cb.fire_before_goal_check("t", 1) + await cb.fire_after_goal_check("t", 1, True, "") + assert fired == [("s1", "t", 1), ("s2", "t", 1, True, "")] + + # --------------------------------------------------------------------------- # Callbacks container # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/conftest.py b/ddev/tests/ai/phases/conftest.py index 96149455e1385..9923604c37af1 100644 --- a/ddev/tests/ai/phases/conftest.py +++ b/ddev/tests/ai/phases/conftest.py @@ -119,6 +119,7 @@ def make_agent_phase( context_compact_threshold_pct: int = 80, callbacks=None, captured_agent_kwargs: dict[str, Any] | None = None, + goal_agent_builder=None, ) -> tuple[AgenticPhase, CheckpointManager]: """Build an AgenticPhase ready for process_message-driven tests. @@ -144,6 +145,7 @@ def make_agent_phase( config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), callbacks=callbacks, + goal_agent_builder=goal_agent_builder, ) phase.queue = message_queue return phase, checkpoint_manager diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 36d4701b5b353..09c41aee50316 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -464,3 +464,107 @@ def agent_builder_fn( assert log_file.exists() events = {e["event"] for e in read_jsonl(log_file)} assert {"start", "finish"} <= events + + +# --------------------------------------------------------------------------- +# Goal validation integration tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "tasks,expect_builder", + [ + ([TaskConfig(name="t1", prompt="x")], False), + ([TaskConfig(name="t1", prompt="x", goal="verify")], True), + ([TaskConfig(name="t1", prompt="x"), TaskConfig(name="t2", prompt="y", goal="verify")], True), + ], + ids=["no_goal", "single_goal", "mixed"], +) +def test_extra_init_kwargs_creates_goal_agent_builder_when_any_task_has_goal( + flow_dir, + tasks, + expect_builder, +): + kwargs = AgenticPhase.extra_init_kwargs( + phase_id="p1", + phase_config=PhaseConfig(agent="writer", tasks=tasks), + agents={"writer": AgentConfig()}, + agent_clients={}, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + assert (kwargs["goal_agent_builder"] is not None) is expect_builder + + +async def test_phase_with_goal_passes_first_attempt(flow_dir, monkeypatch, message_queue): + worker = MockAgent( + [ + make_response("worker did the work", 100, 50), + make_response("phase summary", 10, 5), + ] + ) + reviewer_responses = [make_response('{"valid": true, "reason": ""}', 7, 3)] + + captured_builder_calls: list = [] + + def goal_builder(owner_id): + captured_builder_calls.append(owner_id) + agent = MockAgent(list(reviewer_responses)) + return agent, ToolRegistry([]) + + phase, mgr = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="verify it")], + goal_agent_builder=goal_builder, + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + cp = mgr.read()["p1"] + assert cp["status"] == "success" + assert cp["goal_validations"] == [{"task": "t1", "attempts": 1, "final_valid": True}] + assert worker.send_calls[0].startswith("Do it.") + assert "independent reviewer" in worker.send_calls[0] + assert cp["tokens"] == {"total_input": 100 + 7 + 10, "total_output": 50 + 3 + 5} + assert captured_builder_calls == ["p1.goal.t1"] + + log_file = mgr.root / "goal_agent" / "p1" / "t1.jsonl" + assert log_file.exists() + events = {e["event"] for e in read_jsonl(log_file)} + assert {"start", "finish"} <= events + + +async def test_phase_with_goal_exhausts_attempts_fails_phase(flow_dir, monkeypatch, message_queue): + worker = MockAgent( + [ + make_response("attempt 1", 0, 0), + make_response("attempt 2", 0, 0), + ] + ) + + def goal_builder(owner_id): + agent = MockAgent( + [ + make_response('{"valid": false, "reason": "first miss"}', 0, 0), + make_response('{"valid": false, "reason": "second miss"}', 0, 0), + ] + ) + return agent, ToolRegistry([]) + + phase, mgr = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g", max_goal_attempts=2)], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} diff --git a/ddev/tests/ai/phases/test_config.py b/ddev/tests/ai/phases/test_config.py index 79deda7b989b5..0e34b611a4af6 100644 --- a/ddev/tests/ai/phases/test_config.py +++ b/ddev/tests/ai/phases/test_config.py @@ -46,6 +46,35 @@ def test_task_config_extra_field_raises(): TaskConfig(name="t1", prompt="Do it.", unknown_field="x") +@pytest.mark.parametrize( + "kwargs,match", + [ + ({"name": "t", "prompt": "x", "goal": "g", "goal_path": "g.md"}, "At most one of 'goal' or 'goal_path'"), + ({"name": "t", "prompt": "x", "max_goal_attempts": 3}, "'max_goal_attempts' may only be set"), + ({"name": "t", "prompt": "x", "goal": "g", "max_goal_attempts": 0}, "must be at least 1"), + ], + ids=["both_goal_sources", "attempts_without_goal", "attempts_below_one"], +) +def test_task_config_goal_validation_rejects(kwargs, match): + with pytest.raises(ValidationError, match=match): + TaskConfig(**kwargs) + + +@pytest.mark.parametrize( + "kwargs", + [ + {"name": "t", "prompt": "x", "goal": "verify it"}, + {"name": "t", "prompt": "x", "goal_path": "g.md"}, + {"name": "t", "prompt": "x", "goal": "verify it", "max_goal_attempts": 7}, + ], + ids=["goal", "goal_path", "goal_with_custom_attempts"], +) +def test_task_config_goal_accepts_valid(kwargs): + tc = TaskConfig(**kwargs) + assert (tc.goal is not None) ^ (tc.goal_path is not None) + assert tc.max_goal_attempts == kwargs.get("max_goal_attempts", 5) + + # --------------------------------------------------------------------------- # CheckpointConfig # --------------------------------------------------------------------------- @@ -316,6 +345,31 @@ def test_from_yaml_missing_file(tmp_path): FlowConfig.from_yaml(tmp_path / "nonexistent.yaml", tmp_path) +def test_from_yaml_missing_task_goal_path(tmp_path): + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "writer.md").write_text("system prompt") + flow_yaml = tmp_path / "flow.yaml" + flow_yaml.write_text( + """\ +agents: + writer: + tools: [] +phases: + p1: + agent: writer + tasks: + - name: t1 + prompt: "Do it." + goal_path: prompts/goal.md +flow: + - phase: p1 +""" + ) + with pytest.raises(FlowConfigError, match="goal_path not found"): + FlowConfig.from_yaml(flow_yaml, tmp_path) + + # --------------------------------------------------------------------------- # FlowConfig cycle detection via model_validate # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/test_goal.py b/ddev/tests/ai/phases/test_goal.py new file mode 100644 index 0000000000000..cf042d6b4daec --- /dev/null +++ b/ddev/tests/ai/phases/test_goal.py @@ -0,0 +1,359 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import json + +import pytest + +from ddev.ai.callbacks.callbacks import Callbacks, CallbackSet +from ddev.ai.phases.config import TaskConfig +from ddev.ai.phases.goal import ( + GoalAttemptsExhausted, + GoalParseError, + build_reviewer_user_message, + parse_reviewer_output, + render_goal_text, + run_goal_loop, +) +from ddev.ai.react.process import ReActProcess +from ddev.ai.react.types import ReActResult +from ddev.ai.tools.registry import ToolRegistry + +from .conftest import MockAgent, make_response, resolve_key + +# --------------------------------------------------------------------------- +# parse_reviewer_output +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "text,expected", + [ + ('{"valid": true, "reason": ""}', (True, "")), + ('{"valid": false, "reason": "missing metric x"}', (False, "missing metric x")), + (' {"valid": true, "reason": "ok"} ', (True, "ok")), + ('```json\n{"valid": true, "reason": ""}\n```', (True, "")), + ('```\n{"valid": false, "reason": "no"}\n```', (False, "no")), + ], + ids=["plain_true", "plain_false", "whitespace", "fenced_json", "fenced_plain"], +) +def test_parse_reviewer_output_accepts(text, expected): + assert parse_reviewer_output(text) == expected + + +@pytest.mark.parametrize( + "text", + [ + "", + "not json at all", + '{"valid": "yes", "reason": "x"}', + '{"valid": true, "reason": 42}', + '{"reason": "x"}', + '["valid", true]', + ], + ids=["empty", "prose", "valid_not_bool", "reason_not_str", "missing_valid", "not_object"], +) +def test_parse_reviewer_output_rejects(text): + assert parse_reviewer_output(text) is None + + +# --------------------------------------------------------------------------- +# render_goal_text +# --------------------------------------------------------------------------- + + +def test_render_goal_text_inline_and_path(tmp_path): + inline = render_goal_text( + TaskConfig(name="t", prompt="x", goal="check ${name}"), + tmp_path, + {"name": "Alice"}, + None, + ) + assert inline == "check Alice" + + (tmp_path / "goal.md").write_text("verify ${target}") + from_file = render_goal_text( + TaskConfig(name="t", prompt="x", goal_path="goal.md"), + tmp_path, + {"target": "endpoint"}, + None, + ) + assert from_file == "verify endpoint" + + +def test_render_goal_text_forwards_resolver(tmp_path): + result = render_goal_text( + TaskConfig(name="t", prompt="x", goal="see ${draft_memory}"), + tmp_path, + {}, + resolve_key, + ) + assert result == "see resolved(draft_memory)" + + +# --------------------------------------------------------------------------- +# build_reviewer_user_message +# --------------------------------------------------------------------------- + + +def test_build_reviewer_user_message_sections(): + msg = build_reviewer_user_message( + rendered_task_prompt="TASK", + goal_text="GOAL", + worker_summary="SUMMARY", + ) + assert "## Original task\nTASK" in msg + assert "## Goal to verify\nGOAL" in msg + assert "## Worker summary\nSUMMARY" in msg + + +# --------------------------------------------------------------------------- +# Helpers used by run_goal_loop tests +# --------------------------------------------------------------------------- + + +def _make_worker_process(responses): + """ReActProcess wired to a MockAgent — used as the worker.""" + agent = MockAgent(list(responses)) + return ReActProcess(agent=agent, tool_registry=ToolRegistry([])), agent + + +def _reviewer_builder(responses): + """A GoalAgentBuilder that returns a fresh MockAgent + empty ToolRegistry.""" + agent = MockAgent(list(responses)) + calls: list[str] = [] + + def builder(owner_id): + calls.append(owner_id) + return agent, ToolRegistry([]) + + return builder, calls, agent + + +async def _noop_compact(_): + return 0, 0 + + +# --------------------------------------------------------------------------- +# run_goal_loop +# --------------------------------------------------------------------------- + + +async def test_run_goal_loop_passes_on_first_attempt(tmp_path): + worker_process, worker_agent = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("did things"), + iterations=1, + total_input_tokens=100, + total_output_tokens=50, + context_usage=None, + ) + builder, builder_calls, _ = _reviewer_builder([make_response('{"valid": true, "reason": ""}', 20, 10)]) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="verify"), + goal_text="verify", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + assert outcome.attempts == 1 + assert outcome.total_input_tokens == 20 + assert outcome.total_output_tokens == 10 + assert outcome.final_result is initial_result + assert worker_agent.send_calls == [] + assert builder_calls == ["p1.goal.t1"] + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + assert log_path.exists() + events = {json.loads(line)["event"] for line in log_path.read_text().splitlines() if line.strip()} + assert {"start", "finish"} <= events + + +async def test_run_goal_loop_one_retry_then_pass(tmp_path): + worker_process, worker_agent = _make_worker_process([make_response("fixed it", 30, 15)]) + initial_result = ReActResult( + final_response=make_response("initial work"), + iterations=1, + total_input_tokens=100, + total_output_tokens=50, + context_usage=None, + ) + builder, _, reviewer_agent = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "missing X"}', 20, 10), + make_response('{"valid": true, "reason": ""}', 25, 12), + ] + ) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + assert outcome.attempts == 2 + assert outcome.final_result.final_response.text == "fixed it" + assert len(worker_agent.send_calls) == 1 + assert "missing X" in worker_agent.send_calls[0] + assert "g" in worker_agent.send_calls[0] + assert len(reviewer_agent.send_calls) == 2 + assert outcome.total_input_tokens == 20 + 25 + 30 + assert outcome.total_output_tokens == 10 + 12 + 15 + + +async def test_run_goal_loop_exhausts_attempts(tmp_path): + worker_process, _ = _make_worker_process([make_response("attempt 2", 10, 5)]) + initial_result = ReActResult( + final_response=make_response("attempt 1"), + iterations=1, + total_input_tokens=10, + total_output_tokens=5, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "first miss"}', 5, 5), + make_response('{"valid": false, "reason": "second miss"}', 5, 5), + ] + ) + + with pytest.raises(GoalAttemptsExhausted, match="2 attempts"): + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g", max_goal_attempts=2), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + +async def test_run_goal_loop_parse_retry_succeeds(tmp_path): + worker_process, _ = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("done"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, reviewer_agent = _reviewer_builder( + [ + make_response("not json", 5, 5), + make_response('{"valid": true, "reason": ""}', 7, 7), + ] + ) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + assert outcome.attempts == 1 + assert len(reviewer_agent.send_calls) == 2 + assert outcome.total_input_tokens == 5 + 7 + assert outcome.total_output_tokens == 5 + 7 + + +async def test_run_goal_loop_parse_retry_fails_raises(tmp_path): + worker_process, _ = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("done"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response("not json", 5, 5), + make_response("still not json", 5, 5), + ] + ) + + with pytest.raises(GoalParseError): + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + +async def test_run_goal_loop_fires_callbacks(tmp_path): + events: list = [] + cb_set = CallbackSet() + + @cb_set.on_before_goal_check + async def _before(task_name, attempt): + events.append(("before", task_name, attempt)) + + @cb_set.on_after_goal_check + async def _after(task_name, attempt, valid, reason): + events.append(("after", task_name, attempt, valid, reason)) + + worker_process, _ = _make_worker_process([make_response("attempt 2", 0, 0)]) + initial_result = ReActResult( + final_response=make_response("attempt 1"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "fix X"}', 0, 0), + make_response('{"valid": true, "reason": ""}', 0, 0), + ] + ) + + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks([cb_set]), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + assert events == [ + ("before", "t1", 1), + ("after", "t1", 1, False, "fix X"), + ("before", "t1", 2), + ("after", "t1", 2, True, ""), + ] diff --git a/ddev/tests/ai/tools/test_registry.py b/ddev/tests/ai/tools/test_registry.py index a8e3f4f40ea3e..2f6e33d3ea3e4 100644 --- a/ddev/tests/ai/tools/test_registry.py +++ b/ddev/tests/ai/tools/test_registry.py @@ -7,7 +7,7 @@ from ddev.ai.tools.core.types import ToolResult from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry +from ddev.ai.tools.registry import ToolRegistry, filter_read_only # --------------------------------------------------------------------------- # Fake tools — implement ToolProtocol without depending on BaseTool @@ -210,6 +210,30 @@ def test_from_names_fs_tools_share_file_registry(tmp_path): assert all(r is registries[0] for r in registries) +# --------------------------------------------------------------------------- +# read_only manifest annotations and filter_read_only +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "input_names,expected", + [ + (["read_file", "edit_file", "grep", "create_file"], ["read_file", "grep"]), + (["edit_file", "create_file"], []), + ([], []), + (["read_file", "list_files"], ["read_file", "list_files"]), + ], + ids=["mixed", "all_writes", "empty", "all_reads"], +) +def test_filter_read_only(input_names, expected): + assert filter_read_only(input_names) == expected + + +def test_filter_read_only_unknown_name_raises(): + with pytest.raises(ValueError, match="Unknown tool name"): + filter_read_only(["read_file", "teleport"]) + + def test_from_names_reuses_supplied_file_registry(tmp_path): """Multiple ToolRegistries can share one FileRegistry; tools carry their own owner_id.""" shared = FileRegistry(policy=FileAccessPolicy(write_root=tmp_path)) From 776f7106162218341730b96e4a76a43f42f8a40c Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 26 May 2026 18:21:40 +0200 Subject: [PATCH 02/10] Little nits --- ddev/src/ddev/ai/phases/agentic_phase.py | 3 ++- ddev/src/ddev/ai/phases/goal.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 4210c1e477eae..ab7fec093430f 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -204,7 +204,8 @@ async def run_tasks( total_output += last_result.total_output_tokens if has_goal: - assert self._goal_agent_builder is not None + if self._goal_agent_builder is None: + raise ValueError("Goal agent builder is required when tasks specify a goal") goal_text = render_goal_text(task, self._config_dir, context, self._resolver) outcome: GoalLoopOutcome = await run_goal_loop( task=task, diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py index 843710a1b1a7c..a4cc823916444 100644 --- a/ddev/src/ddev/ai/phases/goal.py +++ b/ddev/src/ddev/ai/phases/goal.py @@ -34,8 +34,8 @@ - Read the relevant files yourself with the tools provided. Do not trust the worker summary blindly — verify it. - If the worker summary explains that an apparent gap is intentional and the - explanation is plausible and consistent with the task, accept the work as - valid. + explanation is plausible and consistent with the task, accept that specific + gap as valid. - Be specific in your reasoning. Vague rejections are useless to the worker. Output contract: From 9e508fbf786be6a8e44822a514c4e2c60e0559b3 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 12:34:57 +0200 Subject: [PATCH 03/10] Write goal_attempt_log in checkpoints even if the validation failed --- ddev/src/ddev/ai/phases/agentic_phase.py | 40 ++++++++++------- ddev/tests/ai/phases/test_agentic_phase.py | 50 ++++++++++++++++++++++ 2 files changed, 74 insertions(+), 16 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index ab7fec093430f..9e219f75279b4 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -18,7 +18,7 @@ from ddev.ai.phases.base import Phase, PhaseOutcome from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig -from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, render_goal_text, run_goal_loop +from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, GoalAttemptsExhausted, render_goal_text, run_goal_loop from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess from ddev.ai.tools.fs.file_registry import FileRegistry @@ -187,7 +187,6 @@ async def run_tasks( """ total_input = total_output = 0 last_result: ReActResult | None = None - goal_attempt_log: list[dict[str, Any]] = [] for task in self._config.tasks: cin, cout = await self._compact_if_needed(process, last_result) @@ -207,22 +206,32 @@ async def run_tasks( if self._goal_agent_builder is None: raise ValueError("Goal agent builder is required when tasks specify a goal") goal_text = render_goal_text(task, self._config_dir, context, self._resolver) - outcome: GoalLoopOutcome = await run_goal_loop( - task=task, - goal_text=goal_text, - rendered_task_prompt=prompt, - worker_process=process, - initial_result=last_result, - goal_agent_builder=self._goal_agent_builder, - callbacks=self._callbacks, - phase_id=self._phase_id, - log_root=self._checkpoint_manager.root, - compact_if_needed=lambda r: self._compact_if_needed(process, r), - ) + try: + outcome: GoalLoopOutcome = await run_goal_loop( + task=task, + goal_text=goal_text, + rendered_task_prompt=prompt, + worker_process=process, + initial_result=last_result, + goal_agent_builder=self._goal_agent_builder, + callbacks=self._callbacks, + phase_id=self._phase_id, + log_root=self._checkpoint_manager.root, + compact_if_needed=lambda r: self._compact_if_needed(process, r), + ) + except GoalAttemptsExhausted: + self._goal_attempt_log.append( + { + "task": task.name, + "attempts": task.max_goal_attempts, + "final_valid": False, + } + ) + raise last_result = outcome.final_result total_input += outcome.total_input_tokens total_output += outcome.total_output_tokens - goal_attempt_log.append( + self._goal_attempt_log.append( { "task": task.name, "attempts": outcome.attempts, @@ -230,7 +239,6 @@ async def run_tasks( } ) - self._goal_attempt_log = goal_attempt_log return total_input, total_output def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[BaseAgent[Any], ReActProcess]: diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 09c41aee50316..3246fa098169c 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -568,3 +568,53 @@ def goal_builder(owner_id): await phase.process_message(PhaseTrigger(id="start", phase_id=None)) assert mgr.read() == {} + assert phase._goal_attempt_log == [{"task": "t1", "attempts": 2, "final_valid": False}] + + +async def test_phase_goal_partial_progress_preserved_on_exhaustion(flow_dir, monkeypatch, message_queue): + """When task 1 passes goal validation and task 2 exhausts attempts, both entries are logged.""" + worker = MockAgent( + [ + make_response("t1 done", 0, 0), + make_response("t2 attempt 1", 0, 0), + make_response("t2 attempt 2", 0, 0), + ] + ) + + call_count = 0 + + def goal_builder(owner_id): + nonlocal call_count + call_count += 1 + if call_count == 1: + agent = MockAgent([make_response('{"valid": true, "reason": ""}', 0, 0)]) + else: + agent = MockAgent( + [ + make_response('{"valid": false, "reason": "miss 1"}', 0, 0), + make_response('{"valid": false, "reason": "miss 2"}', 0, 0), + ] + ) + return agent, ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First.", goal="check t1", max_goal_attempts=2), + TaskConfig(name="t2", prompt="Second.", goal="check t2", max_goal_attempts=2), + ], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._goal_attempt_log == [ + {"task": "t1", "attempts": 1, "final_valid": True}, + {"task": "t2", "attempts": 2, "final_valid": False}, + ] From f321c461f4e935852bb7f7f7309ec629d37065fd Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 12:46:48 +0200 Subject: [PATCH 04/10] Count tokens when GoalParseError is raised --- ddev/src/ddev/ai/phases/goal.py | 12 +++++++++++- ddev/tests/ai/phases/test_goal.py | 18 +++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py index a4cc823916444..ac1933d2bf071 100644 --- a/ddev/src/ddev/ai/phases/goal.py +++ b/ddev/src/ddev/ai/phases/goal.py @@ -82,6 +82,11 @@ class GoalValidationError(Exception): class GoalParseError(GoalValidationError): """Reviewer failed to return valid JSON after the parse-retry.""" + def __init__(self, message: str, input_tokens: int = 0, output_tokens: int = 0) -> None: + super().__init__(message) + self.input_tokens = input_tokens + self.output_tokens = output_tokens + class GoalAttemptsExhausted(GoalValidationError): """Reviewer rejected the work on every attempt up to max_goal_attempts.""" @@ -172,7 +177,9 @@ async def _run_reviewer_once( if parsed is None: raise GoalParseError( "Reviewer did not return valid JSON after one parse-retry. " - f"Last raw output: {retry_result.final_response.text!r}" + f"Last raw output: {retry_result.final_response.text!r}", + input_tokens=in_tokens, + output_tokens=out_tokens, ) valid, reason = parsed @@ -279,6 +286,9 @@ async def run_goal_loop( total_in += worker_result.total_input_tokens total_out += worker_result.total_output_tokens except GoalValidationError as e: + if isinstance(e, GoalParseError): + total_in += e.input_tokens + total_out += e.output_tokens agent_logger.log_finish( success=False, attempts=attempts, diff --git a/ddev/tests/ai/phases/test_goal.py b/ddev/tests/ai/phases/test_goal.py index cf042d6b4daec..e2d5b79d77591 100644 --- a/ddev/tests/ai/phases/test_goal.py +++ b/ddev/tests/ai/phases/test_goal.py @@ -292,12 +292,12 @@ async def test_run_goal_loop_parse_retry_fails_raises(tmp_path): ) builder, _, _ = _reviewer_builder( [ - make_response("not json", 5, 5), - make_response("still not json", 5, 5), + make_response("not json", 5, 3), + make_response("still not json", 7, 4), ] ) - with pytest.raises(GoalParseError): + with pytest.raises(GoalParseError) as exc_info: await run_goal_loop( task=TaskConfig(name="t1", prompt="x", goal="g"), goal_text="g", @@ -311,6 +311,18 @@ async def test_run_goal_loop_parse_retry_fails_raises(tmp_path): compact_if_needed=_noop_compact, ) + err = exc_info.value + assert err.input_tokens == 5 + 7 + assert err.output_tokens == 3 + 4 + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + finish = next( + json.loads(line) for line in log_path.read_text().splitlines() if json.loads(line)["event"] == "finish" + ) + assert finish["total_input_tokens"] == 5 + 7 + assert finish["total_output_tokens"] == 3 + 4 + assert finish["success"] is False + async def test_run_goal_loop_fires_callbacks(tmp_path): events: list = [] From 59a5b0d28fe98086e21eb2f9663ee20ce0f95311 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 14:47:39 +0200 Subject: [PATCH 05/10] Fix token counting --- ddev/src/ddev/ai/phases/agentic_phase.py | 64 +++++++++++---- ddev/src/ddev/ai/phases/goal.py | 21 ++--- ddev/tests/ai/phases/test_agentic_phase.py | 93 ++++++++++++++++++++++ ddev/tests/ai/phases/test_goal.py | 18 ++++- 4 files changed, 166 insertions(+), 30 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 9e219f75279b4..5c6ac60aae212 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -6,6 +6,7 @@ import logging from collections.abc import Callable +from datetime import UTC, datetime from pathlib import Path from typing import TYPE_CHECKING, Any @@ -18,11 +19,13 @@ from ddev.ai.phases.base import Phase, PhaseOutcome from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig -from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, GoalAttemptsExhausted, render_goal_text, run_goal_loop +from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, GoalValidationError, render_goal_text, run_goal_loop +from ddev.ai.phases.messages import PhaseFailedMessage from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import TOOL_MANIFEST +from ddev.event_bus.exceptions import MessageProcessingError, ProcessorHookError if TYPE_CHECKING: from ddev.ai.agent.base import BaseAgent @@ -93,6 +96,8 @@ def __init__( self._subagent_builder = subagent_builder self._goal_agent_builder = goal_agent_builder self._goal_attempt_log: list[dict[str, Any]] = [] + self._total_input_tokens: int = 0 + self._total_output_tokens: int = 0 self._subagent_log_dir = ( checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None ) @@ -179,19 +184,18 @@ async def run_tasks( self, process: ReActProcess, context: dict[str, Any], - ) -> tuple[int, int]: - """Run the task loop. Returns (total_input_tokens, total_output_tokens). + ) -> None: + """Run the task loop, accumulating tokens into self._total_input/output_tokens. Override to customize task execution — e.g. add retries, change ordering, etc. Default implementation iterates through config.tasks sequentially. """ - total_input = total_output = 0 last_result: ReActResult | None = None for task in self._config.tasks: cin, cout = await self._compact_if_needed(process, last_result) - total_input += cin - total_output += cout + self._total_input_tokens += cin + self._total_output_tokens += cout has_goal = task.goal is not None or task.goal_path is not None prompt = render_task_prompt(task, self._config_dir, context, self._resolver) @@ -199,8 +203,8 @@ async def run_tasks( prompt = prompt + GOAL_TASK_SUFFIX last_result = await process.start(prompt) - total_input += last_result.total_input_tokens - total_output += last_result.total_output_tokens + self._total_input_tokens += last_result.total_input_tokens + self._total_output_tokens += last_result.total_output_tokens if has_goal: if self._goal_agent_builder is None: @@ -219,18 +223,20 @@ async def run_tasks( log_root=self._checkpoint_manager.root, compact_if_needed=lambda r: self._compact_if_needed(process, r), ) - except GoalAttemptsExhausted: + except GoalValidationError as e: self._goal_attempt_log.append( { "task": task.name, - "attempts": task.max_goal_attempts, + "attempts": e.attempts, "final_valid": False, } ) + self._total_input_tokens += e.input_tokens + self._total_output_tokens += e.output_tokens raise last_result = outcome.final_result - total_input += outcome.total_input_tokens - total_output += outcome.total_output_tokens + self._total_input_tokens += outcome.total_input_tokens + self._total_output_tokens += outcome.total_output_tokens self._goal_attempt_log.append( { "task": task.name, @@ -239,8 +245,6 @@ async def run_tasks( } ) - return total_input, total_output - def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[BaseAgent[Any], ReActProcess]: """Build the agent and ReAct process used to drive task execution.""" system_prompt = render_prompt( @@ -280,7 +284,7 @@ async def _run_memory_step( async def execute(self, context: dict[str, Any]) -> PhaseOutcome: self.before_react() agent, process = self._build_agent_and_process(context) - total_input, total_output = await self.run_tasks(process, context) + await self.run_tasks(process, context) self.after_react() memory_text, mem_in, mem_out = await self._run_memory_step(agent, context) @@ -291,7 +295,33 @@ async def execute(self, context: dict[str, Any]) -> PhaseOutcome: return PhaseOutcome( memory_text=memory_text, - total_input_tokens=total_input + mem_in, - total_output_tokens=total_output + mem_out, + total_input_tokens=self._total_input_tokens + mem_in, + total_output_tokens=self._total_output_tokens + mem_out, extra_checkpoint=extra, ) + + async def on_error(self, error: MessageProcessingError | ProcessorHookError) -> None: + payload: dict[str, Any] = { + "status": "failed", + "started_at": self._started_at.isoformat() if self._started_at else None, + "finished_at": datetime.now(UTC).isoformat(), + "error": str(error.original_exception), + "tokens": { + "total_input": self._total_input_tokens, + "total_output": self._total_output_tokens, + }, + } + if self._goal_attempt_log: + payload["goal_validations"] = self._goal_attempt_log + try: + self._checkpoint_manager.write_phase_checkpoint(self._phase_id, payload) + except Exception: + self._logger.exception("Failed to write failure checkpoint for phase %s", self._phase_id) + finally: + self.submit_message( + PhaseFailedMessage( + id=f"{self._phase_id}_failed", + phase_id=self._phase_id, + error=str(error.original_exception), + ) + ) diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py index ac1933d2bf071..ceaf6e5e5af71 100644 --- a/ddev/src/ddev/ai/phases/goal.py +++ b/ddev/src/ddev/ai/phases/goal.py @@ -76,16 +76,17 @@ class GoalValidationError(Exception): - """Base class for goal-validation failures.""" - - -class GoalParseError(GoalValidationError): - """Reviewer failed to return valid JSON after the parse-retry.""" + """Base class for goal-validation failures. Carries the token cost and attempt count.""" def __init__(self, message: str, input_tokens: int = 0, output_tokens: int = 0) -> None: super().__init__(message) self.input_tokens = input_tokens self.output_tokens = output_tokens + self.attempts: int = 0 + + +class GoalParseError(GoalValidationError): + """Reviewer failed to return valid JSON after the parse-retry.""" class GoalAttemptsExhausted(GoalValidationError): @@ -286,14 +287,14 @@ async def run_goal_loop( total_in += worker_result.total_input_tokens total_out += worker_result.total_output_tokens except GoalValidationError as e: - if isinstance(e, GoalParseError): - total_in += e.input_tokens - total_out += e.output_tokens + e.input_tokens += total_in + e.output_tokens += total_out + e.attempts = attempts agent_logger.log_finish( success=False, attempts=attempts, - total_input_tokens=total_in, - total_output_tokens=total_out, + total_input_tokens=e.input_tokens, + total_output_tokens=e.output_tokens, error=f"{type(e).__name__}: {e}", ) raise diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 3246fa098169c..fbbe06ae9f41d 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -618,3 +618,96 @@ def goal_builder(owner_id): {"task": "t1", "attempts": 1, "final_valid": True}, {"task": "t2", "attempts": 2, "final_valid": False}, ] + + +async def test_goal_exhaustion_tokens_captured_on_phase(flow_dir, monkeypatch, message_queue): + """Goal-loop tokens are folded into the phase total even when the phase fails.""" + worker = MockAgent( + [ + make_response("worker attempt 1", 10, 5), + make_response("worker attempt 2", 10, 5), + ] + ) + + def goal_builder(owner_id): + return MockAgent( + [ + make_response('{"valid": false, "reason": "miss 1"}', 8, 4), + make_response('{"valid": false, "reason": "miss 2"}', 8, 4), + ] + ), ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g", max_goal_attempts=2)], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._total_input_tokens == 10 + 8 + 10 + 8 + assert phase._total_output_tokens == 5 + 4 + 5 + 4 + + +async def test_on_error_writes_tokens_and_goal_validations_to_checkpoint(flow_dir, monkeypatch, message_queue): + """on_error includes token counts and goal_validations in the failure checkpoint.""" + from ddev.ai.phases.messages import PhaseTrigger + from ddev.event_bus.exceptions import MessageProcessingError + + worker = MockAgent([make_response("done", 0, 0)]) + phase, mgr = make_agent_phase(flow_dir, worker, monkeypatch, message_queue) + + phase._total_input_tokens = 42 + phase._total_output_tokens = 17 + phase._goal_attempt_log = [{"task": "t1", "attempts": 2, "final_valid": False}] + phase._started_at = None + + err = MessageProcessingError( + processor_name="p1", + message=PhaseTrigger(id="start", phase_id=None), + original_exception=RuntimeError("something went wrong"), + ) + await phase.on_error(err) + + cp = mgr.read()["p1"] + assert cp["status"] == "failed" + assert cp["tokens"] == {"total_input": 42, "total_output": 17} + assert cp["goal_validations"] == [{"task": "t1", "attempts": 2, "final_valid": False}] + assert cp["error"] == "something went wrong" + + +async def test_goal_parse_error_logged_and_tokens_captured(flow_dir, monkeypatch, message_queue): + """GoalParseError is treated the same as GoalAttemptsExhausted: logged with final_valid=False.""" + from ddev.ai.phases.goal import GoalParseError + + worker = MockAgent([make_response("worker done", 10, 5)]) + + def goal_builder(owner_id): + return MockAgent( + [ + make_response("not json", 8, 4), + make_response("still not json", 6, 3), + ] + ), ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g")], + goal_agent_builder=goal_builder, + ) + + with pytest.raises(GoalParseError): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._goal_attempt_log == [{"task": "t1", "attempts": 1, "final_valid": False}] + assert phase._total_input_tokens == 10 + 8 + 6 + assert phase._total_output_tokens == 5 + 4 + 3 diff --git a/ddev/tests/ai/phases/test_goal.py b/ddev/tests/ai/phases/test_goal.py index e2d5b79d77591..01139760c209f 100644 --- a/ddev/tests/ai/phases/test_goal.py +++ b/ddev/tests/ai/phases/test_goal.py @@ -227,12 +227,12 @@ async def test_run_goal_loop_exhausts_attempts(tmp_path): ) builder, _, _ = _reviewer_builder( [ - make_response('{"valid": false, "reason": "first miss"}', 5, 5), - make_response('{"valid": false, "reason": "second miss"}', 5, 5), + make_response('{"valid": false, "reason": "first miss"}', 5, 3), + make_response('{"valid": false, "reason": "second miss"}', 7, 4), ] ) - with pytest.raises(GoalAttemptsExhausted, match="2 attempts"): + with pytest.raises(GoalAttemptsExhausted, match="2 attempts") as exc_info: await run_goal_loop( task=TaskConfig(name="t1", prompt="x", goal="g", max_goal_attempts=2), goal_text="g", @@ -246,6 +246,18 @@ async def test_run_goal_loop_exhausts_attempts(tmp_path): compact_if_needed=_noop_compact, ) + err = exc_info.value + assert err.input_tokens == 5 + 10 + 7 + assert err.output_tokens == 3 + 5 + 4 + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + finish = next( + json.loads(line) for line in log_path.read_text().splitlines() if json.loads(line)["event"] == "finish" + ) + assert finish["total_input_tokens"] == 5 + 10 + 7 + assert finish["total_output_tokens"] == 3 + 5 + 4 + assert finish["success"] is False + async def test_run_goal_loop_parse_retry_succeeds(tmp_path): worker_process, _ = _make_worker_process([]) From 6e27a89b4e8c6a353c7114c455e124950d7c86d2 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 14:59:21 +0200 Subject: [PATCH 06/10] Clear the try/catch in run_goal_loop --- ddev/src/ddev/ai/phases/goal.py | 155 ++++++++++++++++++-------------- 1 file changed, 90 insertions(+), 65 deletions(-) diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py index ceaf6e5e5af71..c45bfe7c02b7f 100644 --- a/ddev/src/ddev/ai/phases/goal.py +++ b/ddev/src/ddev/ai/phases/goal.py @@ -187,6 +187,70 @@ async def _run_reviewer_once( return GoalCheckResult(valid=valid, reason=reason, input_tokens=in_tokens, output_tokens=out_tokens) +async def _drive_goal_loop( + *, + task: TaskConfig, + goal_text: str, + rendered_task_prompt: str, + worker_process: ReActProcess, + initial_result: ReActResult, + reviewer_process: ReActProcess, + callbacks: Callbacks, + compact_if_needed: Callable[[ReActResult], Awaitable[tuple[int, int]]], +) -> GoalLoopOutcome: + total_in = total_out = 0 + attempts = 0 + worker_result = initial_result + + try: + while True: + attempts += 1 + await callbacks.fire_before_goal_check(task.name, attempts) + + user_message = build_reviewer_user_message( + rendered_task_prompt=rendered_task_prompt, + goal_text=goal_text, + worker_summary=worker_result.final_response.text or "(no summary provided)", + ) + + if attempts > 1: + reviewer_process.reset() + + check = await _run_reviewer_once(reviewer_process, user_message) + total_in += check.input_tokens + total_out += check.output_tokens + + await callbacks.fire_after_goal_check(task.name, attempts, check.valid, check.reason) + + if check.valid: + return GoalLoopOutcome( + final_result=worker_result, + attempts=attempts, + total_input_tokens=total_in, + total_output_tokens=total_out, + ) + + if attempts >= task.max_goal_attempts: + raise GoalAttemptsExhausted( + f"Task {task.name!r} failed goal validation after " + f"{attempts} attempts. Last reviewer reason: {check.reason}" + ) + + compact_in, compact_out = await compact_if_needed(worker_result) + total_in += compact_in + total_out += compact_out + + retry_prompt = GOAL_RETRY_PROMPT_TEMPLATE.format(goal=goal_text, reason=check.reason) + worker_result = await worker_process.start(retry_prompt) + total_in += worker_result.total_input_tokens + total_out += worker_result.total_output_tokens + except GoalValidationError as e: + e.input_tokens += total_in + e.output_tokens += total_out + e.attempts = attempts + raise + + async def run_goal_loop( *, task: TaskConfig, @@ -227,76 +291,37 @@ async def run_goal_loop( prompt=f"", tools=[d["name"] for d in reviewer_registry.definitions], ) - reviewer_process = ReActProcess( agent=reviewer_agent, tool_registry=reviewer_registry, callbacks=agent_logger.build_callbacks(), ) - total_in = total_out = 0 - attempts = 0 - worker_result = initial_result - - try: - while True: - attempts += 1 - await callbacks.fire_before_goal_check(task.name, attempts) - - user_message = build_reviewer_user_message( - rendered_task_prompt=rendered_task_prompt, - goal_text=goal_text, - worker_summary=worker_result.final_response.text or "(no summary provided)", - ) - - if attempts > 1: - reviewer_process.reset() - - check = await _run_reviewer_once(reviewer_process, user_message) - total_in += check.input_tokens - total_out += check.output_tokens - - await callbacks.fire_after_goal_check(task.name, attempts, check.valid, check.reason) - - if check.valid: - agent_logger.log_finish( - success=True, - attempts=attempts, - total_input_tokens=total_in, - total_output_tokens=total_out, - ) - return GoalLoopOutcome( - final_result=worker_result, - attempts=attempts, - total_input_tokens=total_in, - total_output_tokens=total_out, - ) - - if attempts >= task.max_goal_attempts: - raise GoalAttemptsExhausted( - f"Task {task.name!r} failed goal validation after " - f"{attempts} attempts. Last reviewer reason: {check.reason}" - ) - - compact_in, compact_out = await compact_if_needed(worker_result) - total_in += compact_in - total_out += compact_out - - retry_prompt = GOAL_RETRY_PROMPT_TEMPLATE.format(goal=goal_text, reason=check.reason) - worker_result = await worker_process.start(retry_prompt) - total_in += worker_result.total_input_tokens - total_out += worker_result.total_output_tokens - except GoalValidationError as e: - e.input_tokens += total_in - e.output_tokens += total_out - e.attempts = attempts - agent_logger.log_finish( - success=False, - attempts=attempts, - total_input_tokens=e.input_tokens, - total_output_tokens=e.output_tokens, - error=f"{type(e).__name__}: {e}", - ) - raise + outcome = await _drive_goal_loop( + task=task, + goal_text=goal_text, + rendered_task_prompt=rendered_task_prompt, + worker_process=worker_process, + initial_result=initial_result, + reviewer_process=reviewer_process, + callbacks=callbacks, + compact_if_needed=compact_if_needed, + ) + agent_logger.log_finish( + success=True, + attempts=outcome.attempts, + total_input_tokens=outcome.total_input_tokens, + total_output_tokens=outcome.total_output_tokens, + ) + return outcome + except GoalValidationError as e: + agent_logger.log_finish( + success=False, + attempts=e.attempts, + total_input_tokens=e.input_tokens, + total_output_tokens=e.output_tokens, + error=f"{type(e).__name__}: {e}", + ) + raise finally: agent_logger.close() From 3e710c3c6505e13fa2a7be1c0f4afa9992f82276 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 15:07:22 +0200 Subject: [PATCH 07/10] Hoist imports in agent/build --- ddev/src/ddev/ai/agent/build.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/ddev/src/ddev/ai/agent/build.py b/ddev/src/ddev/ai/agent/build.py index 7653d3fe30410..46b61dc2d1be5 100644 --- a/ddev/src/ddev/ai/agent/build.py +++ b/ddev/src/ddev/ai/agent/build.py @@ -6,16 +6,15 @@ from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any from ddev.ai.agent.anthropic_client import AnthropicAgent from ddev.ai.agent.base import BaseAgent +from ddev.ai.phases.config import AgentConfig +from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import ToolRegistry, filter_read_only -if TYPE_CHECKING: - from ddev.ai.phases.config import AgentConfig - SubagentBuilder = Callable[ [str, str, list[str]], # (system_prompt, owner_id, tool_names) tuple[BaseAgent[Any], ToolRegistry], @@ -177,9 +176,6 @@ def build_goal_agent( provider defaults — the parent's overrides are intentionally not forwarded. Tools are filtered to the read-only subset of the parent's tool list. """ - from ddev.ai.phases.config import AgentConfig - from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT - read_only_tool_names = filter_read_only(parent_agent_config.tools) goal_agent_config = AgentConfig( provider=parent_agent_config.provider, From 09325f2a80964fd58264fd4d9636caa7a2d211db Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 15:11:15 +0200 Subject: [PATCH 08/10] Improve logging test --- ddev/tests/ai/phases/test_agentic_phase.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index fbbe06ae9f41d..38a22667057d3 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -570,6 +570,12 @@ def goal_builder(owner_id): assert mgr.read() == {} assert phase._goal_attempt_log == [{"task": "t1", "attempts": 2, "final_valid": False}] + log_file = mgr.root / "goal_agent" / "p1" / "t1.jsonl" + assert log_file.exists() + finish_events = [e for e in read_jsonl(log_file) if e["event"] == "finish"] + assert len(finish_events) == 1 + assert finish_events[0]["success"] is False + async def test_phase_goal_partial_progress_preserved_on_exhaustion(flow_dir, monkeypatch, message_queue): """When task 1 passes goal validation and task 2 exhausts attempts, both entries are logged.""" From 895ed9dc94afc35ba9d57b103c5260293e511356 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 15:17:25 +0200 Subject: [PATCH 09/10] Add pattern validation to task name --- ddev/src/ddev/ai/phases/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index e1641243a8f12..3a845228401b2 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -7,7 +7,7 @@ from pathlib import Path import yaml -from pydantic import BaseModel, ConfigDict, ValidationError, field_validator, model_validator +from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator, model_validator from ddev.ai.tools.registry import ToolRegistry @@ -53,7 +53,7 @@ def dfs(start: str, current: str, path: list[str], on_path: set[str]): class TaskConfig(BaseModel): model_config = ConfigDict(extra="forbid") - name: str + name: str = Field(pattern=r"^[A-Za-z0-9._-]{1,64}$") prompt_path: Path | None = None prompt: str | None = None goal: str | None = None From 1fa7c91ad80f7f5c5c9b68f4ed4198e4fead3b11 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Fri, 29 May 2026 09:57:14 +0200 Subject: [PATCH 10/10] Introduce FlowServices to group shared phase dependencies (#23858) * Add PipelineContext to simplify args * Rename to FlowServices --- ddev/src/ddev/ai/phases/agentic_phase.py | 27 ++++---------- ddev/src/ddev/ai/phases/base.py | 42 ++++++++++++---------- ddev/src/ddev/ai/phases/orchestrator.py | 22 +++++++----- ddev/tests/ai/phases/conftest.py | 17 +++++---- ddev/tests/ai/phases/test_agentic_phase.py | 39 ++++++++++++++++---- ddev/tests/ai/phases/test_base.py | 13 ++++--- 6 files changed, 94 insertions(+), 66 deletions(-) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 5c6ac60aae212..83e70a93bb653 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -4,7 +4,6 @@ from __future__ import annotations -import logging from collections.abc import Callable from datetime import UTC, datetime from pathlib import Path @@ -15,15 +14,12 @@ make_goal_agent_builder, make_subagent_builder, ) -from ddev.ai.callbacks.callbacks import Callbacks -from ddev.ai.phases.base import Phase, PhaseOutcome -from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.base import FlowServices, Phase, PhaseOutcome from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, GoalValidationError, render_goal_text, run_goal_loop from ddev.ai.phases.messages import PhaseFailedMessage from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess -from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import TOOL_MANIFEST from ddev.event_bus.exceptions import MessageProcessingError, ProcessorHookError @@ -69,28 +65,16 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, + services: FlowServices, agent_builder: AgentBuilder, - checkpoint_manager: CheckpointManager, - runtime_variables: dict[str, str], - flow_variables: dict[str, str], - config_dir: Path, - file_registry: FileRegistry, subagent_builder: SubagentBuilder | None = None, goal_agent_builder: GoalAgentBuilder | None = None, - callbacks: Callbacks | None = None, - logger: logging.Logger | None = None, ) -> None: super().__init__( phase_id=phase_id, dependencies=dependencies, config=config, - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables, - flow_variables=flow_variables, - config_dir=config_dir, - file_registry=file_registry, - callbacks=callbacks, - logger=logger, + services=services, ) self._agent_builder = agent_builder self._subagent_builder = subagent_builder @@ -99,7 +83,7 @@ def __init__( self._total_input_tokens: int = 0 self._total_output_tokens: int = 0 self._subagent_log_dir = ( - checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None + services.checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None ) @classmethod @@ -124,12 +108,13 @@ def extra_init_kwargs( # type: ignore[override] phase_config: PhaseConfig, agents: dict[str, AgentConfig], agent_clients: dict[str, Any], - file_registry: FileRegistry, + services: FlowServices, **_: Any, ) -> dict[str, Any]: if phase_config.agent is None: raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'") agent_config = agents[phase_config.agent] + file_registry = services.file_registry subagent_builder = None requires_subagent_builder = any( diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index 227b92560a60f..36326d21a73af 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -19,6 +19,19 @@ from ddev.event_bus.orchestrator import AsyncProcessor, BaseMessage +@dataclass(frozen=True) +class FlowServices: + """Shared pipeline-level infrastructure passed to every phase.""" + + checkpoint_manager: CheckpointManager + runtime_variables: dict[str, str] + flow_variables: dict[str, str] + config_dir: Path + file_registry: FileRegistry + callbacks: Callbacks = field(default_factory=Callbacks) + logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__)) + + @dataclass class PhaseOutcome: memory_text: str @@ -56,26 +69,20 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, - checkpoint_manager: CheckpointManager, - runtime_variables: dict[str, str], - flow_variables: dict[str, str], - config_dir: Path, - file_registry: FileRegistry, - callbacks: Callbacks | None = None, - logger: logging.Logger | None = None, + services: FlowServices, ) -> None: super().__init__(name=phase_id) self._phase_id = phase_id self._dependencies = set(dependencies) self._remaining_dependencies = set(dependencies) self._config = config - self._checkpoint_manager = checkpoint_manager - self._runtime_variables = runtime_variables - self._flow_variables = flow_variables - self._config_dir = config_dir - self._callbacks: Callbacks = callbacks or Callbacks() - self._file_registry = file_registry - self._logger = logger or logging.getLogger(__name__) + self._checkpoint_manager = services.checkpoint_manager + self._runtime_variables = services.runtime_variables + self._flow_variables = services.flow_variables + self._config_dir = services.config_dir + self._file_registry = services.file_registry + self._callbacks = services.callbacks + self._logger = services.logger self._started_at: datetime | None = None self._resolver: Callable[[str], str] | None = None self._executed = False @@ -112,10 +119,9 @@ def validate_config( def extra_init_kwargs(cls, **kwargs: Any) -> dict[str, Any]: """Override to inject subclass-specific kwargs into __init__ at construction time. - The orchestrator passes every framework-level dep (phase_id, phase_config, agents, - agent_clients, file_registry, checkpoint_manager, ...) as keyword arguments. - Subclasses pick the ones they need by declaring them explicitly and accept the - rest via **kwargs. + The orchestrator passes phase_id, phase_config, agents, agent_clients, and services + (FlowServices) as keyword arguments. Subclasses declare the ones they need + explicitly and accept the rest via **kwargs. """ return {} diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index 914d6aadf557e..de663c3138ed4 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -9,7 +9,7 @@ from typing import Any from ddev.ai.callbacks.callbacks import Callbacks -from ddev.ai.phases.base import Phase, PhaseRegistry +from ddev.ai.phases.base import FlowServices, Phase, PhaseRegistry from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import FlowConfig, FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -94,6 +94,16 @@ async def on_initialize(self) -> None: dependency_map: dict[str, list[str]] = {entry.phase: entry.dependencies for entry in config.flow} + services = FlowServices( + checkpoint_manager=checkpoint_manager, + runtime_variables=self._runtime_variables, + flow_variables=config.variables, + config_dir=config_dir, + file_registry=self._file_registry, + callbacks=self._callbacks, + logger=self._logger, + ) + for entry in config.flow: phase_id = entry.phase phase_config = config.phases[phase_id] @@ -104,13 +114,7 @@ async def on_initialize(self) -> None: "phase_id": phase_id, "dependencies": dependencies, "config": phase_config, - "checkpoint_manager": checkpoint_manager, - "runtime_variables": self._runtime_variables, - "flow_variables": config.variables, - "config_dir": config_dir, - "file_registry": self._file_registry, - "callbacks": self._callbacks, - "logger": self._logger, + "services": services, } phase_kwargs.update( phase_cls.extra_init_kwargs( @@ -118,7 +122,7 @@ async def on_initialize(self) -> None: phase_config=phase_config, agents=config.agents, agent_clients=self._agent_clients, - file_registry=self._file_registry, + services=services, ) ) diff --git a/ddev/tests/ai/phases/conftest.py b/ddev/tests/ai/phases/conftest.py index 9923604c37af1..90977313cfe47 100644 --- a/ddev/tests/ai/phases/conftest.py +++ b/ddev/tests/ai/phases/conftest.py @@ -8,7 +8,9 @@ import pytest from ddev.ai.agent.types import AgentResponse, ContextUsage, StopReason, TokenUsage, ToolResultMessage +from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.agentic_phase import AgenticPhase +from ddev.ai.phases.base import FlowServices from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import PhaseConfig, TaskConfig from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy @@ -133,18 +135,21 @@ def make_agent_phase( context_compact_threshold_pct=context_compact_threshold_pct, ) checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + services = FlowServices( + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables or {}, + flow_variables=flow_variables or {}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + callbacks=callbacks or Callbacks(), + ) phase = AgenticPhase( phase_id=phase_id, dependencies=dependencies or [], config=config, + services=services, agent_builder=make_agent_builder(mock_agent, captured_agent_kwargs), - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables or {}, - flow_variables=flow_variables or {}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=callbacks, goal_agent_builder=goal_agent_builder, ) phase.queue = message_queue diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 38a22667057d3..f761519d182bd 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -375,12 +375,22 @@ def test_extra_init_kwargs_creates_subagent_builder_from_tool_metadata( tools: list[str], expected: bool, ) -> None: + from ddev.ai.phases.base import FlowServices + from ddev.ai.phases.checkpoint import CheckpointManager + + services = FlowServices( + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) kwargs = AgenticPhase.extra_init_kwargs( phase_id="p1", phase_config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), agents={"writer": AgentConfig(tools=tools)}, agent_clients={}, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + services=services, ) assert (kwargs["subagent_builder"] is not None) is expected @@ -438,17 +448,22 @@ def agent_builder_fn( ] ) + from ddev.ai.phases.base import FlowServices + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = AgenticPhase( - phase_id="p1", - dependencies=[], - config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), - agent_builder=agent_builder_fn, + services = FlowServices( checkpoint_manager=checkpoint_manager, runtime_variables={}, flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase = AgenticPhase( + phase_id="p1", + dependencies=[], + config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), + services=services, + agent_builder=agent_builder_fn, subagent_builder=mock_subagent_builder, ) phase.queue = message_queue @@ -485,12 +500,22 @@ def test_extra_init_kwargs_creates_goal_agent_builder_when_any_task_has_goal( tasks, expect_builder, ): + from ddev.ai.phases.base import FlowServices + from ddev.ai.phases.checkpoint import CheckpointManager + + services = FlowServices( + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) kwargs = AgenticPhase.extra_init_kwargs( phase_id="p1", phase_config=PhaseConfig(agent="writer", tasks=tasks), agents={"writer": AgentConfig()}, agent_clients={}, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + services=services, ) assert (kwargs["goal_agent_builder"] is not None) is expect_builder diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index b776aa11448a1..9c02a3931d1e1 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -6,7 +6,7 @@ import pytest -from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.base import FlowServices, Phase, PhaseOutcome from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -35,15 +35,18 @@ def _make_stub_phase( outcome=None, ): checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = _StubPhase( - phase_id=phase_id, - dependencies=dependencies or [], - config=PhaseConfig(), + services = FlowServices( checkpoint_manager=checkpoint_manager, runtime_variables={}, flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase = _StubPhase( + phase_id=phase_id, + dependencies=dependencies or [], + config=PhaseConfig(), + services=services, outcome=outcome, ) phase.queue = message_queue