diff --git a/ddev/hatch.toml b/ddev/hatch.toml index 9550fb4a939c3..d1f351f807f94 100644 --- a/ddev/hatch.toml +++ b/ddev/hatch.toml @@ -7,6 +7,7 @@ mypy-files = [ [envs.default] python = "3.13" e2e-env = false +features = ["ai"] dependencies = [ "pyyaml", "pytest-asyncio", diff --git a/ddev/pyproject.toml b/ddev/pyproject.toml index 73928e956e9d4..424063afdb689 100644 --- a/ddev/pyproject.toml +++ b/ddev/pyproject.toml @@ -48,6 +48,11 @@ dependencies = [ ] dynamic = ["version"] +[project.optional-dependencies] +ai = [ + "prometheus-client", +] + [project.urls] Source = "https://github.com/DataDog/integrations-core" diff --git a/ddev/src/ddev/ai/flows/__init__.py b/ddev/src/ddev/ai/flows/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/src/ddev/ai/flows/openmetrics/__init__.py b/ddev/src/ddev/ai/flows/openmetrics/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py b/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

import json
import os
from pathlib import Path
from typing import Any

import httpx
from prometheus_client import Metric
from prometheus_client.openmetrics.parser import text_string_to_metric_families as parse_openmetrics
from prometheus_client.parser import text_string_to_metric_families as parse_prometheus

from ddev.ai.phases.base import Phase, PhaseOutcome
from ddev.ai.phases.config import AgentConfig, FlowConfigError, PhaseConfig

REQUEST_TIMEOUT_SECONDS = 10.0
RESPONSE_BODY_LIMIT_BYTES = 10 * 1024 * 1024  # 10 MB
JSONL_FILENAME_SUFFIX = "_metrics.jsonl"
# Media type prefix that selects the OpenMetrics parser (compared case-insensitively).
_OPENMETRICS_MEDIA_TYPE = "application/openmetrics-text"


class EndpointInspectionError(Exception):
    """Raised when the endpoint is unreachable, its body is unusable, or the catalog cannot be written."""


def _parse_exposition(body: str, content_type: str) -> tuple[list[Metric], str]:
    """Parse ``body`` with the parser selected by ``content_type``.

    A Content-Type whose media type starts with ``application/openmetrics-text``
    (ignoring case and surrounding whitespace) selects the OpenMetrics parser;
    anything else, including an empty value, falls back to the Prometheus text
    parser.

    Returns:
        ``(families, exposition_format)`` where ``exposition_format`` is
        ``"openmetrics"`` or ``"prometheus"``.

    Raises:
        EndpointInspectionError: if the parser raises, or if it yields zero
            metric families.
    """
    # Media types are case-insensitive, so normalise before matching.
    media_type = content_type.strip().lower()
    if media_type.startswith(_OPENMETRICS_MEDIA_TYPE):
        parser = parse_openmetrics
        exposition_format = "openmetrics"
    else:
        parser = parse_prometheus
        exposition_format = "prometheus"

    try:
        # The parsers are generators: materialise them here so that parse
        # errors surface inside this try block.
        families = list(parser(body))
    except Exception as e:  # deliberate boundary: any parser failure means "not valid exposition"
        raise EndpointInspectionError(
            f"Body is not valid {exposition_format} exposition ({type(e).__name__}): {e}"
        ) from e

    if not families:
        raise EndpointInspectionError(f"Body parsed as {exposition_format} but contained zero metric families")

    return families, exposition_format


def _build_jsonl_rows(families: list[Metric]) -> list[dict[str, Any]]:
    """Build one JSONL row per metric family, in the order the families are given.

    Each row carries ``name``, ``type``, ``help``, ``unit``, the sorted union of
    label keys seen across the family's samples, and the number of samples.
    """
    rows: list[dict[str, Any]] = []
    for metric in families:
        label_keys = {key for sample in metric.samples for key in sample.labels}
        rows.append(
            {
                "name": metric.name,
                "type": metric.type,
                "help": metric.documentation or "",
                "unit": metric.unit or "",
                "label_keys": sorted(label_keys),
                "sample_count": len(metric.samples),
            }
        )
    return rows


def _remove_if_exists(path: Path) -> None:
    """Best-effort removal of ``path``; a missing file or a failing unlink is ignored."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        # Cleanup only: the caller is already reporting the primary failure.
        pass


def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write ``rows`` as JSON Lines to ``path`` via a temp file and ``os.replace``.

    All rows are serialized before the temp file is created, so a serialization
    failure never touches the disk. The temp file (``path`` with ``.tmp``
    appended to its suffix) is removed on every failure path.

    Raises:
        EndpointInspectionError: if a row cannot be serialized, or if writing or
            renaming the file fails.
    """
    serialized: list[str] = []
    for row in rows:
        try:
            serialized.append(json.dumps(row, separators=(",", ":"), ensure_ascii=False))
        except (TypeError, ValueError) as e:
            raise EndpointInspectionError(f"Failed to serialize metric {row.get('name')!r}: {e}") from e

    tmp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as fh:
            fh.writelines(f"{line}\n" for line in serialized)
        os.replace(tmp_path, path)
    except (OSError, UnicodeError) as e:
        # UnicodeError covers text that cannot be encoded as UTF-8; it is not an
        # OSError and would otherwise escape unwrapped and leave the temp file.
        _remove_if_exists(tmp_path)
        raise EndpointInspectionError(f"Failed to write metrics catalog at {path}: {e}") from e
    except BaseException:
        # Interruption or unexpected error: do not leave a partial temp file behind.
        _remove_if_exists(tmp_path)
        raise


def _build_memory_text(
    url: str,
    status: int,
    content_type: str,
    exposition_format: str,
    metric_count: int,
    jsonl_path: Path,
) -> str:
    """Render the markdown memory file describing the inspected endpoint.

    The result is a newline-joined bullet list of the arguments followed by a
    two-line summary; it has no trailing newline.
    """
    lines = [
        "# Endpoint inspection",
        "",
        f"- **URL:** {url}",
        f"- **HTTP status:** {status}",
        f"- **Content-Type:** {content_type}",
        f"- **Exposition format:** {exposition_format}",
        f"- **Metric families detected:** {metric_count}",
        f"- **Metrics catalog:** {jsonl_path}",
        "",
        "Endpoint is reachable and serves a Prometheus/OpenMetrics-compatible body.",
        "The full list of metrics with metadata is in the catalog file above.",
    ]
    return "\n".join(lines)


async def _fetch_exposition(endpoint_url: str) -> tuple[int, str, str]:
    """GET ``endpoint_url`` and return ``(status_code, content_type, body)``.

    Redirects are followed, the request uses ``REQUEST_TIMEOUT_SECONDS`` as its
    timeout, and the body is streamed so that it can be rejected as soon as it
    exceeds ``RESPONSE_BODY_LIMIT_BYTES``. The body is decoded as UTF-8 with
    undecodable bytes replaced.

    Raises:
        EndpointInspectionError: on a non-200 status, an oversized body, a
            timeout, an unparsable URL, or any other httpx request error.
    """
    limit_mb = RESPONSE_BODY_LIMIT_BYTES // (1024 * 1024)
    try:
        async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT_SECONDS, follow_redirects=True) as client:
            async with client.stream("GET", endpoint_url) as response:
                # Read everything needed from the response while its stream is still open.
                status_code = response.status_code
                if status_code != 200:
                    raise EndpointInspectionError(
                        f"Endpoint returned HTTP {status_code} (expected 200): {endpoint_url}"
                    )
                content_type = response.headers.get("Content-Type", "")
                chunks: list[bytes] = []
                received = 0
                async for chunk in response.aiter_bytes():
                    received += len(chunk)
                    if received > RESPONSE_BODY_LIMIT_BYTES:
                        raise EndpointInspectionError(f"Response body exceeds {limit_mb} MB limit: {endpoint_url}")
                    chunks.append(chunk)
                body = b"".join(chunks).decode("utf-8", errors="replace")
                return status_code, content_type, body
    except httpx.TimeoutException as e:
        raise EndpointInspectionError(f"Endpoint timed out after {REQUEST_TIMEOUT_SECONDS}s: {endpoint_url}") from e
    except httpx.InvalidURL as e:
        # InvalidURL is not a RequestError subclass, so it needs its own handler.
        raise EndpointInspectionError(f"Invalid endpoint URL {endpoint_url!r}: {e}") from e
    except httpx.RequestError as e:
        raise EndpointInspectionError(f"Request failed for {endpoint_url}: {e}") from e


class InspectEndpointPhase(Phase):
    """Deterministic Phase 0 for the OpenMetrics pipeline.

    Fetches ``endpoint_url`` with one GET (redirects followed) and:

    1. Confirms the endpoint is reachable (HTTP 200).
    2. Confirms the body is valid Prometheus or OpenMetrics exposition.
    3. Writes a ``<phase_id>_metrics.jsonl`` sidecar into the checkpoint
       manager's memory directory, with one row per metric family — the
       ground-truth catalog later phases use to drive metric renaming,
       ``metrics.py`` mapping, and ``metadata.csv`` generation.

    Raises FlowConfigError when ``endpoint_url`` is missing, and
    EndpointInspectionError on any fetch, parse, or write failure.
    """

    @classmethod
    def validate_config(
        cls,
        phase_id: str,
        config: PhaseConfig,
        agents: dict[str, AgentConfig],
    ) -> None:
        """Reject configuration keys that make no sense for this phase.

        ``agents`` is accepted for signature compatibility and is not inspected.

        Raises:
            FlowConfigError: if ``agent``, ``tasks`` or ``checkpoint`` is declared.
        """
        if config.agent is not None:
            raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'agent'")
        if config.tasks:
            raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'tasks'")
        if config.checkpoint is not None:
            raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'checkpoint'")

    async def execute(self, context: dict[str, Any]) -> PhaseOutcome:
        """Inspect ``context["endpoint_url"]`` and write the metrics catalog.

        Returns:
            A PhaseOutcome with the markdown memory text, zero token counts, and
            an ``extra_checkpoint`` recording the URL, status code, content type,
            exposition format, metric count and absolute catalog path.

        Raises:
            FlowConfigError: if ``endpoint_url`` is missing or empty.
            EndpointInspectionError: if fetching, parsing, or writing fails.
        """
        endpoint_url = context.get("endpoint_url")
        if not endpoint_url:
            raise FlowConfigError(f"Phase {self._phase_id!r}: 'endpoint_url' runtime variable is required")

        status_code, content_type, body = await _fetch_exposition(endpoint_url)

        try:
            families, exposition_format = _parse_exposition(body, content_type)
        except EndpointInspectionError as e:
            # Re-raise with the URL appended so the failure identifies the endpoint.
            raise EndpointInspectionError(f"{e} ({endpoint_url})") from e

        rows = _build_jsonl_rows(families)
        memory_dir = self._checkpoint_manager.memory_dir
        try:
            memory_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise EndpointInspectionError(f"Failed to create memory directory {memory_dir}: {e}") from e
        jsonl_path = (memory_dir / f"{self._phase_id}{JSONL_FILENAME_SUFFIX}").resolve()
        _write_jsonl(jsonl_path, rows)

        metric_count = len(families)
        memory_text = _build_memory_text(
            url=endpoint_url,
            status=status_code,
            content_type=content_type,
            exposition_format=exposition_format,
            metric_count=metric_count,
            jsonl_path=jsonl_path,
        )

        return PhaseOutcome(
            memory_text=memory_text,
            total_input_tokens=0,
            total_output_tokens=0,
            extra_checkpoint={
                "endpoint_url": endpoint_url,
                "status_code": status_code,
                "content_type": content_type,
                "exposition_format": exposition_format,
                "metric_count": metric_count,
                "metrics_jsonl_path": str(jsonl_path),
            },
        )
return f"{user_additions}\n\n{base_prompt}" if user_additions else base_prompt + @property + def memory_dir(self) -> Path: + """Directory where memory files and per-phase sidecar artifacts are written.""" + return self._path.parent + def memory_path(self, phase_id: str) -> Path: """Return the resolved path to a phase's memory file.""" return (self.root / f"{phase_id}_memory.md").resolve() diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index de663c3138ed4..3a4e4bc94f7e6 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -21,12 +21,15 @@ def _discover_and_register_phases( registry: PhaseRegistry, - phases_dir: Path | None = None, - import_prefix: str = "ddev.ai.phases", + phases_dir: Path, + import_prefix: str, ) -> None: - """Import all non-private modules in phases_dir and register Phase subclasses.""" - if phases_dir is None: - phases_dir = Path(__file__).parent + """Import every non-private *.py in phases_dir and register Phase subclasses. + + Modules are imported by dotted path: ``{import_prefix}.{file_stem}``. The + caller is responsible for choosing the right pair (dir, prefix). Import + errors are fatal — a syntax error in any discovered module aborts startup. 
+ """ for py_file in phases_dir.glob("*.py"): if py_file.stem.startswith("_"): continue @@ -75,7 +78,27 @@ async def on_initialize(self) -> None: """Discover custom phases, parse flow.yaml, construct phases, submit PhaseTrigger.""" config_dir = self._flow_yaml_path.parent - _discover_and_register_phases(self._phase_registry) + _discover_and_register_phases( + self._phase_registry, + Path(__file__).parent, + "ddev.ai.phases", + ) + + flow_phases_dir = config_dir / "phases" + if flow_phases_dir.is_dir(): + ai_root = Path(__file__).parent.parent + try: + rel = flow_phases_dir.relative_to(ai_root) + except ValueError: + raise FlowConfigError( + f"Flow phases directory {flow_phases_dir} must be inside the ddev.ai package tree ({ai_root})" + ) from None + flow_import_prefix = "ddev.ai." + ".".join(rel.parts) + _discover_and_register_phases( + self._phase_registry, + flow_phases_dir, + flow_import_prefix, + ) config = FlowConfig.from_yaml(self._flow_yaml_path, config_dir) diff --git a/ddev/tests/ai/flows/__init__.py b/ddev/tests/ai/flows/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/tests/ai/flows/openmetrics/__init__.py b/ddev/tests/ai/flows/openmetrics/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/tests/ai/flows/openmetrics/phases/__init__.py b/ddev/tests/ai/flows/openmetrics/phases/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/phases/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

import asyncio
import json
import os

import httpx
import pytest
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families as parse_prometheus

from ddev.ai.flows.openmetrics.phases import inspect_endpoint as inspect_endpoint_module
from ddev.ai.flows.openmetrics.phases.inspect_endpoint import (
    EndpointInspectionError,
    InspectEndpointPhase,
    _build_jsonl_rows,
    _build_memory_text,
    _parse_exposition,
)
from ddev.ai.phases.checkpoint import CheckpointManager
from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig
from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger
from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy
from ddev.ai.tools.fs.file_registry import FileRegistry
from ddev.event_bus.exceptions import MessageProcessingError

ENDPOINT_URL = "http://example.test:9100/metrics"
PHASE_ID = "inspect_endpoint"

# Prometheus text exposition: one counter, one gauge, one histogram.
PROMETHEUS_BODY = """\
# HELP http_requests_total Total HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="GET",code="200"} 1027
# HELP process_resident_memory_bytes Resident memory in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.2e+08
# HELP request_duration_seconds Request latency.
# TYPE request_duration_seconds histogram
request_duration_seconds_bucket{le="0.1"} 3
request_duration_seconds_bucket{le="0.5"} 5
request_duration_seconds_bucket{le="+Inf"} 7
request_duration_seconds_sum 1.5
request_duration_seconds_count 7
"""

# OpenMetrics exposition carrying UNIT lines, terminated by "# EOF".
OPENMETRICS_BODY = """\
# TYPE http_requests counter
# HELP http_requests Total HTTP requests.
# UNIT http_requests requests
http_requests_total{method="GET",code="200"} 1027
# TYPE process_resident_memory_bytes gauge
# UNIT process_resident_memory_bytes bytes
# HELP process_resident_memory_bytes Resident memory in bytes.
process_resident_memory_bytes 1.2e+08
# EOF
"""

# OpenMetrics exposition without UNIT lines: a counter, a gauge and a summary.
OPENMETRICS_BODY_CLEAN = """\
# TYPE http_requests counter
# HELP http_requests Total HTTP requests.
http_requests_total{method="GET",code="200"} 1027
# TYPE process_resident_memory_bytes gauge
# HELP process_resident_memory_bytes Resident memory in bytes.
process_resident_memory_bytes 1.2e+08
# TYPE request_duration_seconds summary
# HELP request_duration_seconds Request latency.
request_duration_seconds{quantile="0.5"} 0.04
request_duration_seconds{quantile="0.9"} 0.09
request_duration_seconds_sum 1.5
request_duration_seconds_count 7
# EOF
"""

# Single-gauge OpenMetrics exposition that declares a unit.
OPENMETRICS_BODY_WITH_UNIT = """\
# TYPE request_duration_seconds gauge
# HELP request_duration_seconds Request latency snapshot.
# UNIT request_duration_seconds seconds
request_duration_seconds 0.42
# EOF
"""


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def flow_dir(tmp_path):
    """Use pytest's per-test temporary directory as the flow directory."""
    return tmp_path


@pytest.fixture
def message_queue():
    """Provide a fresh asyncio queue that is assigned to the phase's ``queue`` attribute."""
    return asyncio.Queue()


def _make_phase(
    flow_dir,
    message_queue,
    *,
    phase_id: str = PHASE_ID,
    runtime_variables: dict[str, str] | None = None,
) -> tuple[InspectEndpointPhase, CheckpointManager]:
    """Build an InspectEndpointPhase rooted at ``flow_dir`` together with its CheckpointManager.

    When ``runtime_variables`` is None the phase receives ``{"endpoint_url": ENDPOINT_URL}``;
    an explicitly passed dict (even an empty one) is used as given.
    """
    manager = CheckpointManager(flow_dir / "checkpoints.yaml")
    if runtime_variables is None:
        variables = {"endpoint_url": ENDPOINT_URL}
    else:
        variables = runtime_variables
    built = InspectEndpointPhase(
        phase_id=phase_id,
        dependencies=[],
        config=PhaseConfig(),
        checkpoint_manager=manager,
        runtime_variables=variables,
        flow_variables={},
        config_dir=flow_dir,
        file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)),
    )
    built.queue = message_queue
    return built, manager


