From 49bbc2ad2a52922092afd873f06cf634c04c264b Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Tue, 12 May 2026 16:15:34 +0200 Subject: [PATCH 1/5] First implementation of Phase0 --- ddev/pyproject.toml | 1 + ddev/src/ddev/ai/flows/__init__.py | 3 + .../src/ddev/ai/flows/openmetrics/__init__.py | 3 + .../ai/flows/openmetrics/phases/__init__.py | 3 + .../openmetrics/phases/validate_endpoint.py | 150 ++++++++ ddev/src/ddev/ai/phases/orchestrator.py | 30 +- ddev/tests/ai/flows/__init__.py | 3 + ddev/tests/ai/flows/openmetrics/__init__.py | 3 + .../ai/flows/openmetrics/phases/__init__.py | 3 + .../phases/test_validate_endpoint.py | 345 ++++++++++++++++++ ddev/tests/ai/phases/test_orchestrator.py | 14 +- 11 files changed, 547 insertions(+), 11 deletions(-) create mode 100644 ddev/src/ddev/ai/flows/__init__.py create mode 100644 ddev/src/ddev/ai/flows/openmetrics/__init__.py create mode 100644 ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py create mode 100644 ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py create mode 100644 ddev/tests/ai/flows/__init__.py create mode 100644 ddev/tests/ai/flows/openmetrics/__init__.py create mode 100644 ddev/tests/ai/flows/openmetrics/phases/__init__.py create mode 100644 ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py diff --git a/ddev/pyproject.toml b/ddev/pyproject.toml index 73928e956e9d4..2cc99310eac27 100644 --- a/ddev/pyproject.toml +++ b/ddev/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "httpx", "jsonpointer", "pluggy", + "prometheus-client", "rich>=12.5.1", "stamina==23.2.0", "tomli; python_version < '3.11'", diff --git a/ddev/src/ddev/ai/flows/__init__.py b/ddev/src/ddev/ai/flows/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/src/ddev/ai/flows/openmetrics/__init__.py b/ddev/src/ddev/ai/flows/openmetrics/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py b/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/phases/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py b/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py new file mode 100644 index 0000000000000..9d948ddfdf526 --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py @@ -0,0 +1,150 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from typing import Any + +import httpx +from prometheus_client import Metric +from prometheus_client.openmetrics.parser import text_string_to_metric_families as parse_openmetrics +from prometheus_client.parser import text_string_to_metric_families as parse_prometheus + +from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.config import AgentConfig, FlowConfigError, PhaseConfig + +_REQUEST_TIMEOUT_SECONDS = 10.0 +_MAX_METRIC_SAMPLES = 10 + + +class EndpointValidationError(Exception): + """Raised by ValidateEndpointPhase when the target endpoint is unreachable or its body is unusable.""" + + +def _parse_exposition(body: str, content_type: str) -> tuple[list[Metric], str]: + """Parse body with the parser matching content_type. + + Returns (families, exposition_format) where exposition_format is + "openmetrics" or "prometheus". Raises EndpointValidationError if parsing + fails or yields zero metric families. + """ + if content_type.startswith("application/openmetrics-text"): + parser = parse_openmetrics + exposition_format = "openmetrics" + else: + parser = parse_prometheus + exposition_format = "prometheus" + + try: + families = list(parser(body)) + except Exception as e: + raise EndpointValidationError(f"Body is not valid {exposition_format} exposition: {e}") from e + + if not families: + raise EndpointValidationError(f"Body parsed as {exposition_format} but contained zero metric families") + + return families, exposition_format + + +def _build_memory_text( + url: str, + status: int, + content_type: str, + exposition_format: str, + families: list[Metric], +) -> str: + """Render the markdown memory file describing the validated endpoint.""" + lines = [ + "# Endpoint validation", + "", + f"- **URL:** {url}", + f"- **HTTP status:** {status}", + f"- **Content-Type:** {content_type}", + f"- **Exposition format:** {exposition_format}", + f"- **Metric families detected:** {len(families)}", + f"- **First {min(_MAX_METRIC_SAMPLES, len(families))} metric names:**", + ] + for metric in families[:_MAX_METRIC_SAMPLES]: + if metric.type: + lines.append(f" - `{metric.name}` ({metric.type})") + else: + lines.append(f" - `{metric.name}`") + lines.append("") + lines.append("Endpoint is reachable and serves a Prometheus/OpenMetrics-compatible body.") + return "\n".join(lines) + + +class ValidateEndpointPhase(Phase): + """Deterministic Phase 0 for the OpenMetrics pipeline. + + Confirms that ``endpoint_url`` (from runtime variables) is reachable and + serves a body that the same parser used by ``OpenMetricsBaseCheckV2`` can + accept. Aborts the pipeline with EndpointValidationError on any failure so + the user doesn't burn tokens generating an integration against a dead or + malformed endpoint. Writes a memory file summarizing the endpoint for + downstream phases. + """ + + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + # This phase is deterministic: no agent, no tasks, no memory prompt. + # Catching misconfigured flow.yaml entries at orchestrator startup + # avoids surprising mid-pipeline failures. + if config.agent is not None: + raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'agent'") + if config.tasks: + raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'tasks'") + if config.checkpoint is not None: + raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'checkpoint'") + + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + endpoint_url = context.get("endpoint_url") + if not endpoint_url: + raise FlowConfigError(f"Phase {self._phase_id!r}: 'endpoint_url' runtime variable is required") + + try: + async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT_SECONDS, follow_redirects=True) as client: + response = await client.get(endpoint_url) + except httpx.TimeoutException as e: + raise EndpointValidationError( + f"Endpoint timed out after {_REQUEST_TIMEOUT_SECONDS}s: {endpoint_url}" + ) from e + except httpx.RequestError as e: + raise EndpointValidationError(f"Request failed for {endpoint_url}: {e}") from e + + if response.status_code != 200: + raise EndpointValidationError( + f"Endpoint returned HTTP {response.status_code} (expected 200): {endpoint_url}" + ) + + content_type = response.headers.get("Content-Type", "") + try: + families, exposition_format = _parse_exposition(response.text, content_type) + except EndpointValidationError as e: + raise EndpointValidationError(f"{e} ({endpoint_url})") from e + + memory_text = _build_memory_text( + url=endpoint_url, + status=response.status_code, + content_type=content_type, + exposition_format=exposition_format, + families=families, + ) + + return PhaseOutcome( + memory_text=memory_text, + total_input_tokens=0, + total_output_tokens=0, + extra_checkpoint={ + "endpoint_url": endpoint_url, + "status_code": response.status_code, + "content_type": content_type, + "exposition_format": exposition_format, + "metric_count": len(families), + "sample_metric_names": [m.name for m in families[:_MAX_METRIC_SAMPLES]], + }, + ) diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index de663c3138ed4..875c83839835a 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -21,12 +21,15 @@ def _discover_and_register_phases( registry: PhaseRegistry, - phases_dir: Path | None = None, - import_prefix: str = "ddev.ai.phases", + phases_dir: Path, + import_prefix: str, ) -> None: - """Import all non-private modules in phases_dir and register Phase subclasses.""" - if phases_dir is None: - phases_dir = Path(__file__).parent + """Import every non-private *.py in phases_dir and register Phase subclasses. + + Modules are imported by dotted path: ``{import_prefix}.{file_stem}``. The + caller is responsible for choosing the right pair (dir, prefix). Import + errors are fatal — a syntax error in any discovered module aborts startup. + """ for py_file in phases_dir.glob("*.py"): if py_file.stem.startswith("_"): continue @@ -75,7 +78,22 @@ async def on_initialize(self) -> None: """Discover custom phases, parse flow.yaml, construct phases, submit PhaseTrigger.""" config_dir = self._flow_yaml_path.parent - _discover_and_register_phases(self._phase_registry) + _discover_and_register_phases( + self._phase_registry, + Path(__file__).parent, + "ddev.ai.phases", + ) + + flow_phases_dir = config_dir / "phases" + if flow_phases_dir.is_dir(): + ai_root = Path(__file__).parent.parent + rel = flow_phases_dir.relative_to(ai_root) + flow_import_prefix = "ddev.ai." + ".".join(rel.parts) + _discover_and_register_phases( + self._phase_registry, + flow_phases_dir, + flow_import_prefix, + ) config = FlowConfig.from_yaml(self._flow_yaml_path, config_dir) diff --git a/ddev/tests/ai/flows/__init__.py b/ddev/tests/ai/flows/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/tests/ai/flows/openmetrics/__init__.py b/ddev/tests/ai/flows/openmetrics/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/tests/ai/flows/openmetrics/phases/__init__.py b/ddev/tests/ai/flows/openmetrics/phases/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/phases/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py b/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py new file mode 100644 index 0000000000000..114d1d219cf93 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py @@ -0,0 +1,345 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import asyncio + +import httpx +import pytest +from prometheus_client import Metric +from prometheus_client.parser import text_string_to_metric_families as parse_prometheus + +from ddev.ai.flows.openmetrics.phases import validate_endpoint as validate_endpoint_module +from ddev.ai.flows.openmetrics.phases.validate_endpoint import ( + EndpointValidationError, + ValidateEndpointPhase, + _build_memory_text, + _parse_exposition, +) +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger +from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.event_bus.exceptions import MessageProcessingError + +ENDPOINT_URL = "http://example.test:9100/metrics" + +PROMETHEUS_BODY = """\ +# HELP http_requests_total Total HTTP requests. +# TYPE http_requests_total counter +http_requests_total{method="GET",code="200"} 1027 +# HELP process_resident_memory_bytes Resident memory in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 1.2e+08 +# HELP request_duration_seconds Request latency. +# TYPE request_duration_seconds histogram +request_duration_seconds_bucket{le="0.1"} 3 +request_duration_seconds_bucket{le="0.5"} 5 +request_duration_seconds_bucket{le="+Inf"} 7 +request_duration_seconds_sum 1.5 +request_duration_seconds_count 7 +""" + +OPENMETRICS_BODY = """\ +# TYPE http_requests counter +# HELP http_requests Total HTTP requests. +# UNIT http_requests requests +http_requests_total{method="GET",code="200"} 1027 +# TYPE process_resident_memory_bytes gauge +# UNIT process_resident_memory_bytes bytes +# HELP process_resident_memory_bytes Resident memory in bytes. +process_resident_memory_bytes 1.2e+08 +# EOF +""" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def flow_dir(tmp_path): + return tmp_path + + +@pytest.fixture +def message_queue(): + return asyncio.Queue() + + +def _make_phase( + flow_dir, + message_queue, + *, + phase_id: str = "validate_endpoint", + runtime_variables: dict[str, str] | None = None, +) -> tuple[ValidateEndpointPhase, CheckpointManager]: + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + phase = ValidateEndpointPhase( + phase_id=phase_id, + dependencies=[], + config=PhaseConfig(), + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables if runtime_variables is not None else {"endpoint_url": ENDPOINT_URL}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase.queue = message_queue + return phase, checkpoint_manager + + +def _install_mock_transport(monkeypatch, handler): + """Patch httpx.AsyncClient inside the phase module to use a MockTransport handler.""" + real_client_cls = httpx.AsyncClient + + def factory(*args, **kwargs): + kwargs["transport"] = httpx.MockTransport(handler) + return real_client_cls(*args, **kwargs) + + monkeypatch.setattr(validate_endpoint_module.httpx, "AsyncClient", factory) + + +def _ok_handler(status: int, body: str, content_type: str): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(status_code=status, content=body.encode("utf-8"), headers={"Content-Type": content_type}) + + return handler + + +def _raising_handler(exc: Exception): + def handler(request: httpx.Request) -> httpx.Response: + raise exc + + return handler + + +# --------------------------------------------------------------------------- +# Phase happy paths +# --------------------------------------------------------------------------- + + +async def test_success_with_prometheus_body(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, PROMETHEUS_BODY, "text/plain; version=0.0.4; charset=utf-8"), + ) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + expected_families = list(parse_prometheus(PROMETHEUS_BODY)) + expected_names = [m.name for m in expected_families[:10]] + + memory = mgr.memory_content("validate_endpoint") + assert ENDPOINT_URL in memory + assert "HTTP status:** 200" in memory + assert expected_names[0] in memory + assert expected_families[0].type in memory + + checkpoint = mgr.read()["validate_endpoint"] + assert checkpoint["status"] == "success" + assert checkpoint["exposition_format"] == "prometheus" + assert checkpoint["metric_count"] == len(expected_families) + assert checkpoint["sample_metric_names"] == expected_names + assert checkpoint["status_code"] == 200 + assert checkpoint["endpoint_url"] == ENDPOINT_URL + assert checkpoint["tokens"] == {"total_input": 0, "total_output": 0} + + +async def test_success_with_openmetrics_body(flow_dir, message_queue, monkeypatch): + content_type = "application/openmetrics-text; version=1.0.0; charset=utf-8" + _install_mock_transport(monkeypatch, _ok_handler(200, OPENMETRICS_BODY, content_type)) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()["validate_endpoint"] + assert checkpoint["status"] == "success" + assert checkpoint["exposition_format"] == "openmetrics" + assert checkpoint["content_type"] == content_type + assert checkpoint["metric_count"] >= 1 + + +# --------------------------------------------------------------------------- +# Phase failure paths +# --------------------------------------------------------------------------- + + +async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str): + """Run execute, expect failure, drive on_error like the framework would.""" + trigger = PhaseTrigger(id="start", phase_id=None) + context = { + "endpoint_url": phase._runtime_variables.get("endpoint_url"), + "phase_name": phase._phase_id, + } + try: + await phase.execute(context) + except Exception as raised: + wrapped = MessageProcessingError(phase._phase_id, trigger, raised) + await phase.on_error(wrapped) + checkpoint = mgr.read()[phase._phase_id] + assert checkpoint["status"] == "failed" + assert error_contains.lower() in checkpoint["error"].lower() + msg = message_queue.get_nowait() + assert isinstance(msg, PhaseFailedMessage) + assert error_contains.lower() in msg.error.lower() + return raised + pytest.fail(f"Expected execute() to raise; error should contain {error_contains!r}") + + +async def test_failure_non_200_status(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(503, "service unavailable", "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="HTTP 503") + + +async def test_failure_body_is_html(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, "not metrics", "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid prometheus exposition") + + +async def test_failure_body_is_json(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, '{"hello": "world"}', "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="prometheus") + + +async def test_failure_zero_families(flow_dir, message_queue, monkeypatch): + body = "\n\n# just a stray comment, not a HELP/TYPE\n\n" + _install_mock_transport(monkeypatch, _ok_handler(200, body, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="zero metric families") + + +async def test_failure_openmetrics_missing_eof(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, PROMETHEUS_BODY, "application/openmetrics-text; version=1.0.0"), + ) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid openmetrics exposition") + + +async def test_failure_timeout(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _raising_handler(httpx.TimeoutException("slow"))) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="timed out") + + +async def test_failure_request_error(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _raising_handler(httpx.ConnectError("refused"))) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="request failed") + + +async def test_failure_missing_endpoint_url(flow_dir, message_queue): + phase, mgr = _make_phase(flow_dir, message_queue, runtime_variables={}) + + trigger = PhaseTrigger(id="start", phase_id=None) + with pytest.raises(FlowConfigError, match="endpoint_url"): + await phase.process_message(trigger) + + wrapped = MessageProcessingError(phase._phase_id, trigger, FlowConfigError("endpoint_url required")) + await phase.on_error(wrapped) + + checkpoint = mgr.read()["validate_endpoint"] + assert checkpoint["status"] == "failed" + msg = message_queue.get_nowait() + assert isinstance(msg, PhaseFailedMessage) + + +# --------------------------------------------------------------------------- +# validate_config +# --------------------------------------------------------------------------- + + +def test_validate_config_rejects_agent(): + with pytest.raises(FlowConfigError, match="must not declare 'agent'"): + ValidateEndpointPhase.validate_config("p", PhaseConfig(agent="x"), {"x": AgentConfig()}) + + +def test_validate_config_rejects_tasks(): + config = PhaseConfig(tasks=[TaskConfig(name="t", prompt="hi")]) + with pytest.raises(FlowConfigError, match="must not declare 'tasks'"): + ValidateEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_rejects_checkpoint(): + config = PhaseConfig(checkpoint=CheckpointConfig(memory_prompt="x")) + with pytest.raises(FlowConfigError, match="must not declare 'checkpoint'"): + ValidateEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_accepts_minimal(): + ValidateEndpointPhase.validate_config("p", PhaseConfig(), {}) + + +# --------------------------------------------------------------------------- +# _parse_exposition +# --------------------------------------------------------------------------- + + +def test_parse_exposition_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "text/plain; version=0.0.4") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_openmetrics(): + families, fmt = _parse_exposition(OPENMETRICS_BODY, "application/openmetrics-text; version=1.0.0") + assert fmt == "openmetrics" + assert len(families) > 0 + + +def test_parse_exposition_empty_content_type_falls_back_to_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_raises_on_invalid_body(): + with pytest.raises(EndpointValidationError, match="not valid prometheus exposition"): + _parse_exposition("", "text/plain") + + +def test_parse_exposition_raises_on_zero_families(): + with pytest.raises(EndpointValidationError, match="zero metric families"): + _parse_exposition("", "text/plain") + + +# --------------------------------------------------------------------------- +# _build_memory_text +# --------------------------------------------------------------------------- + + +def test_build_memory_text_renders_all_fields(): + families = [ + Metric("widgets", "Widget count.", "counter"), + Metric("gizmos", "Gizmo gauge.", "gauge"), + ] + text = _build_memory_text( + url="http://example.test:9100/metrics", + status=200, + content_type="text/plain; version=0.0.4", + exposition_format="prometheus", + families=families, + ) + assert "http://example.test:9100/metrics" in text + assert "200" in text + assert "text/plain; version=0.0.4" in text + assert "prometheus" in text + assert "widgets" in text + assert "counter" in text + assert "gizmos" in text + assert "gauge" in text diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index 85c9758a95d1b..509aa3c9dea44 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -46,9 +46,13 @@ def _make(base_dir: Path | None = None, **overrides: Any) -> PhaseOrchestrator: # --------------------------------------------------------------------------- +FRAMEWORK_PHASES_DIR = Path(__file__).resolve().parents[3] / "src" / "ddev" / "ai" / "phases" +FRAMEWORK_IMPORT_PREFIX = "ddev.ai.phases" + + def test_discover_registers_agentic_phase(): registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) assert "AgenticPhase" in registry.known_names() assert registry.get("AgenticPhase") is AgenticPhase @@ -119,9 +123,9 @@ def test_discover_skips_underscore_prefixed_files(tmp_path, monkeypatch): def test_discover_idempotent(): registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) first = registry.known_names() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) second = registry.known_names() assert first == second @@ -135,7 +139,7 @@ def test_registry_get_unknown_raises(): def test_imported_class_not_registered(): """A class imported into a phases module but defined elsewhere should not be registered.""" registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) # BaseMessage is imported in messages.py but defined in event_bus — it should NOT be registered assert "BaseMessage" not in registry.known_names() @@ -156,7 +160,7 @@ class ExclusivePhase(Phase): def test_discover_does_not_mutate_global_state(): """_discover_and_register_phases only touches the registry passed to it.""" registry = PhaseRegistry() - _discover_and_register_phases(registry) + _discover_and_register_phases(registry, FRAMEWORK_PHASES_DIR, FRAMEWORK_IMPORT_PREFIX) # No module-level / class-level container should have been touched. # Verify by checking there is no class-level _registry attribute on PhaseRegistry. assert not hasattr(PhaseRegistry, "_registry") From 6cbaa4adb5f064b3a8615259f2c2e9bdce73314f Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Thu, 14 May 2026 14:47:10 +0200 Subject: [PATCH 2/5] Merge phase0 and phase1 into phase0. Add metrics dumping to jsonl. --- ddev/hatch.toml | 1 + ddev/pyproject.toml | 6 +- .../openmetrics/phases/inspect_endpoint.py | 205 ++++++ .../openmetrics/phases/validate_endpoint.py | 150 ----- ddev/src/ddev/ai/phases/checkpoint.py | 7 +- .../phases/test_inspect_endpoint.py | 594 ++++++++++++++++++ .../phases/test_validate_endpoint.py | 345 ---------- 7 files changed, 811 insertions(+), 497 deletions(-) create mode 100644 ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py delete mode 100644 ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py create mode 100644 ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py delete mode 100644 ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py diff --git a/ddev/hatch.toml b/ddev/hatch.toml index 9550fb4a939c3..d1f351f807f94 100644 --- a/ddev/hatch.toml +++ b/ddev/hatch.toml @@ -7,6 +7,7 @@ mypy-files = [ [envs.default] python = "3.13" e2e-env = false +features = ["ai"] dependencies = [ "pyyaml", "pytest-asyncio", diff --git a/ddev/pyproject.toml b/ddev/pyproject.toml index 2cc99310eac27..424063afdb689 100644 --- a/ddev/pyproject.toml +++ b/ddev/pyproject.toml @@ -35,7 +35,6 @@ dependencies = [ "httpx", "jsonpointer", "pluggy", - "prometheus-client", "rich>=12.5.1", "stamina==23.2.0", "tomli; python_version < '3.11'", @@ -49,6 +48,11 @@ dependencies = [ ] dynamic = ["version"] +[project.optional-dependencies] +ai = [ + "prometheus-client", +] + [project.urls] Source = "https://github.com/DataDog/integrations-core" diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py b/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py new file mode 100644 index 0000000000000..bcc45088802fb --- /dev/null +++ b/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py @@ -0,0 +1,205 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import json +import os +from pathlib import Path +from typing import Any + +import httpx +from prometheus_client import Metric +from prometheus_client.openmetrics.parser import text_string_to_metric_families as parse_openmetrics +from prometheus_client.parser import text_string_to_metric_families as parse_prometheus + +from ddev.ai.phases.base import Phase, PhaseOutcome +from ddev.ai.phases.config import AgentConfig, FlowConfigError, PhaseConfig + +REQUEST_TIMEOUT_SECONDS = 10.0 +JSONL_FILENAME_SUFFIX = "_metrics.jsonl" + + +class EndpointInspectionError(Exception): + """Raised when the endpoint is unreachable, its body is unusable, or the catalog cannot be written.""" + + +def _parse_exposition(body: str, content_type: str) -> tuple[list[Metric], str]: + """Parse body with the parser matching content_type. + + Returns (families, exposition_format) where exposition_format is + "openmetrics" or "prometheus". Raises EndpointInspectionError if parsing + fails or yields zero metric families. + """ + if content_type.startswith("application/openmetrics-text"): + parser = parse_openmetrics + exposition_format = "openmetrics" + else: + parser = parse_prometheus + exposition_format = "prometheus" + + try: + families = list(parser(body)) + except Exception as e: + raise EndpointInspectionError(f"Body is not valid {exposition_format} exposition: {e}") from e + + if not families: + raise EndpointInspectionError(f"Body parsed as {exposition_format} but contained zero metric families") + + return families, exposition_format + + +def _build_jsonl_rows(families: list[Metric]) -> list[dict[str, Any]]: + """Build one JSONL row per metric family.""" + rows: list[dict[str, Any]] = [] + for metric in families: + label_keys: set[str] = set() + for sample in metric.samples: + label_keys.update(sample.labels.keys()) + rows.append( + { + "name": metric.name, + "type": metric.type, + "help": metric.documentation or "", + "unit": metric.unit or "", + "label_keys": sorted(label_keys), + "sample_count": len(metric.samples), + } + ) + return rows + + +def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: + """Atomically write rows as JSON Lines to path.""" + tmp_path = path.with_suffix(path.suffix + ".tmp") + try: + with tmp_path.open("w", encoding="utf-8") as fh: + for row in rows: + try: + line = json.dumps(row, separators=(",", ":"), ensure_ascii=False) + except (TypeError, ValueError) as e: + raise EndpointInspectionError(f"Failed to serialize metric {row.get('name')!r}: {e}") from e + fh.write(line) + fh.write("\n") + os.replace(tmp_path, path) + except EndpointInspectionError: + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + raise + except OSError as e: + if tmp_path.exists(): + try: + tmp_path.unlink() + except OSError: + pass + raise EndpointInspectionError(f"Failed to write metrics catalog at {path}: {e}") from e + + +def _build_memory_text( + url: str, + status: int, + content_type: str, + exposition_format: str, + metric_count: int, + jsonl_path: Path, +) -> str: + """Render the markdown memory file describing the inspected endpoint.""" + lines = [ + "# Endpoint inspection", + "", + f"- **URL:** {url}", + f"- **HTTP status:** {status}", + f"- **Content-Type:** {content_type}", + f"- **Exposition format:** {exposition_format}", + f"- **Metric families detected:** {metric_count}", + f"- **Metrics catalog:** {jsonl_path}", + "", + "Endpoint is reachable and serves a Prometheus/OpenMetrics-compatible body.", + "The full list of metrics with metadata is in the catalog file above.", + ] + return "\n".join(lines) + + +class InspectEndpointPhase(Phase): + """Deterministic Phase 0 for the OpenMetrics pipeline. + + Performs a single HTTP fetch of ``endpoint_url`` and: + + 1. Confirms the endpoint is reachable (HTTP 200). + 2. Confirms the body is valid Prometheus or OpenMetrics exposition. + 3. Writes a ``_metrics.jsonl`` sidecar next to the memory file, + with one row per metric family — the ground-truth catalog later phases + use to drive metric renaming, ``metrics.py`` mapping, and + ``metadata.csv`` generation. + + Aborts the pipeline with EndpointInspectionError on any failure. + """ + + @classmethod + def validate_config( + cls, + phase_id: str, + config: PhaseConfig, + agents: dict[str, AgentConfig], + ) -> None: + if config.agent is not None: + raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'agent'") + if config.tasks: + raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'tasks'") + if config.checkpoint is not None: + raise FlowConfigError(f"Phase {phase_id!r} (InspectEndpointPhase) must not declare 'checkpoint'") + + async def execute(self, context: dict[str, Any]) -> PhaseOutcome: + endpoint_url = context.get("endpoint_url") + if not endpoint_url: + raise FlowConfigError(f"Phase {self._phase_id!r}: 'endpoint_url' runtime variable is required") + + try: + async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT_SECONDS, follow_redirects=True) as client: + response = await client.get(endpoint_url) + except httpx.TimeoutException as e: + raise EndpointInspectionError(f"Endpoint timed out after {REQUEST_TIMEOUT_SECONDS}s: {endpoint_url}") from e + except httpx.RequestError as e: + raise EndpointInspectionError(f"Request failed for {endpoint_url}: {e}") from e + + if response.status_code != 200: + raise EndpointInspectionError( + f"Endpoint returned HTTP {response.status_code} (expected 200): {endpoint_url}" + ) + + content_type = response.headers.get("Content-Type", "") + try: + families, exposition_format = _parse_exposition(response.text, content_type) + except EndpointInspectionError as e: + raise EndpointInspectionError(f"{e} ({endpoint_url})") from e + + rows = _build_jsonl_rows(families) + self._checkpoint_manager.memory_dir.mkdir(parents=True, exist_ok=True) + jsonl_path = (self._checkpoint_manager.memory_dir / f"{self._phase_id}{JSONL_FILENAME_SUFFIX}").resolve() + _write_jsonl(jsonl_path, rows) + + metric_count = len(families) + memory_text = _build_memory_text( + url=endpoint_url, + status=response.status_code, + content_type=content_type, + exposition_format=exposition_format, + metric_count=metric_count, + jsonl_path=jsonl_path, + ) + + return PhaseOutcome( + memory_text=memory_text, + total_input_tokens=0, + total_output_tokens=0, + extra_checkpoint={ + "endpoint_url": endpoint_url, + "status_code": response.status_code, + "content_type": content_type, + "exposition_format": exposition_format, + "metric_count": metric_count, + "metrics_jsonl_path": str(jsonl_path), + }, + ) diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py b/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py deleted file mode 100644 index 9d948ddfdf526..0000000000000 --- a/ddev/src/ddev/ai/flows/openmetrics/phases/validate_endpoint.py +++ /dev/null @@ -1,150 +0,0 @@ -# (C) Datadog, Inc. 2026-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -from typing import Any - -import httpx -from prometheus_client import Metric -from prometheus_client.openmetrics.parser import text_string_to_metric_families as parse_openmetrics -from prometheus_client.parser import text_string_to_metric_families as parse_prometheus - -from ddev.ai.phases.base import Phase, PhaseOutcome -from ddev.ai.phases.config import AgentConfig, FlowConfigError, PhaseConfig - -_REQUEST_TIMEOUT_SECONDS = 10.0 -_MAX_METRIC_SAMPLES = 10 - - -class EndpointValidationError(Exception): - """Raised by ValidateEndpointPhase when the target endpoint is unreachable or its body is unusable.""" - - -def _parse_exposition(body: str, content_type: str) -> tuple[list[Metric], str]: - """Parse body with the parser matching content_type. - - Returns (families, exposition_format) where exposition_format is - "openmetrics" or "prometheus". Raises EndpointValidationError if parsing - fails or yields zero metric families. - """ - if content_type.startswith("application/openmetrics-text"): - parser = parse_openmetrics - exposition_format = "openmetrics" - else: - parser = parse_prometheus - exposition_format = "prometheus" - - try: - families = list(parser(body)) - except Exception as e: - raise EndpointValidationError(f"Body is not valid {exposition_format} exposition: {e}") from e - - if not families: - raise EndpointValidationError(f"Body parsed as {exposition_format} but contained zero metric families") - - return families, exposition_format - - -def _build_memory_text( - url: str, - status: int, - content_type: str, - exposition_format: str, - families: list[Metric], -) -> str: - """Render the markdown memory file describing the validated endpoint.""" - lines = [ - "# Endpoint validation", - "", - f"- **URL:** {url}", - f"- **HTTP status:** {status}", - f"- **Content-Type:** {content_type}", - f"- **Exposition format:** {exposition_format}", - f"- **Metric families detected:** {len(families)}", - f"- **First {min(_MAX_METRIC_SAMPLES, len(families))} metric names:**", - ] - for metric in families[:_MAX_METRIC_SAMPLES]: - if metric.type: - lines.append(f" - `{metric.name}` ({metric.type})") - else: - lines.append(f" - `{metric.name}`") - lines.append("") - lines.append("Endpoint is reachable and serves a Prometheus/OpenMetrics-compatible body.") - return "\n".join(lines) - - -class ValidateEndpointPhase(Phase): - """Deterministic Phase 0 for the OpenMetrics pipeline. - - Confirms that ``endpoint_url`` (from runtime variables) is reachable and - serves a body that the same parser used by ``OpenMetricsBaseCheckV2`` can - accept. Aborts the pipeline with EndpointValidationError on any failure so - the user doesn't burn tokens generating an integration against a dead or - malformed endpoint. Writes a memory file summarizing the endpoint for - downstream phases. - """ - - @classmethod - def validate_config( - cls, - phase_id: str, - config: PhaseConfig, - agents: dict[str, AgentConfig], - ) -> None: - # This phase is deterministic: no agent, no tasks, no memory prompt. - # Catching misconfigured flow.yaml entries at orchestrator startup - # avoids surprising mid-pipeline failures. - if config.agent is not None: - raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'agent'") - if config.tasks: - raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'tasks'") - if config.checkpoint is not None: - raise FlowConfigError(f"Phase {phase_id!r} (ValidateEndpointPhase) must not declare 'checkpoint'") - - async def execute(self, context: dict[str, Any]) -> PhaseOutcome: - endpoint_url = context.get("endpoint_url") - if not endpoint_url: - raise FlowConfigError(f"Phase {self._phase_id!r}: 'endpoint_url' runtime variable is required") - - try: - async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT_SECONDS, follow_redirects=True) as client: - response = await client.get(endpoint_url) - except httpx.TimeoutException as e: - raise EndpointValidationError( - f"Endpoint timed out after {_REQUEST_TIMEOUT_SECONDS}s: {endpoint_url}" - ) from e - except httpx.RequestError as e: - raise EndpointValidationError(f"Request failed for {endpoint_url}: {e}") from e - - if response.status_code != 200: - raise EndpointValidationError( - f"Endpoint returned HTTP {response.status_code} (expected 200): {endpoint_url}" - ) - - content_type = response.headers.get("Content-Type", "") - try: - families, exposition_format = _parse_exposition(response.text, content_type) - except EndpointValidationError as e: - raise EndpointValidationError(f"{e} ({endpoint_url})") from e - - memory_text = _build_memory_text( - url=endpoint_url, - status=response.status_code, - content_type=content_type, - exposition_format=exposition_format, - families=families, - ) - - return PhaseOutcome( - memory_text=memory_text, - total_input_tokens=0, - total_output_tokens=0, - extra_checkpoint={ - "endpoint_url": endpoint_url, - "status_code": response.status_code, - "content_type": content_type, - "exposition_format": exposition_format, - "metric_count": len(families), - "sample_metric_names": [m.name for m in families[:_MAX_METRIC_SAMPLES]], - }, - ) diff --git a/ddev/src/ddev/ai/phases/checkpoint.py b/ddev/src/ddev/ai/phases/checkpoint.py index 3f32edb158a5b..c8c4be4d94445 100644 --- a/ddev/src/ddev/ai/phases/checkpoint.py +++ b/ddev/src/ddev/ai/phases/checkpoint.py @@ -40,13 +40,18 @@ def write_phase_checkpoint(self, phase_id: str, data: dict[str, Any]) -> None: checkpoints = self.read() checkpoints[phase_id] = data self._ensure_dir() - self._path.write_text(yaml.dump(checkpoints, default_flow_style=False), encoding="utf-8") + self._path.write_text(yaml.dump(checkpoints, default_flow_style=False, sort_keys=False), encoding="utf-8") def build_memory_prompt(self, user_additions: str | None) -> str: """Build the memory prompt to send to the agent at the end of a phase.""" base_prompt = "Write a brief summary of what you accomplished in this phase." return f"{user_additions}\n\n{base_prompt}" if user_additions else base_prompt + @property + def memory_dir(self) -> Path: + """Directory where memory files and per-phase sidecar artifacts are written.""" + return self._path.parent + def memory_path(self, phase_id: str) -> Path: """Return the resolved path to a phase's memory file.""" return (self.root / f"{phase_id}_memory.md").resolve() diff --git a/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py b/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py new file mode 100644 index 0000000000000..004a59dcfcef6 --- /dev/null +++ b/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py @@ -0,0 +1,594 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +import asyncio +import json +import os + +import httpx +import pytest +from prometheus_client import Metric +from prometheus_client.parser import text_string_to_metric_families as parse_prometheus + +from ddev.ai.flows.openmetrics.phases import inspect_endpoint as inspect_endpoint_module +from ddev.ai.flows.openmetrics.phases.inspect_endpoint import ( + EndpointInspectionError, + InspectEndpointPhase, + _build_jsonl_rows, + _build_memory_text, + _parse_exposition, +) +from ddev.ai.phases.checkpoint import CheckpointManager +from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig +from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger +from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy +from ddev.ai.tools.fs.file_registry import FileRegistry +from ddev.event_bus.exceptions import MessageProcessingError + +ENDPOINT_URL = "http://example.test:9100/metrics" +PHASE_ID = "inspect_endpoint" + +PROMETHEUS_BODY = """\ +# HELP http_requests_total Total HTTP requests. +# TYPE http_requests_total counter +http_requests_total{method="GET",code="200"} 1027 +# HELP process_resident_memory_bytes Resident memory in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 1.2e+08 +# HELP request_duration_seconds Request latency. +# TYPE request_duration_seconds histogram +request_duration_seconds_bucket{le="0.1"} 3 +request_duration_seconds_bucket{le="0.5"} 5 +request_duration_seconds_bucket{le="+Inf"} 7 +request_duration_seconds_sum 1.5 +request_duration_seconds_count 7 +""" + +OPENMETRICS_BODY = """\ +# TYPE http_requests counter +# HELP http_requests Total HTTP requests. +# UNIT http_requests requests +http_requests_total{method="GET",code="200"} 1027 +# TYPE process_resident_memory_bytes gauge +# UNIT process_resident_memory_bytes bytes +# HELP process_resident_memory_bytes Resident memory in bytes. +process_resident_memory_bytes 1.2e+08 +# EOF +""" + +OPENMETRICS_BODY_CLEAN = """\ +# TYPE http_requests counter +# HELP http_requests Total HTTP requests. +http_requests_total{method="GET",code="200"} 1027 +# TYPE process_resident_memory_bytes gauge +# HELP process_resident_memory_bytes Resident memory in bytes. +process_resident_memory_bytes 1.2e+08 +# TYPE request_duration_seconds summary +# HELP request_duration_seconds Request latency. +request_duration_seconds{quantile="0.5"} 0.04 +request_duration_seconds{quantile="0.9"} 0.09 +request_duration_seconds_sum 1.5 +request_duration_seconds_count 7 +# EOF +""" + +OPENMETRICS_BODY_WITH_UNIT = """\ +# TYPE request_duration_seconds gauge +# HELP request_duration_seconds Request latency snapshot. +# UNIT request_duration_seconds seconds +request_duration_seconds 0.42 +# EOF +""" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def flow_dir(tmp_path): + return tmp_path + + +@pytest.fixture +def message_queue(): + return asyncio.Queue() + + +def _make_phase( + flow_dir, + message_queue, + *, + phase_id: str = PHASE_ID, + runtime_variables: dict[str, str] | None = None, +) -> tuple[InspectEndpointPhase, CheckpointManager]: + checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") + phase = InspectEndpointPhase( + phase_id=phase_id, + dependencies=[], + config=PhaseConfig(), + checkpoint_manager=checkpoint_manager, + runtime_variables=runtime_variables if runtime_variables is not None else {"endpoint_url": ENDPOINT_URL}, + flow_variables={}, + config_dir=flow_dir, + file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), + ) + phase.queue = message_queue + return phase, checkpoint_manager + + +def _install_mock_transport(monkeypatch, handler): + """Patch httpx.AsyncClient inside the phase module to use a MockTransport handler.""" + real_client_cls = httpx.AsyncClient + + def factory(*args, **kwargs): + kwargs["transport"] = httpx.MockTransport(handler) + return real_client_cls(*args, **kwargs) + + monkeypatch.setattr(inspect_endpoint_module.httpx, "AsyncClient", factory) + + +def _ok_handler(status: int, body: str, content_type: str): + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(status_code=status, content=body.encode("utf-8"), headers={"Content-Type": content_type}) + + return handler + + +def _raising_handler(exc: Exception): + def handler(request: httpx.Request) -> httpx.Response: + raise exc + + return handler + + +def _read_jsonl(path) -> list[dict]: + return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line] + + +# --------------------------------------------------------------------------- +# Phase happy paths +# --------------------------------------------------------------------------- + + +async def test_success_with_prometheus_body(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, PROMETHEUS_BODY, "text/plain; version=0.0.4; charset=utf-8"), + ) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + expected_families = list(parse_prometheus(PROMETHEUS_BODY)) + + memory = mgr.memory_content(PHASE_ID) + assert ENDPOINT_URL in memory + assert "HTTP status:** 200" in memory + + checkpoint = mgr.read()[PHASE_ID] + assert checkpoint["status"] == "success" + assert checkpoint["exposition_format"] == "prometheus" + assert checkpoint["metric_count"] == len(expected_families) + assert "sample_metric_names" not in checkpoint + assert checkpoint["status_code"] == 200 + assert checkpoint["endpoint_url"] == ENDPOINT_URL + assert checkpoint["tokens"] == {"total_input": 0, "total_output": 0} + + +async def test_success_with_openmetrics_body(flow_dir, message_queue, monkeypatch): + content_type = "application/openmetrics-text; version=1.0.0; charset=utf-8" + _install_mock_transport(monkeypatch, _ok_handler(200, OPENMETRICS_BODY_CLEAN, content_type)) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + assert checkpoint["status"] == "success" + assert checkpoint["exposition_format"] == "openmetrics" + assert checkpoint["content_type"] == content_type + assert checkpoint["metric_count"] >= 1 + + +# --------------------------------------------------------------------------- +# Phase failure paths +# --------------------------------------------------------------------------- + + +async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str): + """Run execute, expect failure, drive on_error like the framework would.""" + trigger = PhaseTrigger(id="start", phase_id=None) + context = { + "endpoint_url": phase._runtime_variables.get("endpoint_url"), + "phase_name": phase._phase_id, + } + try: + await phase.execute(context) + except Exception as raised: + wrapped = MessageProcessingError(phase._phase_id, trigger, raised) + await phase.on_error(wrapped) + checkpoint = mgr.read()[phase._phase_id] + assert checkpoint["status"] == "failed" + assert error_contains.lower() in checkpoint["error"].lower() + msg = message_queue.get_nowait() + assert isinstance(msg, PhaseFailedMessage) + assert error_contains.lower() in msg.error.lower() + return raised + pytest.fail(f"Expected execute() to raise; error should contain {error_contains!r}") + + +async def test_failure_non_200_status(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(503, "service unavailable", "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="HTTP 503") + + +async def test_failure_body_is_html(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, "not metrics", "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid prometheus exposition") + + +async def test_failure_body_is_json(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, '{"hello": "world"}', "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="prometheus") + + +async def test_failure_zero_families(flow_dir, message_queue, monkeypatch): + body = "\n\n# just a stray comment, not a HELP/TYPE\n\n" + _install_mock_transport(monkeypatch, _ok_handler(200, body, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="zero metric families") + + +async def test_failure_openmetrics_missing_eof(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, PROMETHEUS_BODY, "application/openmetrics-text; version=1.0.0"), + ) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid openmetrics exposition") + + +async def test_failure_timeout(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _raising_handler(httpx.TimeoutException("slow"))) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="timed out") + + +async def test_failure_request_error(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _raising_handler(httpx.ConnectError("refused"))) + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="request failed") + + +async def test_failure_missing_endpoint_url(flow_dir, message_queue): + phase, mgr = _make_phase(flow_dir, message_queue, runtime_variables={}) + + trigger = PhaseTrigger(id="start", phase_id=None) + with pytest.raises(FlowConfigError, match="endpoint_url"): + await phase.process_message(trigger) + + wrapped = MessageProcessingError(phase._phase_id, trigger, FlowConfigError("endpoint_url required")) + await phase.on_error(wrapped) + + checkpoint = mgr.read()[PHASE_ID] + assert checkpoint["status"] == "failed" + msg = message_queue.get_nowait() + assert isinstance(msg, PhaseFailedMessage) + + +# --------------------------------------------------------------------------- +# validate_config +# --------------------------------------------------------------------------- + + +def test_validate_config_rejects_agent(): + with pytest.raises(FlowConfigError, match="must not declare 'agent'"): + InspectEndpointPhase.validate_config("p", PhaseConfig(agent="x"), {"x": AgentConfig()}) + + +def test_validate_config_rejects_tasks(): + config = PhaseConfig(tasks=[TaskConfig(name="t", prompt="hi")]) + with pytest.raises(FlowConfigError, match="must not declare 'tasks'"): + InspectEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_rejects_checkpoint(): + config = PhaseConfig(checkpoint=CheckpointConfig(memory_prompt="x")) + with pytest.raises(FlowConfigError, match="must not declare 'checkpoint'"): + InspectEndpointPhase.validate_config("p", config, {}) + + +def test_validate_config_accepts_minimal(): + InspectEndpointPhase.validate_config("p", PhaseConfig(), {}) + + +# --------------------------------------------------------------------------- +# _parse_exposition +# --------------------------------------------------------------------------- + + +def test_parse_exposition_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "text/plain; version=0.0.4") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_openmetrics(): + families, fmt = _parse_exposition(OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0") + assert fmt == "openmetrics" + assert len(families) > 0 + + +def test_parse_exposition_empty_content_type_falls_back_to_prometheus(): + families, fmt = _parse_exposition(PROMETHEUS_BODY, "") + assert fmt == "prometheus" + assert len(families) > 0 + + +def test_parse_exposition_raises_on_invalid_body(): + with pytest.raises(EndpointInspectionError, match="not valid prometheus exposition"): + _parse_exposition("", "text/plain") + + +def test_parse_exposition_raises_on_zero_families(): + with pytest.raises(EndpointInspectionError, match="zero metric families"): + _parse_exposition("", "text/plain") + + +# --------------------------------------------------------------------------- +# _build_memory_text +# --------------------------------------------------------------------------- + + +def test_build_memory_text_renders_all_fields(tmp_path): + jsonl_path = tmp_path / "inspect_endpoint_metrics.jsonl" + text = _build_memory_text( + url="http://example.test:9100/metrics", + status=200, + content_type="text/plain; version=0.0.4", + exposition_format="prometheus", + metric_count=2, + jsonl_path=jsonl_path, + ) + assert "http://example.test:9100/metrics" in text + assert "200" in text + assert "text/plain; version=0.0.4" in text + assert "prometheus" in text + assert "2" in text + assert str(jsonl_path) in text + + +# --------------------------------------------------------------------------- +# JSONL + sidecar contract — new tests +# --------------------------------------------------------------------------- + + +async def test_jsonl_path_in_checkpoint(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + path_str = checkpoint["metrics_jsonl_path"] + assert isinstance(path_str, str) + assert os.path.isabs(path_str) + assert os.path.exists(path_str) + + +async def test_jsonl_one_row_per_family(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + expected_families = list(parse_prometheus(PROMETHEUS_BODY)) + assert len(rows) == checkpoint["metric_count"] == len(expected_families) + + +async def test_jsonl_row_schema(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + expected_keys = {"name", "type", "help", "unit", "label_keys", "sample_count"} + for row in _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl"): + assert set(row.keys()) == expected_keys + assert isinstance(row["label_keys"], list) + assert all(isinstance(k, str) for k in row["label_keys"]) + assert row["label_keys"] == sorted(row["label_keys"]) + assert len(row["label_keys"]) == len(set(row["label_keys"])) + + +async def test_jsonl_counter_total_stripped_prometheus(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + counter_rows = [r for r in rows if r["type"] == "counter"] + assert len(counter_rows) == 1 + assert counter_rows[0]["name"] == "http_requests" + + +async def test_jsonl_counter_total_stripped_openmetrics(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + counter_rows = [r for r in rows if r["type"] == "counter"] + assert len(counter_rows) == 1 + assert counter_rows[0]["name"] == "http_requests" + + +async def test_jsonl_histogram_collapses_to_single_row(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + hist_rows = [r for r in rows if r["type"] == "histogram"] + assert len(hist_rows) == 1 + row = hist_rows[0] + assert row["name"] == "request_duration_seconds" + assert "le" in row["label_keys"] + + +async def test_jsonl_summary_collapses_to_single_row(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_CLEAN, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + summary_rows = [r for r in rows if r["type"] == "summary"] + assert len(summary_rows) == 1 + row = summary_rows[0] + assert row["name"] == "request_duration_seconds" + assert "quantile" in row["label_keys"] + + +async def test_jsonl_unit_populated_for_openmetrics(flow_dir, message_queue, monkeypatch): + _install_mock_transport( + monkeypatch, + _ok_handler(200, OPENMETRICS_BODY_WITH_UNIT, "application/openmetrics-text; version=1.0.0"), + ) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + assert len(rows) == 1 + assert rows[0]["unit"] == "seconds" + + +async def test_jsonl_unit_empty_for_prometheus(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + assert all(r["unit"] == "" for r in rows) + + +def test_jsonl_label_keys_are_union_across_samples(): + metric = Metric("multi_labels", "doc", "gauge") + metric.samples.append(_Sample("multi_labels", {"a": "1"}, 1.0)) + metric.samples.append(_Sample("multi_labels", {"a": "1", "b": "2"}, 2.0)) + metric.samples.append(_Sample("multi_labels", {"a": "1", "c": "3"}, 3.0)) + + rows = _build_jsonl_rows([metric]) + + assert rows[0]["label_keys"] == ["a", "b", "c"] + + +async def test_jsonl_sample_count_matches_parser(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + families = list(parse_prometheus(PROMETHEUS_BODY)) + expected_counts = {m.name: len(m.samples) for m in families} + actual_counts = {r["name"]: r["sample_count"] for r in rows} + assert actual_counts == expected_counts + + +async def test_jsonl_ordering_matches_parser(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, _mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + rows = _read_jsonl(flow_dir / f"{PHASE_ID}_metrics.jsonl") + families = list(parse_prometheus(PROMETHEUS_BODY)) + assert [r["name"] for r in rows] == [m.name for m in families] + + +async def test_jsonl_is_deterministic_byte_for_byte(flow_dir, message_queue, monkeypatch, tmp_path_factory): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase1, _ = _make_phase(flow_dir, message_queue) + await phase1.process_message(PhaseTrigger(id="start", phase_id=None)) + first_bytes = (flow_dir / f"{PHASE_ID}_metrics.jsonl").read_bytes() + + flow_dir_2 = tmp_path_factory.mktemp("second_run") + queue_2 = asyncio.Queue() + phase2, _ = _make_phase(flow_dir_2, queue_2) + await phase2.process_message(PhaseTrigger(id="start", phase_id=None)) + second_bytes = (flow_dir_2 / f"{PHASE_ID}_metrics.jsonl").read_bytes() + + assert first_bytes == second_bytes + assert first_bytes.endswith(b"\n") + + +async def test_memory_text_includes_jsonl_path(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + await phase.process_message(PhaseTrigger(id="start", phase_id=None)) + + checkpoint = mgr.read()[PHASE_ID] + memory = mgr.memory_content(PHASE_ID) + assert checkpoint["metrics_jsonl_path"] in memory + + +async def test_jsonl_sidecar_atomic_on_os_replace_failure(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + + def boom(_src, _dst): + raise OSError("simulated atomic replace failure") + + monkeypatch.setattr(inspect_endpoint_module.os, "replace", boom) + + phase, mgr = _make_phase(flow_dir, message_queue) + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="Failed to write metrics catalog") + + final_path = flow_dir / f"{PHASE_ID}_metrics.jsonl" + assert not final_path.exists() + + +async def test_jsonl_failure_propagates_as_phase_failure(flow_dir, message_queue, monkeypatch): + _install_mock_transport(monkeypatch, _ok_handler(200, PROMETHEUS_BODY, "text/plain")) + phase, mgr = _make_phase(flow_dir, message_queue) + + blocker = flow_dir / f"{PHASE_ID}_metrics.jsonl.tmp" + blocker.mkdir() + + await _assert_phase_fails(phase, mgr, message_queue, error_contains="Failed to write metrics catalog") + + +class _Sample: + """Minimal Sample stand-in for the label-union unit test.""" + + def __init__(self, name: str, labels: dict[str, str], value: float): + self.name = name + self.labels = labels + self.value = value diff --git a/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py b/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py deleted file mode 100644 index 114d1d219cf93..0000000000000 --- a/ddev/tests/ai/flows/openmetrics/phases/test_validate_endpoint.py +++ /dev/null @@ -1,345 +0,0 @@ -# (C) Datadog, Inc. 2026-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -import asyncio - -import httpx -import pytest -from prometheus_client import Metric -from prometheus_client.parser import text_string_to_metric_families as parse_prometheus - -from ddev.ai.flows.openmetrics.phases import validate_endpoint as validate_endpoint_module -from ddev.ai.flows.openmetrics.phases.validate_endpoint import ( - EndpointValidationError, - ValidateEndpointPhase, - _build_memory_text, - _parse_exposition, -) -from ddev.ai.phases.checkpoint import CheckpointManager -from ddev.ai.phases.config import AgentConfig, CheckpointConfig, FlowConfigError, PhaseConfig, TaskConfig -from ddev.ai.phases.messages import PhaseFailedMessage, PhaseTrigger -from ddev.ai.tools.fs.file_access_policy import FileAccessPolicy -from ddev.ai.tools.fs.file_registry import FileRegistry -from ddev.event_bus.exceptions import MessageProcessingError - -ENDPOINT_URL = "http://example.test:9100/metrics" - -PROMETHEUS_BODY = """\ -# HELP http_requests_total Total HTTP requests. -# TYPE http_requests_total counter -http_requests_total{method="GET",code="200"} 1027 -# HELP process_resident_memory_bytes Resident memory in bytes. -# TYPE process_resident_memory_bytes gauge -process_resident_memory_bytes 1.2e+08 -# HELP request_duration_seconds Request latency. -# TYPE request_duration_seconds histogram -request_duration_seconds_bucket{le="0.1"} 3 -request_duration_seconds_bucket{le="0.5"} 5 -request_duration_seconds_bucket{le="+Inf"} 7 -request_duration_seconds_sum 1.5 -request_duration_seconds_count 7 -""" - -OPENMETRICS_BODY = """\ -# TYPE http_requests counter -# HELP http_requests Total HTTP requests. -# UNIT http_requests requests -http_requests_total{method="GET",code="200"} 1027 -# TYPE process_resident_memory_bytes gauge -# UNIT process_resident_memory_bytes bytes -# HELP process_resident_memory_bytes Resident memory in bytes. -process_resident_memory_bytes 1.2e+08 -# EOF -""" - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def flow_dir(tmp_path): - return tmp_path - - -@pytest.fixture -def message_queue(): - return asyncio.Queue() - - -def _make_phase( - flow_dir, - message_queue, - *, - phase_id: str = "validate_endpoint", - runtime_variables: dict[str, str] | None = None, -) -> tuple[ValidateEndpointPhase, CheckpointManager]: - checkpoint_manager = CheckpointManager(flow_dir / "checkpoints.yaml") - phase = ValidateEndpointPhase( - phase_id=phase_id, - dependencies=[], - config=PhaseConfig(), - checkpoint_manager=checkpoint_manager, - runtime_variables=runtime_variables if runtime_variables is not None else {"endpoint_url": ENDPOINT_URL}, - flow_variables={}, - config_dir=flow_dir, - file_registry=FileRegistry(policy=FileAccessPolicy(write_root=flow_dir)), - ) - phase.queue = message_queue - return phase, checkpoint_manager - - -def _install_mock_transport(monkeypatch, handler): - """Patch httpx.AsyncClient inside the phase module to use a MockTransport handler.""" - real_client_cls = httpx.AsyncClient - - def factory(*args, **kwargs): - kwargs["transport"] = httpx.MockTransport(handler) - return real_client_cls(*args, **kwargs) - - monkeypatch.setattr(validate_endpoint_module.httpx, "AsyncClient", factory) - - -def _ok_handler(status: int, body: str, content_type: str): - def handler(request: httpx.Request) -> httpx.Response: - return httpx.Response(status_code=status, content=body.encode("utf-8"), headers={"Content-Type": content_type}) - - return handler - - -def _raising_handler(exc: Exception): - def handler(request: httpx.Request) -> httpx.Response: - raise exc - - return handler - - -# --------------------------------------------------------------------------- -# Phase happy paths -# --------------------------------------------------------------------------- - - -async def test_success_with_prometheus_body(flow_dir, message_queue, monkeypatch): - _install_mock_transport( - monkeypatch, - _ok_handler(200, PROMETHEUS_BODY, "text/plain; version=0.0.4; charset=utf-8"), - ) - phase, mgr = _make_phase(flow_dir, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - expected_families = list(parse_prometheus(PROMETHEUS_BODY)) - expected_names = [m.name for m in expected_families[:10]] - - memory = mgr.memory_content("validate_endpoint") - assert ENDPOINT_URL in memory - assert "HTTP status:** 200" in memory - assert expected_names[0] in memory - assert expected_families[0].type in memory - - checkpoint = mgr.read()["validate_endpoint"] - assert checkpoint["status"] == "success" - assert checkpoint["exposition_format"] == "prometheus" - assert checkpoint["metric_count"] == len(expected_families) - assert checkpoint["sample_metric_names"] == expected_names - assert checkpoint["status_code"] == 200 - assert checkpoint["endpoint_url"] == ENDPOINT_URL - assert checkpoint["tokens"] == {"total_input": 0, "total_output": 0} - - -async def test_success_with_openmetrics_body(flow_dir, message_queue, monkeypatch): - content_type = "application/openmetrics-text; version=1.0.0; charset=utf-8" - _install_mock_transport(monkeypatch, _ok_handler(200, OPENMETRICS_BODY, content_type)) - phase, mgr = _make_phase(flow_dir, message_queue) - - await phase.process_message(PhaseTrigger(id="start", phase_id=None)) - - checkpoint = mgr.read()["validate_endpoint"] - assert checkpoint["status"] == "success" - assert checkpoint["exposition_format"] == "openmetrics" - assert checkpoint["content_type"] == content_type - assert checkpoint["metric_count"] >= 1 - - -# --------------------------------------------------------------------------- -# Phase failure paths -# --------------------------------------------------------------------------- - - -async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str): - """Run execute, expect failure, drive on_error like the framework would.""" - trigger = PhaseTrigger(id="start", phase_id=None) - context = { - "endpoint_url": phase._runtime_variables.get("endpoint_url"), - "phase_name": phase._phase_id, - } - try: - await phase.execute(context) - except Exception as raised: - wrapped = MessageProcessingError(phase._phase_id, trigger, raised) - await phase.on_error(wrapped) - checkpoint = mgr.read()[phase._phase_id] - assert checkpoint["status"] == "failed" - assert error_contains.lower() in checkpoint["error"].lower() - msg = message_queue.get_nowait() - assert isinstance(msg, PhaseFailedMessage) - assert error_contains.lower() in msg.error.lower() - return raised - pytest.fail(f"Expected execute() to raise; error should contain {error_contains!r}") - - -async def test_failure_non_200_status(flow_dir, message_queue, monkeypatch): - _install_mock_transport(monkeypatch, _ok_handler(503, "service unavailable", "text/plain")) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="HTTP 503") - - -async def test_failure_body_is_html(flow_dir, message_queue, monkeypatch): - _install_mock_transport(monkeypatch, _ok_handler(200, "not metrics", "text/plain")) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid prometheus exposition") - - -async def test_failure_body_is_json(flow_dir, message_queue, monkeypatch): - _install_mock_transport(monkeypatch, _ok_handler(200, '{"hello": "world"}', "text/plain")) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="prometheus") - - -async def test_failure_zero_families(flow_dir, message_queue, monkeypatch): - body = "\n\n# just a stray comment, not a HELP/TYPE\n\n" - _install_mock_transport(monkeypatch, _ok_handler(200, body, "text/plain")) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="zero metric families") - - -async def test_failure_openmetrics_missing_eof(flow_dir, message_queue, monkeypatch): - _install_mock_transport( - monkeypatch, - _ok_handler(200, PROMETHEUS_BODY, "application/openmetrics-text; version=1.0.0"), - ) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="not valid openmetrics exposition") - - -async def test_failure_timeout(flow_dir, message_queue, monkeypatch): - _install_mock_transport(monkeypatch, _raising_handler(httpx.TimeoutException("slow"))) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="timed out") - - -async def test_failure_request_error(flow_dir, message_queue, monkeypatch): - _install_mock_transport(monkeypatch, _raising_handler(httpx.ConnectError("refused"))) - phase, mgr = _make_phase(flow_dir, message_queue) - - await _assert_phase_fails(phase, mgr, message_queue, error_contains="request failed") - - -async def test_failure_missing_endpoint_url(flow_dir, message_queue): - phase, mgr = _make_phase(flow_dir, message_queue, runtime_variables={}) - - trigger = PhaseTrigger(id="start", phase_id=None) - with pytest.raises(FlowConfigError, match="endpoint_url"): - await phase.process_message(trigger) - - wrapped = MessageProcessingError(phase._phase_id, trigger, FlowConfigError("endpoint_url required")) - await phase.on_error(wrapped) - - checkpoint = mgr.read()["validate_endpoint"] - assert checkpoint["status"] == "failed" - msg = message_queue.get_nowait() - assert isinstance(msg, PhaseFailedMessage) - - -# --------------------------------------------------------------------------- -# validate_config -# --------------------------------------------------------------------------- - - -def test_validate_config_rejects_agent(): - with pytest.raises(FlowConfigError, match="must not declare 'agent'"): - ValidateEndpointPhase.validate_config("p", PhaseConfig(agent="x"), {"x": AgentConfig()}) - - -def test_validate_config_rejects_tasks(): - config = PhaseConfig(tasks=[TaskConfig(name="t", prompt="hi")]) - with pytest.raises(FlowConfigError, match="must not declare 'tasks'"): - ValidateEndpointPhase.validate_config("p", config, {}) - - -def test_validate_config_rejects_checkpoint(): - config = PhaseConfig(checkpoint=CheckpointConfig(memory_prompt="x")) - with pytest.raises(FlowConfigError, match="must not declare 'checkpoint'"): - ValidateEndpointPhase.validate_config("p", config, {}) - - -def test_validate_config_accepts_minimal(): - ValidateEndpointPhase.validate_config("p", PhaseConfig(), {}) - - -# --------------------------------------------------------------------------- -# _parse_exposition -# --------------------------------------------------------------------------- - - -def test_parse_exposition_prometheus(): - families, fmt = _parse_exposition(PROMETHEUS_BODY, "text/plain; version=0.0.4") - assert fmt == "prometheus" - assert len(families) > 0 - - -def test_parse_exposition_openmetrics(): - families, fmt = _parse_exposition(OPENMETRICS_BODY, "application/openmetrics-text; version=1.0.0") - assert fmt == "openmetrics" - assert len(families) > 0 - - -def test_parse_exposition_empty_content_type_falls_back_to_prometheus(): - families, fmt = _parse_exposition(PROMETHEUS_BODY, "") - assert fmt == "prometheus" - assert len(families) > 0 - - -def test_parse_exposition_raises_on_invalid_body(): - with pytest.raises(EndpointValidationError, match="not valid prometheus exposition"): - _parse_exposition("", "text/plain") - - -def test_parse_exposition_raises_on_zero_families(): - with pytest.raises(EndpointValidationError, match="zero metric families"): - _parse_exposition("", "text/plain") - - -# --------------------------------------------------------------------------- -# _build_memory_text -# --------------------------------------------------------------------------- - - -def test_build_memory_text_renders_all_fields(): - families = [ - Metric("widgets", "Widget count.", "counter"), - Metric("gizmos", "Gizmo gauge.", "gauge"), - ] - text = _build_memory_text( - url="http://example.test:9100/metrics", - status=200, - content_type="text/plain; version=0.0.4", - exposition_format="prometheus", - families=families, - ) - assert "http://example.test:9100/metrics" in text - assert "200" in text - assert "text/plain; version=0.0.4" in text - assert "prometheus" in text - assert "widgets" in text - assert "counter" in text - assert "gizmos" in text - assert "gauge" in text From 7b17c9b7cfbb0b242fb000d4ef87dfce0deb2215 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 16:39:00 +0200 Subject: [PATCH 3/5] Raise if flow not in ddev.ai --- ddev/src/ddev/ai/phases/orchestrator.py | 7 ++++++- ddev/tests/ai/phases/test_orchestrator.py | 25 +++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/ddev/src/ddev/ai/phases/orchestrator.py b/ddev/src/ddev/ai/phases/orchestrator.py index 875c83839835a..3a4e4bc94f7e6 100644 --- a/ddev/src/ddev/ai/phases/orchestrator.py +++ b/ddev/src/ddev/ai/phases/orchestrator.py @@ -87,7 +87,12 @@ async def on_initialize(self) -> None: flow_phases_dir = config_dir / "phases" if flow_phases_dir.is_dir(): ai_root = Path(__file__).parent.parent - rel = flow_phases_dir.relative_to(ai_root) + try: + rel = flow_phases_dir.relative_to(ai_root) + except ValueError: + raise FlowConfigError( + f"Flow phases directory {flow_phases_dir} must be inside the ddev.ai package tree ({ai_root})" + ) from None flow_import_prefix = "ddev.ai." + ".".join(rel.parts) _discover_and_register_phases( self._phase_registry, diff --git a/ddev/tests/ai/phases/test_orchestrator.py b/ddev/tests/ai/phases/test_orchestrator.py index 509aa3c9dea44..076f7f596bcdb 100644 --- a/ddev/tests/ai/phases/test_orchestrator.py +++ b/ddev/tests/ai/phases/test_orchestrator.py @@ -274,6 +274,31 @@ async def test_on_initialize_unknown_phase_type_raises_flow_config_error(tmp_pat await orchestrator.on_initialize() +async def test_on_initialize_flow_phases_dir_outside_ai_root_raises(tmp_path, make_orchestrator): + """phases/ directory outside the ddev.ai package tree raises FlowConfigError.""" + (tmp_path / "phases").mkdir() + (tmp_path / "prompts").mkdir() + (tmp_path / "prompts" / "writer.md").write_text("system prompt") + (tmp_path / "flow.yaml").write_text( + dedent("""\ + agents: + writer: + tools: [] + phases: + a: + agent: writer + tasks: + - name: t1 + prompt: do it + flow: + - phase: a + """) + ) + orchestrator = make_orchestrator(tmp_path) + with pytest.raises(FlowConfigError, match="ddev.ai package tree"): + await orchestrator.on_initialize() + + async def test_on_initialize_missing_agent_raises(tmp_path, make_orchestrator): (tmp_path / "prompts").mkdir() (tmp_path / "flow.yaml").write_text( From 218fa6110eee2ce5508a8e9ace62558ec14297c9 Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 16:50:50 +0200 Subject: [PATCH 4/5] Check if endpoint returns response bigger than 10MB --- .../openmetrics/phases/inspect_endpoint.py | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py b/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py index bcc45088802fb..cce8205ccf9ad 100644 --- a/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py +++ b/ddev/src/ddev/ai/flows/openmetrics/phases/inspect_endpoint.py @@ -16,6 +16,7 @@ from ddev.ai.phases.config import AgentConfig, FlowConfigError, PhaseConfig REQUEST_TIMEOUT_SECONDS = 10.0 +RESPONSE_BODY_LIMIT_BYTES = 10 * 1024 * 1024 # 10 MB JSONL_FILENAME_SUFFIX = "_metrics.jsonl" @@ -40,7 +41,9 @@ def _parse_exposition(body: str, content_type: str) -> tuple[list[Metric], str]: try: families = list(parser(body)) except Exception as e: - raise EndpointInspectionError(f"Body is not valid {exposition_format} exposition: {e}") from e + raise EndpointInspectionError( + f"Body is not valid {exposition_format} exposition ({type(e).__name__}): {e}" + ) from e if not families: raise EndpointInspectionError(f"Body parsed as {exposition_format} but contained zero metric families") @@ -68,6 +71,13 @@ def _build_jsonl_rows(families: list[Metric]) -> list[dict[str, Any]]: return rows +def _remove_if_exists(path: Path) -> None: + try: + path.unlink(missing_ok=True) + except OSError: + pass + + def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: """Atomically write rows as JSON Lines to path.""" tmp_path = path.with_suffix(path.suffix + ".tmp") @@ -82,18 +92,10 @@ def _write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: fh.write("\n") os.replace(tmp_path, path) except EndpointInspectionError: - if tmp_path.exists(): - try: - tmp_path.unlink() - except OSError: - pass + _remove_if_exists(tmp_path) raise except OSError as e: - if tmp_path.exists(): - try: - tmp_path.unlink() - except OSError: - pass + _remove_if_exists(tmp_path) raise EndpointInspectionError(f"Failed to write metrics catalog at {path}: {e}") from e @@ -156,22 +158,32 @@ async def execute(self, context: dict[str, Any]) -> PhaseOutcome: if not endpoint_url: raise FlowConfigError(f"Phase {self._phase_id!r}: 'endpoint_url' runtime variable is required") + limit_mb = RESPONSE_BODY_LIMIT_BYTES // (1024 * 1024) try: async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT_SECONDS, follow_redirects=True) as client: - response = await client.get(endpoint_url) + async with client.stream("GET", endpoint_url) as response: + if response.status_code != 200: + raise EndpointInspectionError( + f"Endpoint returned HTTP {response.status_code} (expected 200): {endpoint_url}" + ) + chunks: list[bytes] = [] + received = 0 + async for chunk in response.aiter_bytes(): + received += len(chunk) + if received > RESPONSE_BODY_LIMIT_BYTES: + raise EndpointInspectionError(f"Response body exceeds {limit_mb} MB limit: {endpoint_url}") + chunks.append(chunk) + body = b"".join(chunks).decode("utf-8", errors="replace") + content_type = response.headers.get("Content-Type", "") + except EndpointInspectionError: + raise except httpx.TimeoutException as e: raise EndpointInspectionError(f"Endpoint timed out after {REQUEST_TIMEOUT_SECONDS}s: {endpoint_url}") from e except httpx.RequestError as e: raise EndpointInspectionError(f"Request failed for {endpoint_url}: {e}") from e - if response.status_code != 200: - raise EndpointInspectionError( - f"Endpoint returned HTTP {response.status_code} (expected 200): {endpoint_url}" - ) - - content_type = response.headers.get("Content-Type", "") try: - families, exposition_format = _parse_exposition(response.text, content_type) + families, exposition_format = _parse_exposition(body, content_type) except EndpointInspectionError as e: raise EndpointInspectionError(f"{e} ({endpoint_url})") from e From 9ecf0b2a416654117b1715cc2c2b4abb2848913b Mon Sep 17 00:00:00 2001 From: Luis Orofino Date: Wed, 27 May 2026 16:59:25 +0200 Subject: [PATCH 5/5] Improve test --- .../phases/test_inspect_endpoint.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py b/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py index 004a59dcfcef6..a8b341050291d 100644 --- a/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py +++ b/ddev/tests/ai/flows/openmetrics/phases/test_inspect_endpoint.py @@ -148,6 +148,15 @@ def _read_jsonl(path) -> list[dict]: return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line] +class _Sample: + """Minimal Sample stand-in for the label-union unit test.""" + + def __init__(self, name: str, labels: dict[str, str], value: float): + self.name = name + self.labels = labels + self.value = value + + # --------------------------------------------------------------------------- # Phase happy paths # --------------------------------------------------------------------------- @@ -198,14 +207,10 @@ async def test_success_with_openmetrics_body(flow_dir, message_queue, monkeypatc async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str): - """Run execute, expect failure, drive on_error like the framework would.""" + """Run process_message, expect failure, drive on_error like the framework would.""" trigger = PhaseTrigger(id="start", phase_id=None) - context = { - "endpoint_url": phase._runtime_variables.get("endpoint_url"), - "phase_name": phase._phase_id, - } try: - await phase.execute(context) + await phase.process_message(trigger) except Exception as raised: wrapped = MessageProcessingError(phase._phase_id, trigger, raised) await phase.on_error(wrapped) @@ -216,7 +221,7 @@ async def _assert_phase_fails(phase, mgr, message_queue, *, error_contains: str) assert isinstance(msg, PhaseFailedMessage) assert error_contains.lower() in msg.error.lower() return raised - pytest.fail(f"Expected execute() to raise; error should contain {error_contains!r}") + pytest.fail(f"Expected process_message() to raise; error should contain {error_contains!r}") async def test_failure_non_200_status(flow_dir, message_queue, monkeypatch): @@ -583,12 +588,3 @@ async def test_jsonl_failure_propagates_as_phase_failure(flow_dir, message_queue blocker.mkdir() await _assert_phase_fails(phase, mgr, message_queue, error_contains="Failed to write metrics catalog") - - -class _Sample: - """Minimal Sample stand-in for the label-union unit test.""" - - def __init__(self, name: str, labels: dict[str, str], value: float): - self.name = name - self.labels = labels - self.value = value