diff --git a/ddev/src/ddev/ai/agent/build.py b/ddev/src/ddev/ai/agent/build.py index 28a56e620bbbc..46b61dc2d1be5 100644 --- a/ddev/src/ddev/ai/agent/build.py +++ b/ddev/src/ddev/ai/agent/build.py @@ -6,15 +6,14 @@ from collections.abc import Callable from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any from ddev.ai.agent.anthropic_client import AnthropicAgent from ddev.ai.agent.base import BaseAgent +from ddev.ai.phases.config import AgentConfig +from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry - -if TYPE_CHECKING: - from ddev.ai.phases.config import AgentConfig +from ddev.ai.tools.registry import ToolRegistry, filter_read_only SubagentBuilder = Callable[ [str, str, list[str]], # (system_prompt, owner_id, tool_names) @@ -24,6 +23,10 @@ [str, str, SubagentBuilder | None, Path | None], # system_prompt, owner_id, subagent_builder, log_dir tuple[BaseAgent[Any], ToolRegistry], ] +GoalAgentBuilder = Callable[ + [str], # owner_id + tuple[BaseAgent[Any], ToolRegistry], +] def _resolve_client(agent_clients: dict[str, Any], provider: str) -> Any: @@ -159,3 +162,49 @@ def builder(system_prompt: str, owner_id: str, tool_names: list[str]) -> tuple[B ) return builder + + +def build_goal_agent( + parent_agent_config: AgentConfig, + agent_clients: dict[str, Any], + file_registry: FileRegistry, + owner_id: str, +) -> tuple[BaseAgent[Any], ToolRegistry]: + """Build the reviewer agent + its ToolRegistry. + + Uses the same provider as the parent agent. Model and max_tokens are left at + provider defaults — the parent's overrides are intentionally not forwarded. + Tools are filtered to the read-only subset of the parent's tool list. + """ + read_only_tool_names = filter_read_only(parent_agent_config.tools) + goal_agent_config = AgentConfig( + provider=parent_agent_config.provider, + tools=read_only_tool_names, + ) + + return _build_agent_and_registry( + agent_config=goal_agent_config, + agent_clients=agent_clients, + system_prompt=GOAL_REVIEWER_SYSTEM_PROMPT, + owner_id=owner_id, + tool_names=read_only_tool_names, + file_registry=file_registry, + ) + + +def make_goal_agent_builder( + parent_agent_config: AgentConfig, + agent_clients: dict[str, Any], + file_registry: FileRegistry, +) -> GoalAgentBuilder: + """Return a closure that builds a (reviewer_agent, reviewer_registry) tuple.""" + + def builder(owner_id: str) -> tuple[BaseAgent[Any], ToolRegistry]: + return build_goal_agent( + parent_agent_config=parent_agent_config, + agent_clients=agent_clients, + file_registry=file_registry, + owner_id=owner_id, + ) + + return builder diff --git a/ddev/src/ddev/ai/callbacks/callbacks.py b/ddev/src/ddev/ai/callbacks/callbacks.py index d0be2229a27c7..727681537f69e 100644 --- a/ddev/src/ddev/ai/callbacks/callbacks.py +++ b/ddev/src/ddev/ai/callbacks/callbacks.py @@ -72,6 +72,18 @@ class OnPhaseFinishCallback(Protocol): async def __call__(self, phase_id: str) -> None: ... +class OnBeforeGoalCheckCallback(Protocol): + """Called immediately before each reviewer agent run for a task with a goal.""" + + async def __call__(self, task_name: str, attempt: int) -> None: ... + + +class OnAfterGoalCheckCallback(Protocol): + """Called after each reviewer agent run, with the parsed verdict.""" + + async def __call__(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: ... + + # --------------------------------------------------------------------------- # CallbackSet and Callbacks # --------------------------------------------------------------------------- @@ -103,6 +115,8 @@ def __init__(self) -> None: self._on_before_agent_send: list[OnBeforeAgentSendCallback] = [] self._on_phase_start: list[OnPhaseStartCallback] = [] self._on_phase_finish: list[OnPhaseFinishCallback] = [] + self._on_before_goal_check: list[OnBeforeGoalCheckCallback] = [] + self._on_after_goal_check: list[OnAfterGoalCheckCallback] = [] async def _fire(self, handlers: list[Any], *args: Any) -> None: for handler in handlers: @@ -174,6 +188,20 @@ def on_phase_finish(self, func: OnPhaseFinishCallback) -> OnPhaseFinishCallback: async def fire_phase_finish(self, phase_id: str) -> None: await self._fire(self._on_phase_finish, phase_id) + def on_before_goal_check(self, func: OnBeforeGoalCheckCallback) -> OnBeforeGoalCheckCallback: + self._on_before_goal_check.append(func) + return func + + async def fire_before_goal_check(self, task_name: str, attempt: int) -> None: + await self._fire(self._on_before_goal_check, task_name, attempt) + + def on_after_goal_check(self, func: OnAfterGoalCheckCallback) -> OnAfterGoalCheckCallback: + self._on_after_goal_check.append(func) + return func + + async def fire_after_goal_check(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: + await self._fire(self._on_after_goal_check, task_name, attempt, valid, reason) + class Callbacks: """Container of CallbackSet instances. Dispatches each fire_* to all contained sets.""" @@ -216,3 +244,11 @@ async def fire_phase_start(self, phase_id: str) -> None: async def fire_phase_finish(self, phase_id: str) -> None: for s in self._sets: await s.fire_phase_finish(phase_id) + + async def fire_before_goal_check(self, task_name: str, attempt: int) -> None: + for s in self._sets: + await s.fire_before_goal_check(task_name, attempt) + + async def fire_after_goal_check(self, task_name: str, attempt: int, valid: bool, reason: str) -> None: + for s in self._sets: + await s.fire_after_goal_check(task_name, attempt, valid, reason) diff --git a/ddev/src/ddev/ai/phases/agentic_phase.py b/ddev/src/ddev/ai/phases/agentic_phase.py index 948bcfb3e7728..83e70a93bb653 100644 --- a/ddev/src/ddev/ai/phases/agentic_phase.py +++ b/ddev/src/ddev/ai/phases/agentic_phase.py @@ -2,21 +2,32 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -import logging +from __future__ import annotations + from collections.abc import Callable +from datetime import UTC, datetime from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any -from ddev.ai.agent.base import BaseAgent -from ddev.ai.agent.build import AgentBuilder, SubagentBuilder, make_agent_builder, make_subagent_builder -from ddev.ai.callbacks.callbacks import Callbacks -from ddev.ai.phases.base import Phase, PhaseOutcome -from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.agent.build import ( + make_agent_builder, + make_goal_agent_builder, + make_subagent_builder, +) +from ddev.ai.phases.base import FlowServices, Phase, PhaseOutcome from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.goal import GOAL_TASK_SUFFIX, GoalValidationError, render_goal_text, run_goal_loop +from ddev.ai.phases.messages import PhaseFailedMessage from ddev.ai.phases.template import render_inline, render_prompt from ddev.ai.react.process import ReActProcess -from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import TOOL_MANIFEST +from ddev.event_bus.exceptions import MessageProcessingError, ProcessorHookError + +if TYPE_CHECKING: + from ddev.ai.agent.base import BaseAgent + from ddev.ai.agent.build import AgentBuilder, GoalAgentBuilder, SubagentBuilder + from ddev.ai.phases.goal import GoalLoopOutcome + from ddev.ai.react.types import ReActResult def render_task_prompt( @@ -54,32 +65,25 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, + services: FlowServices, agent_builder: AgentBuilder, - checkpoint_manager: CheckpointManager, - runtime_variables: dict[str, str], - flow_variables: dict[str, str], - config_dir: Path, - file_registry: FileRegistry, subagent_builder: SubagentBuilder | None = None, - callbacks: Callbacks | None = None, - logger: logging.Logger | None = None, + goal_agent_builder: GoalAgentBuilder | None = None, ) -> None: super().__init__( phase_id=phase_id, dependencies=dependencies, config=config, - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables, - flow_variables=flow_variables, - config_dir=config_dir, - file_registry=file_registry, - callbacks=callbacks, - logger=logger, + services=services, ) self._agent_builder = agent_builder self._subagent_builder = subagent_builder + self._goal_agent_builder = goal_agent_builder + self._goal_attempt_log: list[dict[str, Any]] = [] + self._total_input_tokens: int = 0 + self._total_output_tokens: int = 0 self._subagent_log_dir = ( - checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None + services.checkpoint_manager.root / "subagents" / phase_id if subagent_builder is not None else None ) @classmethod @@ -104,12 +108,13 @@ def extra_init_kwargs( # type: ignore[override] phase_config: PhaseConfig, agents: dict[str, AgentConfig], agent_clients: dict[str, Any], - file_registry: FileRegistry, + services: FlowServices, **_: Any, ) -> dict[str, Any]: if phase_config.agent is None: raise FlowConfigError(f"Phase {phase_id!r} (AgenticPhase) requires 'agent'") agent_config = agents[phase_config.agent] + file_registry = services.file_registry subagent_builder = None requires_subagent_builder = any( @@ -124,6 +129,15 @@ def extra_init_kwargs( # type: ignore[override] file_registry=file_registry, ) + any_goal = any((t.goal is not None or t.goal_path is not None) for t in phase_config.tasks) + goal_agent_builder = None + if any_goal: + goal_agent_builder = make_goal_agent_builder( + parent_agent_config=agent_config, + agent_clients=agent_clients, + file_registry=file_registry, + ) + return { "agent_builder": make_agent_builder( agent_config=agent_config, @@ -131,6 +145,7 @@ def extra_init_kwargs( # type: ignore[override] file_registry=file_registry, ), "subagent_builder": subagent_builder, + "goal_agent_builder": goal_agent_builder, } def before_react(self) -> None: @@ -139,29 +154,81 @@ def before_react(self) -> None: def after_react(self) -> None: """Called once after all tasks complete. Override for phase-specific teardown.""" + async def _compact_if_needed( + self, + process: ReActProcess, + last_result: ReActResult | None, + ) -> tuple[int, int]: + if last_result is None or last_result.context_usage is None: + return 0, 0 + if last_result.context_usage.context_pct < self._config.context_compact_threshold_pct: + return 0, 0 + return await process.compact() + async def run_tasks( self, process: ReActProcess, context: dict[str, Any], - ) -> tuple[int, int]: - """Run the task loop. Returns (total_input_tokens, total_output_tokens). + ) -> None: + """Run the task loop, accumulating tokens into self._total_input/output_tokens. Override to customize task execution — e.g. add retries, change ordering, etc. Default implementation iterates through config.tasks sequentially. """ - total_input = total_output = 0 - last_result = None + last_result: ReActResult | None = None + for task in self._config.tasks: - if last_result is not None and last_result.context_usage is not None: - if last_result.context_usage.context_pct >= self._config.context_compact_threshold_pct: - compact_in, compact_out = await process.compact() - total_input += compact_in - total_output += compact_out + cin, cout = await self._compact_if_needed(process, last_result) + self._total_input_tokens += cin + self._total_output_tokens += cout + + has_goal = task.goal is not None or task.goal_path is not None prompt = render_task_prompt(task, self._config_dir, context, self._resolver) + if has_goal: + prompt = prompt + GOAL_TASK_SUFFIX + last_result = await process.start(prompt) - total_input += last_result.total_input_tokens - total_output += last_result.total_output_tokens - return total_input, total_output + self._total_input_tokens += last_result.total_input_tokens + self._total_output_tokens += last_result.total_output_tokens + + if has_goal: + if self._goal_agent_builder is None: + raise ValueError("Goal agent builder is required when tasks specify a goal") + goal_text = render_goal_text(task, self._config_dir, context, self._resolver) + try: + outcome: GoalLoopOutcome = await run_goal_loop( + task=task, + goal_text=goal_text, + rendered_task_prompt=prompt, + worker_process=process, + initial_result=last_result, + goal_agent_builder=self._goal_agent_builder, + callbacks=self._callbacks, + phase_id=self._phase_id, + log_root=self._checkpoint_manager.root, + compact_if_needed=lambda r: self._compact_if_needed(process, r), + ) + except GoalValidationError as e: + self._goal_attempt_log.append( + { + "task": task.name, + "attempts": e.attempts, + "final_valid": False, + } + ) + self._total_input_tokens += e.input_tokens + self._total_output_tokens += e.output_tokens + raise + last_result = outcome.final_result + self._total_input_tokens += outcome.total_input_tokens + self._total_output_tokens += outcome.total_output_tokens + self._goal_attempt_log.append( + { + "task": task.name, + "attempts": outcome.attempts, + "final_valid": True, + } + ) def _build_agent_and_process(self, context: dict[str, Any]) -> tuple[BaseAgent[Any], ReActProcess]: """Build the agent and ReAct process used to drive task execution.""" @@ -202,13 +269,44 @@ async def _run_memory_step( async def execute(self, context: dict[str, Any]) -> PhaseOutcome: self.before_react() agent, process = self._build_agent_and_process(context) - total_input, total_output = await self.run_tasks(process, context) + await self.run_tasks(process, context) self.after_react() memory_text, mem_in, mem_out = await self._run_memory_step(agent, context) + extra: dict[str, Any] = {} + if self._goal_attempt_log: + extra["goal_validations"] = self._goal_attempt_log + return PhaseOutcome( memory_text=memory_text, - total_input_tokens=total_input + mem_in, - total_output_tokens=total_output + mem_out, + total_input_tokens=self._total_input_tokens + mem_in, + total_output_tokens=self._total_output_tokens + mem_out, + extra_checkpoint=extra, ) + + async def on_error(self, error: MessageProcessingError | ProcessorHookError) -> None: + payload: dict[str, Any] = { + "status": "failed", + "started_at": self._started_at.isoformat() if self._started_at else None, + "finished_at": datetime.now(UTC).isoformat(), + "error": str(error.original_exception), + "tokens": { + "total_input": self._total_input_tokens, + "total_output": self._total_output_tokens, + }, + } + if self._goal_attempt_log: + payload["goal_validations"] = self._goal_attempt_log + try: + self._checkpoint_manager.write_phase_checkpoint(self._phase_id, payload) + except Exception: + self._logger.exception("Failed to write failure checkpoint for phase %s", self._phase_id) + finally: + self.submit_message( + PhaseFailedMessage( + id=f"{self._phase_id}_failed", + phase_id=self._phase_id, + error=str(error.original_exception), + ) + ) diff --git a/ddev/src/ddev/ai/phases/base.py b/ddev/src/ddev/ai/phases/base.py index 227b92560a60f..36326d21a73af 100644 --- a/ddev/src/ddev/ai/phases/base.py +++ b/ddev/src/ddev/ai/phases/base.py @@ -19,6 +19,19 @@ from ddev.event_bus.orchestrator import AsyncProcessor, BaseMessage +@dataclass(frozen=True) +class FlowServices: + """Shared pipeline-level infrastructure passed to every phase.""" + + checkpoint_manager: CheckpointManager + runtime_variables: dict[str, str] + flow_variables: dict[str, str] + config_dir: Path + file_registry: FileRegistry + callbacks: Callbacks = field(default_factory=Callbacks) + logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__)) + + @dataclass class PhaseOutcome: memory_text: str @@ -56,26 +69,20 @@ def __init__( phase_id: str, dependencies: list[str], config: PhaseConfig, - checkpoint_manager: CheckpointManager, - runtime_variables: dict[str, str], - flow_variables: dict[str, str], - config_dir: Path, - file_registry: FileRegistry, - callbacks: Callbacks | None = None, - logger: logging.Logger | None = None, + services: FlowServices, ) -> None: super().__init__(name=phase_id) self._phase_id = phase_id self._dependencies = set(dependencies) self._remaining_dependencies = set(dependencies) self._config = config - self._checkpoint_manager = checkpoint_manager - self._runtime_variables = runtime_variables - self._flow_variables = flow_variables - self._config_dir = config_dir - self._callbacks: Callbacks = callbacks or Callbacks() - self._file_registry = file_registry - self._logger = logger or logging.getLogger(__name__) + self._checkpoint_manager = services.checkpoint_manager + self._runtime_variables = services.runtime_variables + self._flow_variables = services.flow_variables + self._config_dir = services.config_dir + self._file_registry = services.file_registry + self._callbacks = services.callbacks + self._logger = services.logger self._started_at: datetime | None = None self._resolver: Callable[[str], str] | None = None self._executed = False @@ -112,10 +119,9 @@ def validate_config( def extra_init_kwargs(cls, **kwargs: Any) -> dict[str, Any]: """Override to inject subclass-specific kwargs into __init__ at construction time. - The orchestrator passes every framework-level dep (phase_id, phase_config, agents, - agent_clients, file_registry, checkpoint_manager, ...) as keyword arguments. - Subclasses pick the ones they need by declaring them explicitly and accept the - rest via **kwargs. + The orchestrator passes phase_id, phase_config, agents, agent_clients, and services + (FlowServices) as keyword arguments. Subclasses declare the ones they need + explicitly and accept the rest via **kwargs. """ return {} diff --git a/ddev/src/ddev/ai/phases/config.py b/ddev/src/ddev/ai/phases/config.py index 5c2564e19066b..3a845228401b2 100644 --- a/ddev/src/ddev/ai/phases/config.py +++ b/ddev/src/ddev/ai/phases/config.py @@ -7,7 +7,7 @@ from pathlib import Path import yaml -from pydantic import BaseModel, ConfigDict, ValidationError, field_validator, model_validator +from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator, model_validator from ddev.ai.tools.registry import ToolRegistry @@ -53,16 +53,30 @@ def dfs(start: str, current: str, path: list[str], on_path: set[str]): class TaskConfig(BaseModel): model_config = ConfigDict(extra="forbid") - name: str + name: str = Field(pattern=r"^[A-Za-z0-9._-]{1,64}$") prompt_path: Path | None = None prompt: str | None = None + goal: str | None = None + goal_path: Path | None = None + max_goal_attempts: int = 5 @model_validator(mode="after") - def exactly_one_source(self) -> TaskConfig: + def exactly_one_prompt_source(self) -> TaskConfig: if (self.prompt_path is None) == (self.prompt is None): raise ValueError("Exactly one of 'prompt_path' or 'prompt' must be set") return self + @model_validator(mode="after") + def goal_consistency(self) -> TaskConfig: + if self.goal is not None and self.goal_path is not None: + raise ValueError("At most one of 'goal' or 'goal_path' may be set") + has_goal = self.goal is not None or self.goal_path is not None + if not has_goal and "max_goal_attempts" in self.model_fields_set: + raise ValueError("'max_goal_attempts' may only be set when 'goal' or 'goal_path' is set") + if has_goal and self.max_goal_attempts < 1: + raise ValueError("'max_goal_attempts' must be at least 1") + return self + class CheckpointConfig(BaseModel): """Optional extra instructions for the memory step. If omitted, only a summary is written.""" @@ -177,6 +191,12 @@ def _validate_files(self, config_dir: Path) -> None: raise FlowConfigError( f"Phase {phase_id!r} task {i} ({task.name!r}): prompt_path not found: {resolved}" ) + if task.goal_path is not None: + resolved = config_dir / task.goal_path + if not resolved.exists(): + raise FlowConfigError( + f"Phase {phase_id!r} task {i} ({task.name!r}): goal_path not found: {resolved}" + ) if phase.checkpoint is not None and phase.checkpoint.memory_prompt_path is not None: resolved = config_dir / phase.checkpoint.memory_prompt_path diff --git a/ddev/src/ddev/ai/phases/goal.py b/ddev/src/ddev/ai/phases/goal.py new file mode 100644 index 0000000000000..c45bfe7c02b7f --- /dev/null +++ b/ddev/src/ddev/ai/phases/goal.py @@ -0,0 +1,327 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from __future__ import annotations + +import json +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from ddev.ai.callbacks.callbacks import Callbacks +from ddev.ai.phases.config import TaskConfig +from ddev.ai.phases.template import render_inline, render_prompt +from ddev.ai.react.process import ReActProcess +from ddev.ai.react.types import ReActResult +from ddev.ai.tools.agents.agent_logger import AgentLogger + +if TYPE_CHECKING: + from ddev.ai.agent.build import GoalAgentBuilder + +GOAL_REVIEWER_SYSTEM_PROMPT = """\ +You are a strict, independent reviewer. Your only job is to verify whether a +goal was met by another agent. You do not fix anything; you only report. + +You will receive a user message with three sections: +1. "Original task" — the prompt the worker agent was given. +2. "Goal to verify" — the specific criterion you must check. +3. "Worker summary" — the worker's own description of what it did, including + any files it created or modified, and any intentional decisions about scope. + +How to work: +- Read the relevant files yourself with the tools provided. Do not trust the + worker summary blindly — verify it. +- If the worker summary explains that an apparent gap is intentional and the + explanation is plausible and consistent with the task, accept that specific + gap as valid. +- Be specific in your reasoning. Vague rejections are useless to the worker. + +Output contract: +- Reply with ONLY a JSON object as your final response, with no surrounding prose + and no markdown code fences. +- Schema: {"valid": , "reason": }. +- "reason" must be specific and actionable when "valid" is false. +- "reason" may be an empty string when "valid" is true. +""" + +GOAL_TASK_SUFFIX = """ + +--- +Before you finish, write a brief summary of what you did. Include: +- the files you created or modified (with absolute paths), +- any intentional decisions about scope (e.g. items deliberately excluded and why), +- anything a reviewer would need to verify your work. +Your work will be checked by an independent reviewer using only this summary +and the files you produced. +""" + +GOAL_RETRY_PROMPT_TEMPLATE = """\ +A reviewer checked your work against this goal and reported it failed: + +Goal: {goal} + +Reviewer reason: {reason} + +Address only the issue above. If you believe the reviewer is wrong, explain +why clearly in your final summary (do not silently ignore it). +""" + +GOAL_PARSE_RETRY_PROMPT = ( + "Your previous reply was not a valid JSON object matching the schema " + '{"valid": , "reason": }. Reply with only that JSON object, ' + "with no surrounding prose and no markdown code fences." +) + + +class GoalValidationError(Exception): + """Base class for goal-validation failures. Carries the token cost and attempt count.""" + + def __init__(self, message: str, input_tokens: int = 0, output_tokens: int = 0) -> None: + super().__init__(message) + self.input_tokens = input_tokens + self.output_tokens = output_tokens + self.attempts: int = 0 + + +class GoalParseError(GoalValidationError): + """Reviewer failed to return valid JSON after the parse-retry.""" + + +class GoalAttemptsExhausted(GoalValidationError): + """Reviewer rejected the work on every attempt up to max_goal_attempts.""" + + +@dataclass(frozen=True) +class GoalCheckResult: + valid: bool + reason: str + input_tokens: int + output_tokens: int + + +@dataclass(frozen=True) +class GoalLoopOutcome: + final_result: ReActResult + attempts: int + total_input_tokens: int + total_output_tokens: int + + +def render_goal_text( + task: TaskConfig, + config_dir: Path, + context: dict[str, Any], + resolver: Callable[[str], str] | None, +) -> str: + """Render the goal — from file if goal_path is set, inline otherwise.""" + if task.goal_path is not None: + return render_prompt(config_dir / task.goal_path, context, resolver) + assert task.goal is not None # caller checks + return render_inline(task.goal, context, resolver) + + +def build_reviewer_user_message( + *, + rendered_task_prompt: str, + goal_text: str, + worker_summary: str, +) -> str: + return ( + f"## Original task\n{rendered_task_prompt}\n\n" + f"## Goal to verify\n{goal_text}\n\n" + f"## Worker summary\n{worker_summary}\n" + ) + + +def parse_reviewer_output(text: str) -> tuple[bool, str] | None: + """Return (valid, reason) if text parses; None if it does not.""" + stripped = text.strip() + if stripped.startswith("```"): + stripped = stripped.strip("`") + if stripped.lower().startswith("json"): + stripped = stripped[4:] + stripped = stripped.strip() + try: + obj = json.loads(stripped) + except (json.JSONDecodeError, ValueError): + return None + if not isinstance(obj, dict): + return None + valid = obj.get("valid") + reason = obj.get("reason", "") + if not isinstance(valid, bool) or not isinstance(reason, str): + return None + return valid, reason + + +async def _run_reviewer_once( + reviewer_process: ReActProcess, + user_message: str, +) -> GoalCheckResult: + """Send ``user_message`` to the reviewer and parse its JSON output. + + On parse failure, ask the reviewer once more for valid JSON. If that + second response still does not parse, raise GoalParseError. + """ + result = await reviewer_process.start(user_message) + in_tokens = result.total_input_tokens + out_tokens = result.total_output_tokens + + parsed = parse_reviewer_output(result.final_response.text or "") + if parsed is None: + retry_result = await reviewer_process.start(GOAL_PARSE_RETRY_PROMPT) + in_tokens += retry_result.total_input_tokens + out_tokens += retry_result.total_output_tokens + parsed = parse_reviewer_output(retry_result.final_response.text or "") + if parsed is None: + raise GoalParseError( + "Reviewer did not return valid JSON after one parse-retry. " + f"Last raw output: {retry_result.final_response.text!r}", + input_tokens=in_tokens, + output_tokens=out_tokens, + ) + + valid, reason = parsed + return GoalCheckResult(valid=valid, reason=reason, input_tokens=in_tokens, output_tokens=out_tokens) + + +async def _drive_goal_loop( + *, + task: TaskConfig, + goal_text: str, + rendered_task_prompt: str, + worker_process: ReActProcess, + initial_result: ReActResult, + reviewer_process: ReActProcess, + callbacks: Callbacks, + compact_if_needed: Callable[[ReActResult], Awaitable[tuple[int, int]]], +) -> GoalLoopOutcome: + total_in = total_out = 0 + attempts = 0 + worker_result = initial_result + + try: + while True: + attempts += 1 + await callbacks.fire_before_goal_check(task.name, attempts) + + user_message = build_reviewer_user_message( + rendered_task_prompt=rendered_task_prompt, + goal_text=goal_text, + worker_summary=worker_result.final_response.text or "(no summary provided)", + ) + + if attempts > 1: + reviewer_process.reset() + + check = await _run_reviewer_once(reviewer_process, user_message) + total_in += check.input_tokens + total_out += check.output_tokens + + await callbacks.fire_after_goal_check(task.name, attempts, check.valid, check.reason) + + if check.valid: + return GoalLoopOutcome( + final_result=worker_result, + attempts=attempts, + total_input_tokens=total_in, + total_output_tokens=total_out, + ) + + if attempts >= task.max_goal_attempts: + raise GoalAttemptsExhausted( + f"Task {task.name!r} failed goal validation after " + f"{attempts} attempts. Last reviewer reason: {check.reason}" + ) + + compact_in, compact_out = await compact_if_needed(worker_result) + total_in += compact_in + total_out += compact_out + + retry_prompt = GOAL_RETRY_PROMPT_TEMPLATE.format(goal=goal_text, reason=check.reason) + worker_result = await worker_process.start(retry_prompt) + total_in += worker_result.total_input_tokens + total_out += worker_result.total_output_tokens + except GoalValidationError as e: + e.input_tokens += total_in + e.output_tokens += total_out + e.attempts = attempts + raise + + +async def run_goal_loop( + *, + task: TaskConfig, + goal_text: str, + rendered_task_prompt: str, + worker_process: ReActProcess, + initial_result: ReActResult, + goal_agent_builder: GoalAgentBuilder, + callbacks: Callbacks, + phase_id: str, + log_root: Path, + compact_if_needed: Callable[[ReActResult], Awaitable[tuple[int, int]]], +) -> GoalLoopOutcome: + """Drive the reviewer + worker-retry loop for a single task with a goal. + + Reviewer activity is logged to ``/goal_agent//.jsonl`` + via AgentLogger. The reviewer's ReActProcess uses only the logger's callbacks — + the phase callbacks see only the bracketing before/after_goal_check events. + """ + log_dir = log_root / "goal_agent" / phase_id + log_path = log_dir / f"{task.name}.jsonl" + try: + log_dir.mkdir(parents=True, exist_ok=True) + except OSError as e: + raise OSError(f"Goal reviewer log directory could not be created at {log_dir}: {e}") from e + + reviewer_owner_id = f"{phase_id}.goal.{task.name}" + reviewer_agent, reviewer_registry = goal_agent_builder(reviewer_owner_id) + + try: + agent_logger = AgentLogger(log_path) + except OSError as e: + raise OSError(f"Goal reviewer log could not be opened at {log_path}: {e}") from e + + try: + agent_logger.log_start( + system_prompt=GOAL_REVIEWER_SYSTEM_PROMPT, + prompt=f"", + tools=[d["name"] for d in reviewer_registry.definitions], + ) + reviewer_process = ReActProcess( + agent=reviewer_agent, + tool_registry=reviewer_registry, + callbacks=agent_logger.build_callbacks(), + ) + + outcome = await _drive_goal_loop( + task=task, + goal_text=goal_text, + rendered_task_prompt=rendered_task_prompt, + worker_process=worker_process, + initial_result=initial_result, + reviewer_process=reviewer_process, + callbacks=callbacks, + compact_if_needed=compact_if_needed, + ) + agent_logger.log_finish( + success=True, + attempts=outcome.attempts, + total_input_tokens=outcome.total_input_tokens, + total_output_tokens=outcome.total_output_tokens, + ) + return outcome + except GoalValidationError as e: + agent_logger.log_finish( + success=False, + attempts=e.attempts, + total_input_tokens=e.input_tokens, + total_output_tokens=e.output_tokens, + error=f"{type(e).__name__}: {e}", + ) + raise + finally: + agent_logger.close() diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index 914d6aadf557e..de663c3138ed4 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -9,7 +9,7 @@ from typing import Any from ddev.ai.callbacks.callbacks import Callbacks -from ddev.ai.phases.base import Phase, PhaseRegistry +from ddev.ai.phases.base import FlowServices, Phase, PhaseRegistry from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import FlowConfig, FlowConfigError from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -94,6 +94,16 @@ async def on_initialize(self) -> None: dependency_map: dict[str, list[str]] = {entry.phase: entry.dependencies for entry in config.flow} + services = FlowServices( + checkpoint_manager=checkpoint_manager, + runtime_variables=self._runtime_variables, + flow_variables=config.variables, + config_dir=config_dir, + file_registry=self._file_registry, + callbacks=self._callbacks, + logger=self._logger, + ) + for entry in config.flow: phase_id = entry.phase phase_config = config.phases[phase_id] @@ -104,13 +114,7 @@ async def on_initialize(self) -> None: "phase_id": phase_id, "dependencies": dependencies, "config": phase_config, - "checkpoint_manager": checkpoint_manager, - "runtime_variables": self._runtime_variables, - "flow_variables": config.variables, - "config_dir": config_dir, - "file_registry": self._file_registry, - "callbacks": self._callbacks, - "logger": self._logger, + "services": services, } phase_kwargs.update( phase_cls.extra_init_kwargs( @@ -118,7 +122,7 @@ async def on_initialize(self) -> None: phase_config=phase_config, agents=config.agents, agent_clients=self._agent_clients, - file_registry=self._file_registry, + services=services, ) ) diff --git a/ddev/src/ddev/ai/tools/registry.py b/ddev/src/ddev/ai/tools/registry.py index 48b99e8f065da..2ff25522fea4b 100644 --- a/ddev/src/ddev/ai/tools/registry.py +++ b/ddev/src/ddev/ai/tools/registry.py @@ -71,36 +71,39 @@ class ToolSpec: ``factory`` receives the already-imported class and the shared ToolContext and returns a constructed tool instance. ``requires_subagent_builder`` marks agentic tools that need subagent wiring. + ``read_only`` marks tools that only inspect state and never mutate it. """ module: str cls: str factory: Callable[[type, ToolContext], ToolProtocol] = _plain_factory requires_subagent_builder: bool = False + read_only: bool = False TOOL_MANIFEST: dict[str, ToolSpec] = { - "read_file": ToolSpec("fs.read_file", "ReadFileTool", factory=_file_registry_factory), - "create_file": ToolSpec("fs.create_file", "CreateFileTool", factory=_file_registry_factory), - "edit_file": ToolSpec("fs.edit_file", "EditFileTool", factory=_file_registry_factory), - "append_file": ToolSpec("fs.append_file", "AppendFileTool", factory=_file_registry_factory), - "grep": ToolSpec("shell.grep", "GrepTool", factory=_file_policy_factory), - "list_files": ToolSpec("shell.list_files", "ListFilesTool"), - "mkdir": ToolSpec("fs.mkdir", "MkdirTool", factory=_file_policy_factory), - "http_get": ToolSpec("http.http_get", "HttpGetTool"), - "ddev_create": ToolSpec("shell.ddev.create", "DdevCreateTool"), - "ddev_test": ToolSpec("shell.ddev.ddev_test", "DdevTestTool"), - "ddev_env_show": ToolSpec("shell.ddev.env_show", "DdevEnvShowTool"), - "ddev_env_start": ToolSpec("shell.ddev.env_start", "DdevEnvStartTool"), - "ddev_env_stop": ToolSpec("shell.ddev.env_stop", "DdevEnvStopTool"), - "ddev_env_test": ToolSpec("shell.ddev.env_test", "DdevEnvTestTool"), - "ddev_release_changelog": ToolSpec("shell.ddev.release_changelog", "DdevReleaseChangelogTool"), - "ddev_validate": ToolSpec("shell.ddev.validate", "DdevValidateTool"), + "read_file": ToolSpec("fs.read_file", "ReadFileTool", factory=_file_registry_factory, read_only=True), + "create_file": ToolSpec("fs.create_file", "CreateFileTool", factory=_file_registry_factory, read_only=False), + "edit_file": ToolSpec("fs.edit_file", "EditFileTool", factory=_file_registry_factory, read_only=False), + "append_file": ToolSpec("fs.append_file", "AppendFileTool", factory=_file_registry_factory, read_only=False), + "grep": ToolSpec("shell.grep", "GrepTool", factory=_file_policy_factory, read_only=True), + "list_files": ToolSpec("shell.list_files", "ListFilesTool", read_only=True), + "mkdir": ToolSpec("fs.mkdir", "MkdirTool", factory=_file_policy_factory, read_only=False), + "http_get": ToolSpec("http.http_get", "HttpGetTool", read_only=True), + "ddev_create": ToolSpec("shell.ddev.create", "DdevCreateTool", read_only=False), + "ddev_test": ToolSpec("shell.ddev.ddev_test", "DdevTestTool", read_only=False), + "ddev_env_show": ToolSpec("shell.ddev.env_show", "DdevEnvShowTool", read_only=False), + "ddev_env_start": ToolSpec("shell.ddev.env_start", "DdevEnvStartTool", read_only=False), + "ddev_env_stop": ToolSpec("shell.ddev.env_stop", "DdevEnvStopTool", read_only=False), + "ddev_env_test": ToolSpec("shell.ddev.env_test", "DdevEnvTestTool", read_only=False), + "ddev_release_changelog": ToolSpec("shell.ddev.release_changelog", "DdevReleaseChangelogTool", read_only=False), + "ddev_validate": ToolSpec("shell.ddev.validate", "DdevValidateTool", read_only=False), "spawn_subagent": ToolSpec( "agents.spawn_subagent", "SpawnSubagentTool", factory=_spawn_subagent_factory, requires_subagent_builder=True, + read_only=False, ), } @@ -159,3 +162,15 @@ async def run(self, name: str, raw: dict[str, object]) -> ToolResult: if tool is None: return ToolResult(success=False, error=f"Unknown tool: {name!r}") return await tool.run(raw) + + +def filter_read_only(tool_names: list[str]) -> list[str]: + """Return only the names whose ToolSpec has read_only=True. Unknown names raise.""" + out: list[str] = [] + for name in tool_names: + spec = TOOL_MANIFEST.get(name) + if spec is None: + raise ValueError(f"Unknown tool name: {name!r}") + if spec.read_only: + out.append(name) + return out diff --git a/ddev/tests/ai/agent/test_build.py b/ddev/tests/ai/agent/test_build.py index 2ee6a42762bf6..f996da6ee0da2 100644 --- a/ddev/tests/ai/agent/test_build.py +++ b/ddev/tests/ai/agent/test_build.py @@ -6,9 +6,16 @@ import pytest -from ddev.ai.agent.anthropic_client import AnthropicAgent -from ddev.ai.agent.build import build_agent, build_subagent, make_agent_builder, make_subagent_builder +from ddev.ai.agent.anthropic_client import DEFAULT_MAX_TOKENS, DEFAULT_MODEL, AnthropicAgent +from ddev.ai.agent.build import ( + build_agent, + build_subagent, + make_agent_builder, + make_goal_agent_builder, + make_subagent_builder, +) from ddev.ai.phases.config import AgentConfig +from ddev.ai.phases.goal import GOAL_REVIEWER_SYSTEM_PROMPT from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry from ddev.ai.tools.registry import ToolRegistry @@ -128,3 +135,50 @@ def test_make_subagent_builder(file_registry, clients): agent, registry = builder("sys", "sub-1", []) assert isinstance(agent, AnthropicAgent) assert agent.name == "sub-1" + + +# --------------------------------------------------------------------------- +# make_goal_agent_builder +# --------------------------------------------------------------------------- + + +def test_goal_agent_builder_unknown_provider_raises(file_registry, clients): + config = AgentConfig.model_construct(provider="bad_provider", tools=[]) + builder = make_goal_agent_builder(config, clients, file_registry) + with pytest.raises(ValueError, match="Unknown agent provider: 'bad_provider'"): + builder("phase.goal.task") + + +def test_make_goal_agent_builder_filters_read_only_tools(file_registry, clients): + config = AgentConfig( + provider="anthropic", + tools=["read_file", "edit_file", "grep", "create_file"], + ) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, registry = builder("phase.goal.task") + + assert isinstance(agent, AnthropicAgent) + assert agent.name == "phase.goal.task" + tool_names = {d["name"] for d in registry.definitions} + assert tool_names == {"read_file", "grep"} + + +def test_make_goal_agent_builder_uses_default_model(file_registry, clients): + config = AgentConfig( + provider="anthropic", + tools=["read_file"], + model="claude-opus-4-7", + max_tokens=999, + ) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, _ = builder("phase.goal.task") + + assert agent._model == DEFAULT_MODEL + assert agent._max_tokens == DEFAULT_MAX_TOKENS + + +def test_make_goal_agent_builder_uses_reviewer_system_prompt(file_registry, clients): + config = AgentConfig(provider="anthropic", tools=["read_file"]) + builder = make_goal_agent_builder(config, clients, file_registry) + agent, _ = builder("phase.goal.task") + assert agent._system_prompt == GOAL_REVIEWER_SYSTEM_PROMPT diff --git a/ddev/tests/ai/callbacks/test_callbacks.py b/ddev/tests/ai/callbacks/test_callbacks.py index 79a4ae151fb8e..9c61a6bc12dbb 100644 --- a/ddev/tests/ai/callbacks/test_callbacks.py +++ b/ddev/tests/ai/callbacks/test_callbacks.py @@ -449,6 +449,52 @@ async def good(iteration: int) -> None: assert fired == [True] +# --------------------------------------------------------------------------- +# on_before_goal_check and on_after_goal_check +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("event", ["before", "after"], ids=["before", "after"]) +async def test_goal_check_callbacks_register_fire_and_swallow_exceptions(event): + cb = CallbackSet() + fired: list = [] + + decorator = cb.on_before_goal_check if event == "before" else cb.on_after_goal_check + + @decorator + async def bad(*args): + raise RuntimeError("boom") + + @decorator + async def good(*args): + fired.append(args) + + if event == "before": + await cb.fire_before_goal_check("task-x", 3) + assert fired == [("task-x", 3)] + else: + await cb.fire_after_goal_check("task-x", 3, False, "missing y") + assert fired == [("task-x", 3, False, "missing y")] + + +async def test_callbacks_dispatches_goal_check_to_all_sets(): + s1, s2 = CallbackSet(), CallbackSet() + fired: list = [] + + @s1.on_before_goal_check + async def h1(name, attempt): + fired.append(("s1", name, attempt)) + + @s2.on_after_goal_check + async def h2(name, attempt, valid, reason): + fired.append(("s2", name, attempt, valid, reason)) + + cb = Callbacks([s1, s2]) + await cb.fire_before_goal_check("t", 1) + await cb.fire_after_goal_check("t", 1, True, "") + assert fired == [("s1", "t", 1), ("s2", "t", 1, True, "")] + + # --------------------------------------------------------------------------- # Callbacks container # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/conftest.py b/ddev/tests/ai/phases/conftest.py index 96149455e1385..90977313cfe47 100644 --- a/ddev/tests/ai/phases/conftest.py +++ b/ddev/tests/ai/phases/conftest.py @@ -8,7 +8,9 @@ import pytest from ddev.ai.agent.types import AgentResponse, ContextUsage, StopReason, TokenUsage, ToolResultMessage +from ddev.ai.callbacks.callbacks import Callbacks from ddev.ai.phases.agentic_phase import AgenticPhase +from ddev.ai.phases.base import FlowServices from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import PhaseConfig, TaskConfig from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy @@ -119,6 +121,7 @@ def make_agent_phase( context_compact_threshold_pct: int = 80, callbacks=None, captured_agent_kwargs: dict[str, Any] | None = None, + goal_agent_builder=None, ) -> tuple[AgenticPhase, CheckpointManager]: """Build an AgenticPhase ready for process_message-driven tests. @@ -132,18 +135,22 @@ def make_agent_phase( context_compact_threshold_pct=context_compact_threshold_pct, ) checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + services = FlowServices( + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables or {}, + flow_variables=flow_variables or {}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + callbacks=callbacks or Callbacks(), + ) phase = AgenticPhase( phase_id=phase_id, dependencies=dependencies or [], config=config, + services=services, agent_builder=make_agent_builder(mock_agent, captured_agent_kwargs), - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables or {}, - flow_variables=flow_variables or {}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - callbacks=callbacks, + goal_agent_builder=goal_agent_builder, ) phase.queue = message_queue return phase, checkpoint_manager diff --git a/ddev/tests/ai/phases/test_agentic_phase.py b/ddev/tests/ai/phases/test_agentic_phase.py index 36d4701b5b353..f761519d182bd 100644 --- a/ddev/tests/ai/phases/test_agentic_phase.py +++ b/ddev/tests/ai/phases/test_agentic_phase.py @@ -375,12 +375,22 @@ def test_extra_init_kwargs_creates_subagent_builder_from_tool_metadata( tools: list[str], expected: bool, ) -> None: + from ddev.ai.phases.base import FlowServices + from ddev.ai.phases.checkpoint import CheckpointManager + + services = FlowServices( + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) kwargs = AgenticPhase.extra_init_kwargs( phase_id="p1", phase_config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), agents={"writer": AgentConfig(tools=tools)}, agent_clients={}, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + services=services, ) assert (kwargs["subagent_builder"] is not None) is expected @@ -438,17 +448,22 @@ def agent_builder_fn( ] ) + from ddev.ai.phases.base import FlowServices + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = AgenticPhase( - phase_id="p1", - dependencies=[], - config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), - agent_builder=agent_builder_fn, + services = FlowServices( checkpoint_manager=checkpoint_manager, runtime_variables={}, flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase = AgenticPhase( + phase_id="p1", + dependencies=[], + config=PhaseConfig(agent="writer", tasks=[TaskConfig(name="t1", prompt="Do the work.")]), + services=services, + agent_builder=agent_builder_fn, subagent_builder=mock_subagent_builder, ) phase.queue = message_queue @@ -464,3 +479,266 @@ def agent_builder_fn( assert log_file.exists() events = {e["event"] for e in read_jsonl(log_file)} assert {"start", "finish"} <= events + + +# --------------------------------------------------------------------------- +# Goal validation integration tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "tasks,expect_builder", + [ + ([TaskConfig(name="t1", prompt="x")], False), + ([TaskConfig(name="t1", prompt="x", goal="verify")], True), + ([TaskConfig(name="t1", prompt="x"), TaskConfig(name="t2", prompt="y", goal="verify")], True), + ], + ids=["no_goal", "single_goal", "mixed"], +) +def test_extra_init_kwargs_creates_goal_agent_builder_when_any_task_has_goal( + flow_dir, + tasks, + expect_builder, +): + from ddev.ai.phases.base import FlowServices + from ddev.ai.phases.checkpoint import CheckpointManager + + services = FlowServices( + checkpoint_manager=CheckpointManager(flow_dir / "checkpoints.yaml"), + runtime_variables={}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + kwargs = AgenticPhase.extra_init_kwargs( + phase_id="p1", + phase_config=PhaseConfig(agent="writer", tasks=tasks), + agents={"writer": AgentConfig()}, + agent_clients={}, + services=services, + ) + assert (kwargs["goal_agent_builder"] is not None) is expect_builder + + +async def test_phase_with_goal_passes_first_attempt(flow_dir, monkeypatch, message_queue): + worker = MockAgent( + [ + make_response("worker did the work", 100, 50), + make_response("phase summary", 10, 5), + ] + ) + reviewer_responses = [make_response('{"valid": true, "reason": ""}', 7, 3)] + + captured_builder_calls: list = [] + + def goal_builder(owner_id): + captured_builder_calls.append(owner_id) + agent = MockAgent(list(reviewer_responses)) + return agent, ToolRegistry([]) + + phase, mgr = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="verify it")], + goal_agent_builder=goal_builder, + ) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + cp = mgr.read()["p1"] + assert cp["status"] == "success" + assert cp["goal_validations"] == [{"task": "t1", "attempts": 1, "final_valid": True}] + assert worker.send_calls[0].startswith("Do it.") + assert "independent reviewer" in worker.send_calls[0] + assert cp["tokens"] == {"total_input": 100 + 7 + 10, "total_output": 50 + 3 + 5} + assert captured_builder_calls == ["p1.goal.t1"] + + log_file = mgr.root / "goal_agent" / "p1" / "t1.jsonl" + assert log_file.exists() + events = {e["event"] for e in read_jsonl(log_file)} + assert {"start", "finish"} <= events + + +async def test_phase_with_goal_exhausts_attempts_fails_phase(flow_dir, monkeypatch, message_queue): + worker = MockAgent( + [ + make_response("attempt 1", 0, 0), + make_response("attempt 2", 0, 0), + ] + ) + + def goal_builder(owner_id): + agent = MockAgent( + [ + make_response('{"valid": false, "reason": "first miss"}', 0, 0), + make_response('{"valid": false, "reason": "second miss"}', 0, 0), + ] + ) + return agent, ToolRegistry([]) + + phase, mgr = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g", max_goal_attempts=2)], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert mgr.read() == {} + assert phase._goal_attempt_log == [{"task": "t1", "attempts": 2, "final_valid": False}] + + log_file = mgr.root / "goal_agent" / "p1" / "t1.jsonl" + assert log_file.exists() + finish_events = [e for e in read_jsonl(log_file) if e["event"] == "finish"] + assert len(finish_events) == 1 + assert finish_events[0]["success"] is False + + +async def test_phase_goal_partial_progress_preserved_on_exhaustion(flow_dir, monkeypatch, message_queue): + """When task 1 passes goal validation and task 2 exhausts attempts, both entries are logged.""" + worker = MockAgent( + [ + make_response("t1 done", 0, 0), + make_response("t2 attempt 1", 0, 0), + make_response("t2 attempt 2", 0, 0), + ] + ) + + call_count = 0 + + def goal_builder(owner_id): + nonlocal call_count + call_count += 1 + if call_count == 1: + agent = MockAgent([make_response('{"valid": true, "reason": ""}', 0, 0)]) + else: + agent = MockAgent( + [ + make_response('{"valid": false, "reason": "miss 1"}', 0, 0), + make_response('{"valid": false, "reason": "miss 2"}', 0, 0), + ] + ) + return agent, ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[ + TaskConfig(name="t1", prompt="First.", goal="check t1", max_goal_attempts=2), + TaskConfig(name="t2", prompt="Second.", goal="check t2", max_goal_attempts=2), + ], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._goal_attempt_log == [ + {"task": "t1", "attempts": 1, "final_valid": True}, + {"task": "t2", "attempts": 2, "final_valid": False}, + ] + + +async def test_goal_exhaustion_tokens_captured_on_phase(flow_dir, monkeypatch, message_queue): + """Goal-loop tokens are folded into the phase total even when the phase fails.""" + worker = MockAgent( + [ + make_response("worker attempt 1", 10, 5), + make_response("worker attempt 2", 10, 5), + ] + ) + + def goal_builder(owner_id): + return MockAgent( + [ + make_response('{"valid": false, "reason": "miss 1"}', 8, 4), + make_response('{"valid": false, "reason": "miss 2"}', 8, 4), + ] + ), ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g", max_goal_attempts=2)], + goal_agent_builder=goal_builder, + ) + + from ddev.ai.phases.goal import GoalAttemptsExhausted + + with pytest.raises(GoalAttemptsExhausted): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._total_input_tokens == 10 + 8 + 10 + 8 + assert phase._total_output_tokens == 5 + 4 + 5 + 4 + + +async def test_on_error_writes_tokens_and_goal_validations_to_checkpoint(flow_dir, monkeypatch, message_queue): + """on_error includes token counts and goal_validations in the failure checkpoint.""" + from ddev.ai.phases.messages import PhaseTrigger + from ddev.event_bus.exceptions import MessageProcessingError + + worker = MockAgent([make_response("done", 0, 0)]) + phase, mgr = make_agent_phase(flow_dir, worker, monkeypatch, message_queue) + + phase._total_input_tokens = 42 + phase._total_output_tokens = 17 + phase._goal_attempt_log = [{"task": "t1", "attempts": 2, "final_valid": False}] + phase._started_at = None + + err = MessageProcessingError( + processor_name="p1", + message=PhaseTrigger(id="start", phase_id=None), + original_exception=RuntimeError("something went wrong"), + ) + await phase.on_error(err) + + cp = mgr.read()["p1"] + assert cp["status"] == "failed" + assert cp["tokens"] == {"total_input": 42, "total_output": 17} + assert cp["goal_validations"] == [{"task": "t1", "attempts": 2, "final_valid": False}] + assert cp["error"] == "something went wrong" + + +async def test_goal_parse_error_logged_and_tokens_captured(flow_dir, monkeypatch, message_queue): + """GoalParseError is treated the same as GoalAttemptsExhausted: logged with final_valid=False.""" + from ddev.ai.phases.goal import GoalParseError + + worker = MockAgent([make_response("worker done", 10, 5)]) + + def goal_builder(owner_id): + return MockAgent( + [ + make_response("not json", 8, 4), + make_response("still not json", 6, 3), + ] + ), ToolRegistry([]) + + phase, _ = make_agent_phase( + flow_dir, + worker, + monkeypatch, + message_queue, + tasks=[TaskConfig(name="t1", prompt="Do it.", goal="g")], + goal_agent_builder=goal_builder, + ) + + with pytest.raises(GoalParseError): + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + assert phase._goal_attempt_log == [{"task": "t1", "attempts": 1, "final_valid": False}] + assert phase._total_input_tokens == 10 + 8 + 6 + assert phase._total_output_tokens == 5 + 4 + 3 diff --git a/ddev/tests/ai/phases/test_base.py b/ddev/tests/ai/phases/test_base.py index b776aa11448a1..9c02a3931d1e1 100644 --- a/ddev/tests/ai/phases/test_base.py +++ b/ddev/tests/ai/phases/test_base.py @@ -6,7 +6,7 @@ import pytest -from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.base import FlowServices, Phase, PhaseOutcome from ddev.ai.phases.checkpoint import CheckpointManager from ddev.ai.phases.config import PhaseConfig from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger @@ -35,15 +35,18 @@ def _make_stub_phase( outcome=None, ): checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = _StubPhase( - phase_id=phase_id, - dependencies=dependencies or [], - config=PhaseConfig(), + services = FlowServices( checkpoint_manager=checkpoint_manager, runtime_variables={}, flow_variables={}, config_dir=flow_dir, file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase = _StubPhase( + phase_id=phase_id, + dependencies=dependencies or [], + config=PhaseConfig(), + services=services, outcome=outcome, ) phase.queue = message_queue diff --git a/ddev/tests/ai/phases/test_config.py b/ddev/tests/ai/phases/test_config.py index 79deda7b989b5..0e34b611a4af6 100644 --- a/ddev/tests/ai/phases/test_config.py +++ b/ddev/tests/ai/phases/test_config.py @@ -46,6 +46,35 @@ def test_task_config_extra_field_raises(): TaskConfig(name="t1", prompt="Do it.", unknown_field="x") +@pytest.mark.parametrize( + "kwargs,match", + [ + ({"name": "t", "prompt": "x", "goal": "g", "goal_path": "g.md"}, "At most one of 'goal' or 'goal_path'"), + ({"name": "t", "prompt": "x", "max_goal_attempts": 3}, "'max_goal_attempts' may only be set"), + ({"name": "t", "prompt": "x", "goal": "g", "max_goal_attempts": 0}, "must be at least 1"), + ], + ids=["both_goal_sources", "attempts_without_goal", "attempts_below_one"], +) +def test_task_config_goal_validation_rejects(kwargs, match): + with pytest.raises(ValidationError, match=match): + TaskConfig(**kwargs) + + +@pytest.mark.parametrize( + "kwargs", + [ + {"name": "t", "prompt": "x", "goal": "verify it"}, + {"name": "t", "prompt": "x", "goal_path": "g.md"}, + {"name": "t", "prompt": "x", "goal": "verify it", "max_goal_attempts": 7}, + ], + ids=["goal", "goal_path", "goal_with_custom_attempts"], +) +def test_task_config_goal_accepts_valid(kwargs): + tc = TaskConfig(**kwargs) + assert (tc.goal is not None) ^ (tc.goal_path is not None) + assert tc.max_goal_attempts == kwargs.get("max_goal_attempts", 5) + + # --------------------------------------------------------------------------- # CheckpointConfig # --------------------------------------------------------------------------- @@ -316,6 +345,31 @@ def test_from_yaml_missing_file(tmp_path): FlowConfig.from_yaml(tmp_path / "nonexistent.yaml", tmp_path) +def test_from_yaml_missing_task_goal_path(tmp_path): + prompts_dir = tmp_path / "prompts" + prompts_dir.mkdir() + (prompts_dir / "writer.md").write_text("system prompt") + flow_yaml = tmp_path / "flow.yaml" + flow_yaml.write_text( + """\ +agents: + writer: + tools: [] +phases: + p1: + agent: writer + tasks: + - name: t1 + prompt: "Do it." + goal_path: prompts/goal.md +flow: + - phase: p1 +""" + ) + with pytest.raises(FlowConfigError, match="goal_path not found"): + FlowConfig.from_yaml(flow_yaml, tmp_path) + + # --------------------------------------------------------------------------- # FlowConfig cycle detection via model_validate # --------------------------------------------------------------------------- diff --git a/ddev/tests/ai/phases/test_goal.py b/ddev/tests/ai/phases/test_goal.py new file mode 100644 index 0000000000000..01139760c209f --- /dev/null +++ b/ddev/tests/ai/phases/test_goal.py @@ -0,0 +1,383 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import json + +import pytest + +from ddev.ai.callbacks.callbacks import Callbacks, CallbackSet +from ddev.ai.phases.config import TaskConfig +from ddev.ai.phases.goal import ( + GoalAttemptsExhausted, + GoalParseError, + build_reviewer_user_message, + parse_reviewer_output, + render_goal_text, + run_goal_loop, +) +from ddev.ai.react.process import ReActProcess +from ddev.ai.react.types import ReActResult +from ddev.ai.tools.registry import ToolRegistry + +from .conftest import MockAgent, make_response, resolve_key + +# --------------------------------------------------------------------------- +# parse_reviewer_output +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "text,expected", + [ + ('{"valid": true, "reason": ""}', (True, "")), + ('{"valid": false, "reason": "missing metric x"}', (False, "missing metric x")), + (' {"valid": true, "reason": "ok"} ', (True, "ok")), + ('```json\n{"valid": true, "reason": ""}\n```', (True, "")), + ('```\n{"valid": false, "reason": "no"}\n```', (False, "no")), + ], + ids=["plain_true", "plain_false", "whitespace", "fenced_json", "fenced_plain"], +) +def test_parse_reviewer_output_accepts(text, expected): + assert parse_reviewer_output(text) == expected + + +@pytest.mark.parametrize( + "text", + [ + "", + "not json at all", + '{"valid": "yes", "reason": "x"}', + '{"valid": true, "reason": 42}', + '{"reason": "x"}', + '["valid", true]', + ], + ids=["empty", "prose", "valid_not_bool", "reason_not_str", "missing_valid", "not_object"], +) +def test_parse_reviewer_output_rejects(text): + assert parse_reviewer_output(text) is None + + +# --------------------------------------------------------------------------- +# render_goal_text +# --------------------------------------------------------------------------- + + +def test_render_goal_text_inline_and_path(tmp_path): + inline = render_goal_text( + TaskConfig(name="t", prompt="x", goal="check ${name}"), + tmp_path, + {"name": "Alice"}, + None, + ) + assert inline == "check Alice" + + (tmp_path / "goal.md").write_text("verify ${target}") + from_file = render_goal_text( + TaskConfig(name="t", prompt="x", goal_path="goal.md"), + tmp_path, + {"target": "endpoint"}, + None, + ) + assert from_file == "verify endpoint" + + +def test_render_goal_text_forwards_resolver(tmp_path): + result = render_goal_text( + TaskConfig(name="t", prompt="x", goal="see ${draft_memory}"), + tmp_path, + {}, + resolve_key, + ) + assert result == "see resolved(draft_memory)" + + +# --------------------------------------------------------------------------- +# build_reviewer_user_message +# --------------------------------------------------------------------------- + + +def test_build_reviewer_user_message_sections(): + msg = build_reviewer_user_message( + rendered_task_prompt="TASK", + goal_text="GOAL", + worker_summary="SUMMARY", + ) + assert "## Original task\nTASK" in msg + assert "## Goal to verify\nGOAL" in msg + assert "## Worker summary\nSUMMARY" in msg + + +# --------------------------------------------------------------------------- +# Helpers used by run_goal_loop tests +# --------------------------------------------------------------------------- + + +def _make_worker_process(responses): + """ReActProcess wired to a MockAgent — used as the worker.""" + agent = MockAgent(list(responses)) + return ReActProcess(agent=agent, tool_registry=ToolRegistry([])), agent + + +def _reviewer_builder(responses): + """A GoalAgentBuilder that returns a fresh MockAgent + empty ToolRegistry.""" + agent = MockAgent(list(responses)) + calls: list[str] = [] + + def builder(owner_id): + calls.append(owner_id) + return agent, ToolRegistry([]) + + return builder, calls, agent + + +async def _noop_compact(_): + return 0, 0 + + +# --------------------------------------------------------------------------- +# run_goal_loop +# --------------------------------------------------------------------------- + + +async def test_run_goal_loop_passes_on_first_attempt(tmp_path): + worker_process, worker_agent = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("did things"), + iterations=1, + total_input_tokens=100, + total_output_tokens=50, + context_usage=None, + ) + builder, builder_calls, _ = _reviewer_builder([make_response('{"valid": true, "reason": ""}', 20, 10)]) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="verify"), + goal_text="verify", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + assert outcome.attempts == 1 + assert outcome.total_input_tokens == 20 + assert outcome.total_output_tokens == 10 + assert outcome.final_result is initial_result + assert worker_agent.send_calls == [] + assert builder_calls == ["p1.goal.t1"] + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + assert log_path.exists() + events = {json.loads(line)["event"] for line in log_path.read_text().splitlines() if line.strip()} + assert {"start", "finish"} <= events + + +async def test_run_goal_loop_one_retry_then_pass(tmp_path): + worker_process, worker_agent = _make_worker_process([make_response("fixed it", 30, 15)]) + initial_result = ReActResult( + final_response=make_response("initial work"), + iterations=1, + total_input_tokens=100, + total_output_tokens=50, + context_usage=None, + ) + builder, _, reviewer_agent = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "missing X"}', 20, 10), + make_response('{"valid": true, "reason": ""}', 25, 12), + ] + ) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + assert outcome.attempts == 2 + assert outcome.final_result.final_response.text == "fixed it" + assert len(worker_agent.send_calls) == 1 + assert "missing X" in worker_agent.send_calls[0] + assert "g" in worker_agent.send_calls[0] + assert len(reviewer_agent.send_calls) == 2 + assert outcome.total_input_tokens == 20 + 25 + 30 + assert outcome.total_output_tokens == 10 + 12 + 15 + + +async def test_run_goal_loop_exhausts_attempts(tmp_path): + worker_process, _ = _make_worker_process([make_response("attempt 2", 10, 5)]) + initial_result = ReActResult( + final_response=make_response("attempt 1"), + iterations=1, + total_input_tokens=10, + total_output_tokens=5, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "first miss"}', 5, 3), + make_response('{"valid": false, "reason": "second miss"}', 7, 4), + ] + ) + + with pytest.raises(GoalAttemptsExhausted, match="2 attempts") as exc_info: + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g", max_goal_attempts=2), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + err = exc_info.value + assert err.input_tokens == 5 + 10 + 7 + assert err.output_tokens == 3 + 5 + 4 + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + finish = next( + json.loads(line) for line in log_path.read_text().splitlines() if json.loads(line)["event"] == "finish" + ) + assert finish["total_input_tokens"] == 5 + 10 + 7 + assert finish["total_output_tokens"] == 3 + 5 + 4 + assert finish["success"] is False + + +async def test_run_goal_loop_parse_retry_succeeds(tmp_path): + worker_process, _ = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("done"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, reviewer_agent = _reviewer_builder( + [ + make_response("not json", 5, 5), + make_response('{"valid": true, "reason": ""}', 7, 7), + ] + ) + + outcome = await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + assert outcome.attempts == 1 + assert len(reviewer_agent.send_calls) == 2 + assert outcome.total_input_tokens == 5 + 7 + assert outcome.total_output_tokens == 5 + 7 + + +async def test_run_goal_loop_parse_retry_fails_raises(tmp_path): + worker_process, _ = _make_worker_process([]) + initial_result = ReActResult( + final_response=make_response("done"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response("not json", 5, 3), + make_response("still not json", 7, 4), + ] + ) + + with pytest.raises(GoalParseError) as exc_info: + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks(), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + + err = exc_info.value + assert err.input_tokens == 5 + 7 + assert err.output_tokens == 3 + 4 + + log_path = tmp_path / "goal_agent" / "p1" / "t1.jsonl" + finish = next( + json.loads(line) for line in log_path.read_text().splitlines() if json.loads(line)["event"] == "finish" + ) + assert finish["total_input_tokens"] == 5 + 7 + assert finish["total_output_tokens"] == 3 + 4 + assert finish["success"] is False + + +async def test_run_goal_loop_fires_callbacks(tmp_path): + events: list = [] + cb_set = CallbackSet() + + @cb_set.on_before_goal_check + async def _before(task_name, attempt): + events.append(("before", task_name, attempt)) + + @cb_set.on_after_goal_check + async def _after(task_name, attempt, valid, reason): + events.append(("after", task_name, attempt, valid, reason)) + + worker_process, _ = _make_worker_process([make_response("attempt 2", 0, 0)]) + initial_result = ReActResult( + final_response=make_response("attempt 1"), + iterations=1, + total_input_tokens=0, + total_output_tokens=0, + context_usage=None, + ) + builder, _, _ = _reviewer_builder( + [ + make_response('{"valid": false, "reason": "fix X"}', 0, 0), + make_response('{"valid": true, "reason": ""}', 0, 0), + ] + ) + + await run_goal_loop( + task=TaskConfig(name="t1", prompt="x", goal="g"), + goal_text="g", + rendered_task_prompt="TASK", + worker_process=worker_process, + initial_result=initial_result, + goal_agent_builder=builder, + callbacks=Callbacks([cb_set]), + phase_id="p1", + log_root=tmp_path, + compact_if_needed=_noop_compact, + ) + assert events == [ + ("before", "t1", 1), + ("after", "t1", 1, False, "fix X"), + ("before", "t1", 2), + ("after", "t1", 2, True, ""), + ] diff --git a/ddev/tests/ai/tools/test_registry.py b/ddev/tests/ai/tools/test_registry.py index a8e3f4f40ea3e..2f6e33d3ea3e4 100644 --- a/ddev/tests/ai/tools/test_registry.py +++ b/ddev/tests/ai/tools/test_registry.py @@ -7,7 +7,7 @@ from ddev.ai.tools.core.types import ToolResult from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.ai.tools.registry import ToolRegistry +from ddev.ai.tools.registry import ToolRegistry, filter_read_only # --------------------------------------------------------------------------- # Fake tools — implement ToolProtocol without depending on BaseTool @@ -210,6 +210,30 @@ def test_from_names_fs_tools_share_file_registry(tmp_path): assert all(r is registries[0] for r in registries) +# --------------------------------------------------------------------------- +# read_only manifest annotations and filter_read_only +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "input_names,expected", + [ + (["read_file", "edit_file", "grep", "create_file"], ["read_file", "grep"]), + (["edit_file", "create_file"], []), + ([], []), + (["read_file", "list_files"], ["read_file", "list_files"]), + ], + ids=["mixed", "all_writes", "empty", "all_reads"], +) +def test_filter_read_only(input_names, expected): + assert filter_read_only(input_names) == expected + + +def test_filter_read_only_unknown_name_raises(): + with pytest.raises(ValueError, match="Unknown tool name"): + filter_read_only(["read_file", "teleport"]) + + def test_from_names_reuses_supplied_file_registry(tmp_path): """Multiple ToolRegistries can share one FileRegistry; tools carry their own owner_id.""" shared = FileRegistry(policy=FileAccessPolicy(write_root=tmp_path))