def _install_mock_transport(monkeypatch, handler):
    """Route every ``httpx.AsyncClient`` through a MockTransport wrapping ``handler``.

    The attribute is replaced on the ``httpx`` module object that the phase
    module imported, so the replacement is visible to any code using
    ``httpx.AsyncClient`` until ``monkeypatch`` restores it.
    """
    original_client = httpx.AsyncClient

    def build_client(*args, **kwargs):
        # Override any caller-supplied transport with the mock one.
        patched_kwargs = {**kwargs, "transport": httpx.MockTransport(handler)}
        return original_client(*args, **patched_kwargs)

    monkeypatch.setattr(inspect_endpoint_module.httpx, "AsyncClient", build_client)


def _ok_handler(status: int, body: str, content_type: str):
    """Return a transport handler that always answers with the given status, body and Content-Type."""

    def handler(request: httpx.Request) -> httpx.Response:
        payload = body.encode("utf-8")
        response_headers = {"Content-Type": content_type}
        return httpx.Response(status_code=status, content=payload, headers=response_headers)

    return handler


def _raising_handler(exc: Exception):
    """Return a transport handler that raises ``exc`` for every request."""

    def handler(request: httpx.Request) -> httpx.Response:
        raise exc

    return handler


def _read_jsonl(path) -> list[dict]:
    """Load every non-empty line of the UTF-8 file at ``path`` as a JSON document."""
    records = []
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        if raw_line:
            records.append(json.loads(raw_line))
    return records
class _Sample:
    """Minimal Sample stand-in for the label-union unit test.

    Exposes only the ``name``, ``labels`` and ``value`` attributes.
    """

    def __init__(self, name: str, labels: dict[str, str], value: float):
        self.name, self.labels, self.value = name, labels, value


# ---------------------------------------------------------------------------
# Phase happy paths
# ---------------------------------------------------------------------------


async def test_success_with_prometheus_body(flow_dir, message_queue, monkeypatch):
    """A 200 response with a Prometheus body yields a success checkpoint and a memory file."""
    handler = _ok_handler(200, PROMETHEUS_BODY, "text/plain; version=0.0.4; charset=utf-8")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    start = PhaseTrigger(id="start", phase_id=None)
    await phase.process_message(start)

    family_count = len(list(parse_prometheus(PROMETHEUS_BODY)))

    memory = mgr.memory_content(PHASE_ID)
    assert ENDPOINT_URL in memory
    assert "HTTP status:** 200" in memory

    all_checkpoints = mgr.read()
    checkpoint = all_checkpoints[PHASE_ID]
    assert checkpoint["status"] == "success"
    assert checkpoint["exposition_format"] == "prometheus"
    assert checkpoint["metric_count"] == family_count
    assert "sample_metric_names" not in checkpoint
    assert checkpoint["status_code"] == 200
    assert checkpoint["endpoint_url"] == ENDPOINT_URL
    assert checkpoint["tokens"] == {"total_input": 0, "total_output": 0}


async def test_success_with_openmetrics_body(flow_dir, message_queue, monkeypatch):
    """An OpenMetrics Content-Type selects the openmetrics format and is recorded verbatim."""
    content_type = "application/openmetrics-text; version=1.0.0; charset=utf-8"
    handler = _ok_handler(200, OPENMETRICS_BODY_CLEAN, content_type)
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    start = PhaseTrigger(id="start", phase_id=None)
    await phase.process_message(start)

    all_checkpoints = mgr.read()
    checkpoint = all_checkpoints[PHASE_ID]
    assert checkpoint["status"] == "success"
    assert checkpoint["exposition_format"] == "openmetrics"
    assert checkpoint["content_type"] == content_type
    assert checkpoint["metric_count"] >= 1
# ---------------------------------------------------------------------------
# Phase failure paths
# ---------------------------------------------------------------------------


async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str):
    """Run ``process_message``, require it to raise, then drive ``on_error`` with the wrapped error.

    After ``on_error`` the phase checkpoint must have status ``"failed"`` and both
    the checkpoint error and the queued PhaseFailedMessage error must contain
    ``error_contains`` (compared case-insensitively). Returns the raised exception.
    """
    trigger = PhaseTrigger(id="start", phase_id=None)
    failure = None
    try:
        await phase.process_message(trigger)
    except Exception as exc:
        failure = exc

    if failure is None:
        pytest.fail(f"Expected process_message() to raise; error should contain {error_contains!r}")

    await phase.on_error(MessageProcessingError(phase._phase_id, trigger, failure))

    needle = error_contains.lower()
    checkpoint = mgr.read()[phase._phase_id]
    assert checkpoint["status"] == "failed"
    assert needle in checkpoint["error"].lower()

    msg = message_queue.get_nowait()
    assert isinstance(msg, PhaseFailedMessage)
    assert needle in msg.error.lower()
    return failure


async def test_failure_non_200_status(flow_dir, message_queue, monkeypatch):
    """A 503 response fails the phase with the status code in the error."""
    handler = _ok_handler(503, "service unavailable", "text/plain")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="HTTP 503")


async def test_failure_body_is_html(flow_dir, message_queue, monkeypatch):
    """A 200 response whose body is not exposition text fails as invalid prometheus exposition."""
    handler = _ok_handler(200, "not metrics", "text/plain")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid prometheus exposition")


async def test_failure_body_is_json(flow_dir, message_queue, monkeypatch):
    """A JSON body served as text/plain fails with an error mentioning prometheus."""
    handler = _ok_handler(200, '{"hello": "world"}', "text/plain")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="prometheus")


async def test_failure_zero_families(flow_dir, message_queue, monkeypatch):
    """A body with only blank lines and a stray comment fails with the zero-families error."""
    body = "\n\n# just a stray comment, not a HELP/TYPE\n\n"
    handler = _ok_handler(200, body, "text/plain")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="zero metric families")


async def test_failure_openmetrics_missing_eof(flow_dir, message_queue, monkeypatch):
    """A Prometheus body served with an OpenMetrics Content-Type is rejected as invalid openmetrics."""
    handler = _ok_handler(200, PROMETHEUS_BODY, "application/openmetrics-text; version=1.0.0")
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid openmetrics exposition")


async def test_failure_timeout(flow_dir, message_queue, monkeypatch):
    """An httpx timeout raised by the transport is reported as a timeout."""
    handler = _raising_handler(httpx.TimeoutException("slow"))
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="timed out")


async def test_failure_request_error(flow_dir, message_queue, monkeypatch):
    """A connection error raised by the transport is reported as a failed request."""
    handler = _raising_handler(httpx.ConnectError("refused"))
    _install_mock_transport(monkeypatch, handler)
    phase, mgr = _make_phase(flow_dir, message_queue)

    await _assert_phase_fails(phase, mgr, message_queue, error_contains="request failed")


async def test_failure_missing_endpoint_url(flow_dir, message_queue):
    """Without ``endpoint_url`` the phase raises FlowConfigError and ``on_error`` records the failure."""
    phase, mgr = _make_phase(flow_dir, message_queue, runtime_variables={})

    trigger = PhaseTrigger(id="start", phase_id=None)
    with pytest.raises(FlowConfigError, match="endpoint_url"):
        await phase.process_message(trigger)

    config_error = FlowConfigError("endpoint_url required")
    await phase.on_error(MessageProcessingError(phase._phase_id, trigger, config_error))

    checkpoint = mgr.read()[PHASE_ID]
    assert checkpoint["status"] == "failed"
    msg = message_queue.get_nowait()
    assert isinstance(msg, PhaseFailedMessage)


# ---------------------------------------------------------------------------
# validate_config
--------------------------------------------------------------------------- + + +def test_validate_config_rejects_agent(): + with pytest.raises(FlowConfigError, match="must not declare 'agent'"): + InspectEndpointPhase.validate_config("p", PhaseConfig(agent="x"), {"x": AgentConfig()}) + + +def test_validate_config_rejects_tasks(): + config = PhaseConfig(tasks=[TaskConfig(name="t", prompt="hi")]) + with pytest.raises(FlowConfigError, match="must not declare 'tasks'"): + InspectEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_rejects_checkpoint(): + config = PhaseConfig(checkpoint=CheckpointConfig(memory_prompt="x")) + with pytest.raises(FlowConfigError, match="must not declare 'checkpoint'"): + InspectEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_accepts_minimal(): + InspectEndpointPhase.validate_config("p", PhaseConfig(), {}) + + +# --------------------------------------------------------------------------- +# _parse_exposition +# --------------------------------------------------------------------------- + + +def test_parse_exposition_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "text/plain; version=0.0.4") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_openmetrics(): + families, fmt = _parse_exposition(OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0") + assert fmt == "openmetrics" + assert len(families) > 0 + + +def test_parse_exposition_empty_content_type_falls_back_to_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_raises_on_invalid_body(): + with pytest.raises(EndpointInspectionError, match="not valid prometheus exposition"): + _parse_exposition("", "text/plain") + + +def test_parse_exposition_raises_on_zero_families(): + with pytest.raises(EndpointInspectionError, match="zero metric families"): + 
_parse_exposition("", "text/plain") + + +# --------------------------------------------------------------------------- +# _build_memory_text +# --------------------------------------------------------------------------- + + +def test_build_memory_text_renders_all_fields(tmp_path): + jsonl_path = tmp_path / "inspect_endpoint_metrics.jsonl" + text = _build_memory_text( + url="http://example.test:9100/metrics", + status=200, + content_type="text/plain; version=0.0.4", + exposition_format="prometheus", + metric_count=2, + jsonl_path=jsonl_path, + ) + assert "http://example.test:9100/metrics" in text + assert "200" in text + assert "text/plain; version=0.0.4" in text + assert "prometheus" in text + assert "2" in text + assert str(jsonl_path) in text + + +# --------------------------------------------------------------------------- +# JSONL + sidecar contract — new tests +# --------------------------------------------------------------------------- + + +async def test_jsonl_path_in_checkpoint(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + path_str = checkpoint["metrics_jsonl_path"] + assert isinstance(path_str, str) + assert os.path.isabs(path_str) + assert os.path.exists(path_str) + + +async def test_jsonl_one_row_per_family(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + expected_families = list(parse_prometheus(PROMETHEUS_BODY)) + assert len(rows) == checkpoint["metric_count"] == len(expected_families) + + +async def 
test_jsonl_row_schema(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + expected_keys = {"name", "type", "help", "unit", "label_keys", "sample_count"} + for row in _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl"): + assert set(row.keys()) == expected_keys + assert isinstance(row["label_keys"], list) + assert all(isinstance(k, str) for k in row["label_keys"]) + assert row["label_keys"] == sorted(row["label_keys"]) + assert len(row["label_keys"]) == len(set(row["label_keys"])) + + +async def test_jsonl_counter_total_stripped_prometheus(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + counter_rows = [r for r in rows if r["type"] == "counter"] + assert len(counter_rows) == 1 + assert counter_rows[0]["name"] == "http_requests" + + +async def test_jsonl_counter_total_stripped_openmetrics(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + counter_rows = [r for r in rows if r["type"] == "counter"] + assert len(counter_rows) == 1 + assert counter_rows[0]["name"] == "http_requests" + + +async def test_jsonl_histogram_collapses_to_single_row(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = 
_make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + hist_rows = [r for r in rows if r["type"] == "histogram"] + assert len(hist_rows) == 1 + row = hist_rows[0] + assert row["name"] == "request_duration_seconds" + assert "le" in row["label_keys"] + + +async def test_jsonl_summary_collapses_to_single_row(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + summary_rows = [r for r in rows if r["type"] == "summary"] + assert len(summary_rows) == 1 + row = summary_rows[0] + assert row["name"] == "request_duration_seconds" + assert "quantile" in row["label_keys"] + + +async def test_jsonl_unit_populated_for_openmetrics(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_WITH_UNIT, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + assert len(rows) == 1 + assert rows[0]["unit"] == "seconds" + + +async def test_jsonl_unit_empty_for_prometheus(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + assert all(r["unit"] == "" for r in rows) + + +def test_jsonl_label_keys_are_union_across_samples(): + metric = Metric("multi_labels", 
"doc", "gauge") + metric.samples.append(_Sample("multi_labels", {"a": "1"}, 1.0)) + metric.samples.append(_Sample("multi_labels", {"a": "1", "b": "2"}, 2.0)) + metric.samples.append(_Sample("multi_labels", {"a": "1", "c": "3"}, 3.0)) + + rows = _build_jsonl_rows([metric]) + + assert rows[0]["label_keys"] == ["a", "b", "c"] + + +async def test_jsonl_sample_count_matches_parser(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + families = list(parse_prometheus(PROMETHEUS_BODY)) + expected_counts = {m.name: len(m.samples) for m in families} + actual_counts = {r["name"]: r["sample_count"] for r in rows} + assert actual_counts == expected_counts + + +async def test_jsonl_ordering_matches_parser(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + families = list(parse_prometheus(PROMETHEUS_BODY)) + assert [r["name"] for r in rows] == [m.name for m in families] + + +async def test_jsonl_is_deterministic_byte_for_byte(flow_dir, message_queue, monkeypatch, tmp_path_factory): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase1, _ = _make_phase(flow_dir, message_queue) + await phase1.process_message(PhaseTrigger(id="start", phase_id=None)) + first_bytes = (flow_dir / f"{PHASE_ID}_metrics.jsonl").read_bytes() + + flow_dir_2 = tmp_path_factory.mktemp("second_run") + queue_2 = asyncio.Queue() + phase2, _ = _make_phase(flow_dir_2, queue_2) + await phase2.process_message(PhaseTrigger(id="start", phase_id=None)) + 
second_bytes = (flow_dir_2 / f"{PHASE_ID}_metrics.jsonl").read_bytes() + + assert first_bytes == second_bytes + assert first_bytes.endswith(b"\n") + + +async def test_memory_text_includes_jsonl_path(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + memory = mgr.memory_content(PHASE_ID) + assert checkpoint["metrics_jsonl_path"] in memory + + +async def test_jsonl_sidecar_atomic_on_os_replace_failure(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + + def boom(_src, _dst): + raise OSError("simulated atomic replace failure") + + monkeypatch.setattr(inspect_endpoint_module.os, "replace", boom) + + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="Failed to write metrics catalog") + + final_path = flow_dir / f"{PHASE_ID}_metrics.jsonl" + assert not final_path.exists() + + +async def test_jsonl_failure_propagates_as_phase_failure(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + blocker = flow_dir / f"{PHASE_ID}_metrics.jsonl.tmp" + blocker.mkdir() + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="Failed to write metrics catalog") diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index 85c9758a95d1b..076f7f596bcdb 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -46,9 +46,13 @@ def _make(base_dir: Path | None = None, **overrides: Any) -> PhaseOrchestrator: # 
--------------------------------------------------------------------------- +FRAMEWORK_PHASES_DIR = Path(__file__).resolve().parents[3] / "src" / "ddev" / "ai" / "phases" +FRAMEWORK_IMPORT_PREFIX = "ddev.ai.phases" + + def test_discover_registers_agentic_phase(): registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) assert "AgenticPhase" in registry.known_names() assert registry.get("AgenticPhase") is AgenticPhase @@ -119,9 +123,9 @@ def test_discover_skips_underscore_prefixed_files(tmp_path, monkeypatch): def test_discover_idempotent(): registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) first = registry.known_names() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) second = registry.known_names() assert first == second @@ -135,7 +139,7 @@ def test_registry_get_unknown_raises(): def test_imported_class_not_registered(): """A class imported into a phases module but defined elsewhere should not be registered.""" registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) # BaseMessage is imported in messages.py but defined in event_bus — it should NOT be registered assert "BaseMessage" not in registry.known_names() @@ -156,7 +160,7 @@ class ExclusivePhase(Phase): def test_discover_does_not_mutate_global_state(): """_discover_and_register_phases only touches the registry passed to it.""" registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) # No module-level / class-level container should have been touched. 
# Verify by checking there is no class-level _registry attribute on PhaseRegistry. assert not hasattr(PhaseRegistry, "_registry") @@ -270,6 +274,31 @@ async def test_on_initialize_unknown_phase_type_raises_flow_config_error(tmp_pat await orchestrator.on_initialize() +async def test_on_initialize_flow_phases_dir_outside_ai_root_raises(tmp_path, make_orchestrator): + """phases/ directory outside the ddev.ai package tree raises FlowConfigError.""" + (tmp_path / "phases").mkdir() + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + a: + agent: writer + tasks: + - name: t1 + prompt: do it + flow: + - phase: a + """) + ) + orchestrator = make_orchestrator(tmp_path) + with pytest.raises(FlowConfigError, match="ddev.ai package tree"): + await orchestrator.on_initialize() + + async def test_on_initialize_missing_agent_raises(tmp_path, make_orchestrator): (tmp_path / "prompts").mkdir() (tmp_path / "flow.yaml").write_text(