From 173a5180be17e498553d131237ac7c51ed3fe879 Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Thu, 18 Jun 2026 07:03:49 +0000 Subject: [PATCH 1/9] Support raw image offload in v1 train client --- tests/v1/test_graph.py | 107 ++++++++++ tests/v1/test_train_client_multimodal.py | 246 +++++++++++++++++++++++ verifiers/v1/ARCHITECTURE.md | 17 +- verifiers/v1/clients/client.py | 13 ++ verifiers/v1/clients/train.py | 135 +++++++++++-- verifiers/v1/graph.py | 136 +++++++++---- verifiers/v1/interception/server.py | 8 +- verifiers/v1/trace.py | 12 +- verifiers/v1/types.py | 4 +- verifiers/v1/utils/multimodal.py | 112 +++++++++++ 10 files changed, 727 insertions(+), 63 deletions(-) create mode 100644 tests/v1/test_train_client_multimodal.py create mode 100644 verifiers/v1/utils/multimodal.py diff --git a/tests/v1/test_graph.py b/tests/v1/test_graph.py index a115778a2..6df5d4eaa 100644 --- a/tests/v1/test_graph.py +++ b/tests/v1/test_graph.py @@ -1,6 +1,8 @@ import base64 import numpy as np +import pytest +from renderers.base import MultiModalData, PlaceholderRange import verifiers.v1 as vf from verifiers.v1 import graph @@ -107,6 +109,111 @@ def test_routed_experts_none_when_absent(): assert trace.branches[-1].routed_experts is None +def test_raw_image_sidecar_attributed_round_trips_and_feeds_next_bridge(): + trace = vf.Trace(task=vf.Task(idx=0, prompt="x")) + user = vf.UserMessage( + content=[ + vf.ImageUrlContentPart( + image_url=vf.ImageUrlSource( + url="file:///data/outputs/run_abc/assets/images/img.png" + ) + ) + ] + ) + mm = MultiModalData( + mm_hashes={"image": ["abcd1234abcd1234"]}, + mm_placeholders={"image": [PlaceholderRange(offset=2, length=4)]}, + mm_items={ + "image": [ + { + "image_grid_thw": [1, 2, 2], + "raw_image_id": "img.png", + "image_layout_fingerprint": "f" * 32, + } + ] + }, + ) + + graph.prepare_turn(trace, [user]).commit( + vf.Response( + id="a", + created=0, + model="t", + message=vf.AssistantMessage(content="a1"), + finish_reason="stop", + tokens=TurnTokens( + prompt_ids=[10, 11, 12], + completion_ids=[20], + message_spans=[(0, 2)], + multi_modal_data=mm, + ), + ) + ) + + node_mm = trace.nodes[0].multi_modal_data + assert node_mm is not None + assert node_mm.mm_items["image"][0]["raw_image_id"] == "img.png" + assert node_mm.mm_placeholders["image"][0].offset == 2 + + restored = type(trace).model_validate(trace.model_dump()) + restored_mm = restored.nodes[0].multi_modal_data + assert restored_mm is not None + assert restored_mm.mm_items == node_mm.mm_items + assert restored_mm.mm_placeholders["image"][0].length == 4 + + turn = graph.prepare_turn( + trace, + [user, vf.AssistantMessage(content="a1"), vf.UserMessage(content="next")], + ) + prev_mm = turn.previous_multi_modal_data() + assert prev_mm is not None + assert prev_mm.mm_hashes == mm.mm_hashes + assert prev_mm.mm_items == mm.mm_items + assert prev_mm.mm_placeholders["image"][0].offset == 2 + assert trace.branches[-1].multi_modal_data is not None + + +def test_multimodal_sidecar_rejects_processed_image_payloads(): + with pytest.raises(TypeError, match="processed image payloads"): + graph.MessageNode( + message=vf.UserMessage(content="image"), + multi_modal_data=MultiModalData( + mm_hashes={"image": ["abcd1234abcd1234"]}, + mm_items={ + "image": [ + { + "image_grid_thw": [1, 1, 1], + "pixel_values": np.zeros((1, 2), dtype=np.float32), + } + ] + }, + ), + ) + + old_wire_node = { + "message": {"role": "user", "content": "image"}, + "multi_modal_data": { + "mm_hashes": {"image": ["abcd1234abcd1234"]}, + "mm_placeholders": {}, + "mm_items": { + "image": [ + { + "image_grid_thw": { + "__nd__": True, + "dtype": "int64", + "shape": [3], + "data": b"\x00" * 24, + } + } + ] + }, + }, + } + + with pytest.raises(TypeError, match="raw image descriptors"): + graph.MessageNode.model_validate(old_wire_node) + + def test_tool_call_hash_matches_v0_content_and_arguments_normalization(): left = vf.AssistantMessage( content=None, diff --git a/tests/v1/test_train_client_multimodal.py b/tests/v1/test_train_client_multimodal.py new file mode 100644 index 000000000..30b33966e --- /dev/null +++ b/tests/v1/test_train_client_multimodal.py @@ -0,0 +1,246 @@ +import pytest +from renderers.base import MultiModalData, PlaceholderRange, RenderedTokens + +import verifiers.v1 as vf +from verifiers.v1 import graph +from verifiers.v1.clients.train import ( + TrainClient, + _generate_with_image_ref_retry, +) +from verifiers.v1.dialects import ChatDialect +from verifiers.v1.types import TurnTokens +from verifiers.v1.utils import multimodal + + +DATA_URL = "data:image/png;base64,aGVsbG8=" + + +def test_offload_images_inplace_rewrites_wire_and_typed_messages(monkeypatch): + def fake_offload(url, image_dir): + assert image_dir is None + if url == DATA_URL: + return "file:///tmp/run/assets/images/hello.png", 5 + return None + + monkeypatch.setattr(multimodal, "_offload_image_url", fake_offload) + + body = { + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": "look"}, + {"type": "image_url", "image_url": {"url": DATA_URL}}, + ], + } + ] + } + typed = vf.UserMessage( + content=[vf.ImageUrlContentPart(image_url=vf.ImageUrlSource(url=DATA_URL))] + ) + + stats = multimodal.offload_images_inplace([body, typed]) + + assert stats.images_rewritten == 2 + assert stats.bytes_written == 10 + assert body["messages"][0]["content"][1]["image_url"]["url"] == ( + "file:///tmp/run/assets/images/hello.png" + ) + assert isinstance(typed.content, list) + assert typed.content[0].image_url.url == "file:///tmp/run/assets/images/hello.png" + + +def test_offload_images_inplace_rejects_non_file_image_urls(monkeypatch): + monkeypatch.setattr(multimodal, "_offload_image_url", lambda *_: None) + + body = { + "messages": [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": {"url": "https://example.com/image.png"}, + } + ], + } + ] + } + + with pytest.raises(RuntimeError, match="file:// run image assets"): + multimodal.offload_images_inplace(body) + + +@pytest.mark.asyncio +async def test_train_client_bridges_multimodal_prompt_with_previous_sidecar( + monkeypatch, +): + import renderers.client as renderer_client + + captured = {} + image_msg = vf.UserMessage( + content=[ + vf.ImageUrlContentPart( + image_url=vf.ImageUrlSource(url="file:///run/assets/images/a.png") + ) + ] + ) + previous_mm = MultiModalData( + mm_hashes={"image": ["a" * 16]}, + mm_placeholders={"image": [PlaceholderRange(offset=1, length=2)]}, + mm_items={"image": [{"image_grid_thw": [1, 1, 2]}]}, + ) + trace = vf.Trace(task=vf.Task(idx=0, prompt="x")) + graph.prepare_turn(trace, [image_msg]).commit( + vf.Response( + id="a", + created=0, + model="t", + message=vf.AssistantMessage(content="a1"), + finish_reason="stop", + tokens=TurnTokens( + prompt_ids=[10, 11], + completion_ids=[20], + message_spans=[(0, 1)], + multi_modal_data=previous_mm, + ), + ) + ) + next_msg = vf.UserMessage(content="next") + turn = graph.prepare_turn( + trace, [image_msg, vf.AssistantMessage(content="a1"), next_msg] + ) + + class FakeRenderer: + is_multimodal = True + + def bridge_to_next_turn( + self, + previous_prompt_ids, + previous_completion_ids, + new_messages, + *, + tools=None, + previous_multi_modal_data=None, + ): + captured["previous_prompt_ids"] = previous_prompt_ids + captured["previous_completion_ids"] = previous_completion_ids + captured["new_messages"] = new_messages + captured["previous_multi_modal_data"] = previous_multi_modal_data + return RenderedTokens( + token_ids=[10, 11, 20, 30, 31], + message_indices=[-1, -1, -1, 0, -1], + sampled_mask=[False] * 5, + is_content=[False] * 5, + message_roles=["user"], + multi_modal_data=previous_multi_modal_data, + ) + + async def fake_maybe_offload(renderer, fn): + return fn() + + async def fake_generate(**kwargs): + captured["generate_kwargs"] = kwargs + return { + "request_id": "r", + "finish_reason": "stop", + "content": "done", + "prompt_ids": kwargs["prompt_ids"], + "completion_ids": [99], + "completion_logprobs": [-0.5], + "prompt_attribution": kwargs["prompt_attribution"], + "multi_modal_data": kwargs["multi_modal_data"], + } + + monkeypatch.setattr(renderer_client, "_maybe_offload", fake_maybe_offload) + monkeypatch.setattr(renderer_client, "generate", fake_generate) + + client = TrainClient(openai=object()) + client._pool = FakeRenderer() + response = await client.get_response( + ChatDialect(), + {"messages": []}, + "model", + vf.SamplingConfig(max_tokens=1), + session_id="trace", + turn=turn, + ) + + assert response.message.content == "done" + assert captured["previous_prompt_ids"] == [10, 11] + assert captured["previous_completion_ids"] == [20] + bridged_mm = captured["previous_multi_modal_data"] + assert bridged_mm.mm_hashes == previous_mm.mm_hashes + assert bridged_mm.mm_placeholders["image"][0].length == 2 + assert captured["generate_kwargs"]["multi_modal_data"] is bridged_mm + assert captured["generate_kwargs"]["materialize_all_image_refs"] is False + + +@pytest.mark.asyncio +async def test_generate_retries_missing_mm_cache_by_materializing_image_refs( + monkeypatch, +): + import renderers.client as renderer_client + + calls = [] + + class MissingCache(Exception): + body = {"error": {"type": "missing_mm_cache_item"}} + + async def fake_generate(**kwargs): + calls.append(kwargs["materialize_all_image_refs"]) + if len(calls) == 1: + raise MissingCache() + return {"ok": True} + + monkeypatch.setattr(renderer_client, "generate", fake_generate) + mm = MultiModalData( + mm_hashes={"image": ["a" * 16]}, + mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, + mm_items={"image": [{"image_grid_thw": [1, 1, 1]}]}, + ) + + result = await _generate_with_image_ref_retry( + client=object(), + renderer=object(), + messages=[], + model="m", + multi_modal_data=mm, + ) + + assert result == {"ok": True} + assert calls == [False, True] + + +@pytest.mark.asyncio +async def test_generate_does_not_retry_missing_cache_for_raw_image_refs( + monkeypatch, +): + import renderers.client as renderer_client + + calls = [] + + class MissingCache(Exception): + body = {"error": {"type": "missing_mm_cache_item"}} + + async def fake_generate(**kwargs): + calls.append(kwargs["materialize_all_image_refs"]) + raise MissingCache() + + monkeypatch.setattr(renderer_client, "generate", fake_generate) + mm = MultiModalData( + mm_hashes={"image": ["a" * 16]}, + mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, + mm_items={"image": [{"image_grid_thw": [1, 1, 1], "raw_image_id": "a.png"}]}, + ) + + with pytest.raises(MissingCache): + await _generate_with_image_ref_retry( + client=object(), + renderer=object(), + messages=[], + model="m", + multi_modal_data=mm, + ) + + assert calls == [False] diff --git a/verifiers/v1/ARCHITECTURE.md b/verifiers/v1/ARCHITECTURE.md index 4251aec7f..5b5afda70 100644 --- a/verifiers/v1/ARCHITECTURE.md +++ b/verifiers/v1/ARCHITECTURE.md @@ -70,12 +70,16 @@ end to end: each surviving context window is just another root→leaf path. `Trace.to_record()` (`trace.py`) is the JSON record dump (`model_dump(mode="json")`) for `results.jsonl` / W&B tables, minus the per-node training tensors (`MessageNode.multi_modal_data`, -`routed_experts`, via `_NODE_DUMP_EXCLUDE`): those hold raw numpy bytes that can't round-trip JSON -(the dump raises `UnicodeDecodeError` on real expert ids) and bloat every line. Computed views +`routed_experts`, via `_NODE_DUMP_EXCLUDE`): routed-expert tensors hold raw numpy bytes that can't +round-trip JSON (the dump raises `UnicodeDecodeError` on real expert ids), and multimodal +descriptors are trainer sidecars rather than rollout records. Computed views (`reward`, `branches`, `num_turns`, per-span `duration`) are pydantic properties, so they're never serialized and recompute on load; `state` is excluded. The tensors still reach the trainer over the env-server *wire*, which uses msgpack `model_dump(mode="python")` and carries them as raw `bin` bytes -(not base64) via the field serializers on `MessageNode` (`graph.py`); only the JSON record strips them. +(not base64) via the field serializers on `MessageNode` (`graph.py`); only the JSON record strips +them. Multimodal training uses raw run-image assets: the train client rewrites base64 image parts to +`file://` refs before tracing, and `MessageNode.multi_modal_data` carries lightweight renderer +descriptors (hashes, placeholder ranges, image metadata/refs) rather than image processor outputs. ### Branching: message-level vs renderer-level, and the token invariant @@ -111,9 +115,10 @@ The renderer client avoids the break entirely when it can: instead of re-renderi each turn, the train client (`clients/train.py`) calls `renderer.bridge_to_next_turn(...)`, which keeps the prior `prompt_ids + completion_ids` **verbatim** and only renders the new tail. Verbatim prior ⇒ the stored prefix matches token-for-token ⇒ no fork, one linear branch, invariant intact. -The token-identity check in `commit` is the backstop for when the bridge can't apply (the renderer -returns `None`, multimodal, the eval relay): the break still surfaces as honest branches rather than -silent corruption. +For multimodal renderers, the train client also passes the reusable prefix's `multi_modal_data` so +prior image placeholders and descriptors remain aligned. The token-identity check in `commit` is the +backstop for when the bridge can't apply (the renderer returns `None`, the eval relay): the break +still surfaces as honest branches rather than silent corruption. ## Model access — interception, dialects, clients diff --git a/verifiers/v1/clients/client.py b/verifiers/v1/clients/client.py index 7642b0986..dbbc19c79 100644 --- a/verifiers/v1/clients/client.py +++ b/verifiers/v1/clients/client.py @@ -33,6 +33,19 @@ class RelayReply: class Client(ABC): + async def prepare_request_body(self, dialect: Dialect, body: dict) -> dict: + """Normalize a provider request before the interception server parses/traces it. + + Relay clients keep the request verbatim. Training clients may rewrite heavy + in-process payloads (for example base64 images) into stable run-asset refs so the + trace, renderer, and trainer all see the same cheap message content. + """ + return body + + async def prepare_messages(self, dialect: Dialect, messages: list) -> list: + """Normalize typed simulator messages before adding them to the wire body/trace.""" + return messages + @abstractmethod async def get_response( self, diff --git a/verifiers/v1/clients/train.py b/verifiers/v1/clients/train.py index 92a65ba7f..c19a69502 100644 --- a/verifiers/v1/clients/train.py +++ b/verifiers/v1/clients/train.py @@ -8,14 +8,17 @@ needs a running vLLM engine. """ +import asyncio import json +import logging from collections.abc import Mapping -from typing import Any +from typing import Any, cast from openai import AsyncOpenAI, OpenAIError from renderers import RenderedTokens from renderers import OverlongPromptError as RendererOverlongPromptError from renderers import RendererConfig +from renderers.base import MultiModalData, is_multimodal from verifiers.v1.clients.client import SESSION_ID_HEADER, Client from verifiers.v1.dialects import FINISH_REASONS, ChatDialect, Dialect, parse_tools @@ -32,6 +35,10 @@ TurnTokens, Usage, ) +from verifiers.v1.utils.multimodal import offload_images_inplace + + +logger = logging.getLogger(__name__) def tool_to_wire(tool: Tool) -> dict: @@ -167,16 +174,89 @@ def _is_valid_incremental_tail(messages: list[dict[str, Any]]) -> bool: return all(role == "tool" for role in roles) -def _has_multimodal_content(messages) -> bool: - for message in messages: - content = getattr(message, "content", None) - if not isinstance(content, list): +_RETRYABLE_MM_ERROR_TYPES = frozenset({"missing_mm_cache_item"}) + + +def _json_error_type(value: Any) -> str | None: + if isinstance(value, str): + try: + value = json.loads(value) + except (TypeError, ValueError): + return None + if not isinstance(value, Mapping): + return None + error_type = value.get("error_type") + return error_type if isinstance(error_type, str) else None + + +def _retryable_mm_error_type(exc: Exception) -> str | None: + candidates: list[Any] = [] + body = getattr(exc, "body", None) + if body is not None: + candidates.append(body) + response = getattr(exc, "response", None) + if response is not None: + try: + candidates.append(response.json()) + except Exception: + text = getattr(response, "text", None) + if text is not None: + candidates.append(text) + + for payload in candidates: + if not isinstance(payload, Mapping): + error_type = _json_error_type(payload) + if error_type in _RETRYABLE_MM_ERROR_TYPES: + return error_type continue - if any(getattr(part, "type", None) == "image_url" for part in content): + error = payload.get("error") + if isinstance(error, Mapping): + error_type = error.get("type") + if error_type in _RETRYABLE_MM_ERROR_TYPES: + return cast(str, error_type) + error_type = _json_error_type(error.get("message")) + if error_type in _RETRYABLE_MM_ERROR_TYPES: + return error_type + error_type = _json_error_type(payload) + if error_type in _RETRYABLE_MM_ERROR_TYPES: + return error_type + return None + + +def _has_descriptor_only_images(mm_data: MultiModalData | None) -> bool: + """True when a prompt carries prior image descriptors without raw refs.""" + if mm_data is None or mm_data.is_empty(): + return False + for item in mm_data.mm_items.get("image") or []: + if isinstance(item, Mapping) and not item.get("raw_image_id"): return True return False +async def _generate_with_image_ref_retry(**kwargs: Any) -> dict[str, Any]: + """Retry a missing-cache MM request by materializing all image refs once. + + The normal bridge path sends descriptor-only entries for prior images and + refs only for newly introduced images. If vLLM says its MM cache no longer + has a prior item, retry by asking the renderer to rebuild refs for every + image from the file-backed messages. This does not send processor outputs; + it only sends raw image refs again. + """ + has_descriptor_only = _has_descriptor_only_images(kwargs.get("multi_modal_data")) + try: + from renderers.client import generate + + return await generate(materialize_all_image_refs=False, **kwargs) + except Exception as exc: + if not has_descriptor_only or _retryable_mm_error_type(exc) is None: + raise + logger.warning( + "vLLM MM cache miss; retrying with all image refs materialized: %r", + exc, + ) + return await generate(materialize_all_image_refs=True, **kwargs) + + class TrainClient(Client): """Renders prompts to token ids and calls a vLLM `/inference/v1/generate` engine.""" @@ -202,6 +282,28 @@ def _renderer_pool(self, model: str): ) return self._pool + async def prepare_request_body(self, dialect: Dialect, body: dict) -> dict: + if isinstance(dialect, ChatDialect): + stats = await asyncio.to_thread(offload_images_inplace, body) + if stats.images_rewritten: + logger.info( + "offloaded %d image(s) to run assets (%.1f MiB)", + stats.images_rewritten, + stats.bytes_written / (1024.0 * 1024.0), + ) + return body + + async def prepare_messages(self, dialect: Dialect, messages: list) -> list: + if isinstance(dialect, ChatDialect): + stats = await asyncio.to_thread(offload_images_inplace, messages) + if stats.images_rewritten: + logger.info( + "offloaded %d simulator image(s) to run assets (%.1f MiB)", + stats.images_rewritten, + stats.bytes_written / (1024.0 * 1024.0), + ) + return messages + async def get_response( self, dialect: Dialect, @@ -232,7 +334,7 @@ async def get_response( else: prompt, tools = dialect.parse_request(body) renderer = self._renderer_pool(model) - from renderers.client import _maybe_offload, generate + from renderers.client import _maybe_offload wire_tools = [tool_to_wire(t) for t in tools] if tools else None wire_messages = ( @@ -244,23 +346,24 @@ async def get_response( sampling_params = sampling_args.model_dump(exclude_none=True) bridged_turn: PendingTurn | None = None - # Only build the (O(context)) previous-turn token ids once the cheap guards pass — a - # multimodal prompt or a tail that isn't a clean `[tool*, user?]` extension can't bridge. - can_bridge = ( - turn is not None - and not _has_multimodal_content(prompt) - and _is_valid_incremental_tail(wire_messages) - ) + # Only build the (O(context)) previous-turn token ids once the cheap guards pass: a + # tail that isn't a clean `[tool*, user?]` extension can't bridge. + can_bridge = turn is not None and _is_valid_incremental_tail(wire_messages) previous_ids = turn.previous_token_ids() if can_bridge else None if previous_ids is not None: previous_prompt_ids, previous_completion_ids = previous_ids def bridge(): + kwargs: dict[str, Any] = {"tools": wire_tools} + if is_multimodal(renderer): + kwargs["previous_multi_modal_data"] = ( + turn.previous_multi_modal_data() + ) return renderer.bridge_to_next_turn( previous_prompt_ids, previous_completion_ids, wire_messages, - tools=wire_tools, + **kwargs, ) bridged = await _maybe_offload(renderer, bridge) @@ -279,7 +382,7 @@ def bridge(): wire_messages = [message_to_wire(m) for m in prompt] try: - result = await generate( + result = await _generate_with_image_ref_retry( client=self.openai, renderer=renderer, messages=wire_messages, diff --git a/verifiers/v1/graph.py b/verifiers/v1/graph.py index 371bd7174..122bb8329 100644 --- a/verifiers/v1/graph.py +++ b/verifiers/v1/graph.py @@ -60,6 +60,44 @@ def _decode_ndarray(d: dict) -> np.ndarray: return np.frombuffer(d["data"], dtype=np.dtype(d["dtype"])).reshape(d["shape"]) +def _contains_array_payload(value: Any) -> bool: + if isinstance(value, np.ndarray): + return True + if isinstance(value, dict): + return bool(value.get("__nd__")) or any( + _contains_array_payload(v) for v in value.values() + ) + if isinstance(value, (list, tuple)): + return any(_contains_array_payload(v) for v in value) + return False + + +def _validate_raw_mm_item(item: Any) -> dict[str, Any]: + if not isinstance(item, dict): + raise TypeError( + "v1 multimodal sidecars must be raw image descriptor dicts, " + f"got {type(item).__name__}" + ) + if "pixel_values" in item: + raise TypeError("v1 does not carry processed image payloads") + if _contains_array_payload(item): + raise TypeError( + "v1 multimodal sidecars must be raw image descriptors, not arrays" + ) + return dict(item) + + +def _validate_raw_mm_data(mmd: MultiModalData) -> MultiModalData: + return MultiModalData( + mm_hashes={k: list(v) for k, v in mmd.mm_hashes.items()}, + mm_placeholders={k: list(v) for k, v in mmd.mm_placeholders.items()}, + mm_items={ + modality: [_validate_raw_mm_item(item) for item in items] + for modality, items in mmd.mm_items.items() + }, + ) + + class MessageNode(StrictBaseModel): """One message in the graph: a message plus the tokens it adds to the cumulative sequence. Concatenating a root→leaf path's nodes reconstructs that branch's full token @@ -97,14 +135,16 @@ class MessageNode(StrictBaseModel): finish_reason: FinishReason = None """The response's finish reason (assistant nodes only) — kept for truncation detection.""" multi_modal_data: MultiModalData | None = None - """The renderer items for the images this message's content introduces (pixel tensors, - grids, hashes, placeholders) — the only carrier of the pixels from the env server to the - trainer. `Branch.multi_modal_data` concatenates them along the path into the training - `mm_kwargs`. Rides the wire as raw bytes (msgpack `bin`) since pydantic can't JSON the numpy; - kept off disk by the dump-site `exclude` in prime-rl (the tensors bloat the rollout jsonl).""" - usage: Usage | None = None - """Provider-reported token usage for this message's response (assistant nodes). Preserved - on the wire and on disk, including cache-read tokens when the provider reports them.""" + """The renderer items for images this message introduces. + + With the raw-image path, items are lightweight descriptors (hashes, grid metadata, and + optional run-image refs), not image processor tensors. `Branch.multi_modal_data` concatenates + them along the path for the trainer. Old processed-payload sidecars are rejected. + """ + usage: Usage | None = Field(default=None, exclude=True) + """Provider-reported token usage for this message's response (assistant nodes). Transient + (excluded from wire/disk); lets the live dashboard show token counts even when the endpoint + returns no token ids (so `token_ids` is empty).""" routed_experts: np.ndarray | None = None """This node's slice of the MoE expert-routing array — uint8 `[len(token_ids), layers, top_k]`, the expert ids inference selected for exactly this node's tokens. Attributed from @@ -116,10 +156,10 @@ class MessageNode(StrictBaseModel): @field_serializer("multi_modal_data") def serialize_multi_modal_data(self, mmd: MultiModalData | None) -> dict | None: - """`MultiModalData` -> msgpack-safe dict so the pixel tensors ride the wire; numpy - `mm_items` values become raw-bytes `__nd__` dicts (every renderer emits `return_tensors="np"`).""" + """`MultiModalData` -> msgpack-safe raw descriptor dict.""" if mmd is None: return None + mmd = _validate_raw_mm_data(mmd) return { "mm_hashes": {k: list(v) for k, v in mmd.mm_hashes.items()}, "mm_placeholders": { @@ -127,9 +167,7 @@ def serialize_multi_modal_data(self, mmd: MultiModalData | None) -> dict | None: for modality, ranges in mmd.mm_placeholders.items() }, "mm_items": { - modality: [ - {k: _encode_ndarray(v) for k, v in item.items()} for item in items - ] + modality: [dict(item) for item in items] for modality, items in mmd.mm_items.items() }, } @@ -137,25 +175,29 @@ def serialize_multi_modal_data(self, mmd: MultiModalData | None) -> dict | None: @field_validator("multi_modal_data", mode="before") @classmethod def deserialize_multi_modal_data(cls, value: Any) -> MultiModalData | None: - if value is None or isinstance(value, MultiModalData): + if value is None: return value + if isinstance(value, MultiModalData): + return _validate_raw_mm_data(value) if not isinstance(value, dict): raise TypeError(f"cannot build MultiModalData from {type(value).__name__}") - return MultiModalData( - mm_hashes={k: list(v) for k, v in (value.get("mm_hashes") or {}).items()}, - mm_placeholders={ - modality: [ - PlaceholderRange(offset=p["offset"], length=p["length"]) - for p in ranges - ] - for modality, ranges in (value.get("mm_placeholders") or {}).items() - }, - mm_items={ - modality: [ - {k: _decode_ndarray(v) for k, v in item.items()} for item in items - ] - for modality, items in (value.get("mm_items") or {}).items() - }, + return _validate_raw_mm_data( + MultiModalData( + mm_hashes={ + k: list(v) for k, v in (value.get("mm_hashes") or {}).items() + }, + mm_placeholders={ + modality: [ + PlaceholderRange(offset=p["offset"], length=p["length"]) + for p in ranges + ] + for modality, ranges in (value.get("mm_placeholders") or {}).items() + }, + mm_items={ + modality: list(items) + for modality, items in (value.get("mm_items") or {}).items() + }, + ) ) @field_serializer("routed_experts") @@ -304,6 +346,23 @@ def prompt_message_spans( for span in tail_spans ] + def previous_multi_modal_data(self) -> MultiModalData | None: + """Concatenate multimodal sidecars attached to the reusable prefix.""" + merged = MultiModalData() + found = False + for nid in self.prefix_node_ids: + mmd = self.trace.nodes[nid].multi_modal_data + if mmd is None or mmd.is_empty(): + continue + found = True + for modality, items in mmd.mm_items.items(): + merged.mm_items.setdefault(modality, []).extend(items) + for modality, hashes in mmd.mm_hashes.items(): + merged.mm_hashes.setdefault(modality, []).extend(hashes) + for modality, placeholders in mmd.mm_placeholders.items(): + merged.mm_placeholders.setdefault(modality, []).extend(placeholders) + return merged if found else None + def commit(self, response: Response) -> None: _commit_turn(self, response) @@ -360,8 +419,9 @@ def _attribute_mm( renderer emits items per modality in prompt order (message order, then content-part order), so we walk the path advancing a per-modality cursor over every message's media but write only the nodes created this turn — `path[:num_reused]` is the reused prefix, already - attributed when first created. Item order is all training needs; placeholder offsets aren't - carried.""" + attributed when first created. Each node gets the hashes/items/placeholders for exactly the + media it introduced, preserving vLLM multimodal-list alignment when those node sidecars are + later merged for bridge or training.""" if mmd is None or mmd.is_empty(): return cursors: dict[str, int] = {} @@ -371,6 +431,7 @@ def _attribute_mm( continue node_items: dict[str, list] = {} node_hashes: dict[str, list] = {} + node_placeholders: dict[str, list[PlaceholderRange]] = {} for part in content: modality = _part_modality(part) if modality is None: @@ -382,13 +443,20 @@ def _attribute_mm( continue items = mmd.mm_items.get(modality) or [] hashes = mmd.mm_hashes.get(modality) or [] + placeholders = mmd.mm_placeholders.get(modality) or [] if k < len(items): node_items.setdefault(modality, []).append(items[k]) if k < len(hashes): node_hashes.setdefault(modality, []).append(hashes[k]) - if node_items: - trace.nodes[node_id].multi_modal_data = MultiModalData( - mm_items=node_items, mm_hashes=node_hashes + if k < len(placeholders): + node_placeholders.setdefault(modality, []).append(placeholders[k]) + if node_items or node_hashes or node_placeholders: + trace.nodes[node_id].multi_modal_data = _validate_raw_mm_data( + MultiModalData( + mm_items=node_items, + mm_hashes=node_hashes, + mm_placeholders=node_placeholders, + ) ) diff --git a/verifiers/v1/interception/server.py b/verifiers/v1/interception/server.py index 5a4ad1c3f..98eaa590a 100644 --- a/verifiers/v1/interception/server.py +++ b/verifiers/v1/interception/server.py @@ -262,6 +262,7 @@ async def handle_request( # alias after parsing so the wire body does not survive model inference. request._read_bytes = None del raw + body = await session.ctx.client.prepare_request_body(dialect, body) logger.debug( "intercept %s: id=%s stream=%s", request.path, @@ -288,7 +289,9 @@ async def handle_request( and session.trace.num_turns == 0 ): if session.opening is None: - session.opening = await session.user("") + session.opening = await session.ctx.client.prepare_messages( + dialect, await session.user("") + ) body = dialect.extend(body, None, session.opening) prompt = [*prompt, *session.opening] # If the simulator ended at the open (its taskset's `@stop` now fires), the loop's @@ -383,6 +386,9 @@ async def handle_request( return _completion_response(completion) try: user_messages = await session.user(response.message.content or "") + user_messages = await session.ctx.client.prepare_messages( + dialect, user_messages + ) except RolloutError as e: return self._fail(session, dialect, e) except Exception as e: diff --git a/verifiers/v1/trace.py b/verifiers/v1/trace.py index 3aa7eff87..8ba6ee681 100644 --- a/verifiers/v1/trace.py +++ b/verifiers/v1/trace.py @@ -130,10 +130,12 @@ def logprobs(self) -> list[float]: @property def multi_modal_data(self) -> MultiModalData | None: - """The branch's multimodal sidecar — every node's images concatenated in path (token) - order. None when the branch has no images. Drives the training `mm_kwargs` (the renderer - items per modality); the per-token `mm_token_type_ids` come from the token ids, so no - placeholder offsets are carried. Never persisted (node mm is transient).""" + """The branch's multimodal sidecar — every node's images concatenated in path order. + + None when the branch has no images. The raw-image path carries lightweight descriptors + plus placeholder ranges, so downstream vLLM/training multimodal payloads can align hashes, + placeholders, and item refs without reprocessing images in the env worker. + """ merged = MultiModalData() found = False for node in self.nodes: @@ -145,6 +147,8 @@ def multi_modal_data(self) -> MultiModalData | None: merged.mm_items.setdefault(modality, []).extend(items) for modality, hashes in mmd.mm_hashes.items(): merged.mm_hashes.setdefault(modality, []).extend(hashes) + for modality, placeholders in mmd.mm_placeholders.items(): + merged.mm_placeholders.setdefault(modality, []).extend(placeholders) return merged if found else None @property diff --git a/verifiers/v1/types.py b/verifiers/v1/types.py index 67365788a..6a62a786d 100644 --- a/verifiers/v1/types.py +++ b/verifiers/v1/types.py @@ -220,8 +220,8 @@ class TurnTokens(StrictBaseModel): default=None, exclude=True ) is_content: list[bool] | None = Field(default=None, exclude=True) - # Transient carrier (excluded): the renderer's multimodal sidecar (image tensors + offsets), - # attributed per node by the turn's `commit`, then dropped — never persisted. + # Transient carrier (excluded): the renderer's multimodal sidecar (raw-image descriptors, + # hashes, and placeholder offsets), attributed per node by the turn's `commit`, then dropped. multi_modal_data: MultiModalData | None = Field(default=None, exclude=True) # Transient carrier (excluded): the MoE expert-routing data from `generate` (expert ids # per token), attributed per node by the turn's `commit` into `MessageNode.routed_experts`, diff --git a/verifiers/v1/utils/multimodal.py b/verifiers/v1/utils/multimodal.py new file mode 100644 index 000000000..83e9c4815 --- /dev/null +++ b/verifiers/v1/utils/multimodal.py @@ -0,0 +1,112 @@ +"""Multimodal ingress helpers for v1 training. + +The env worker stores raw images as run assets before messages enter the trace. +Messages then carry cheap ``file://`` refs, and the renderer/vLLM path decides +which refs must be sent to inference. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +@dataclass +class ImageOffloadStats: + images_rewritten: int = 0 + bytes_written: int = 0 + + def add(self, other: "ImageOffloadStats") -> None: + self.images_rewritten += other.images_rewritten + self.bytes_written += other.bytes_written + + +def _offload_image_url(url: object, image_dir: Path | None) -> tuple[str, int] | None: + try: + from renderers.mm_store import offload_image_to_run_assets + except ImportError as exc: # pragma: no cover - dependency-version guard + raise RuntimeError( + "Multimodal training requires a renderers version with raw image " + "asset offload support." + ) from exc + + return offload_image_to_run_assets(url, image_dir=image_dir) + + +def _image_source_url(source: Any) -> object: + if isinstance(source, dict): + return source.get("url") + return getattr(source, "url", None) + + +def _set_image_source_url(source: Any, url: str) -> None: + if isinstance(source, dict): + source["url"] = url + else: + source.url = url + + +def _require_file_image_url(source: Any) -> None: + url = _image_source_url(source) + if not isinstance(url, str) or not url.startswith("file://"): + raise RuntimeError( + "v1 multimodal training requires image_url entries to be offloaded " + "to file:// run image assets" + ) + + +def _rewrite_image_source(source: Any, image_dir: Path | None) -> ImageOffloadStats: + stats = ImageOffloadStats() + + result = _offload_image_url(_image_source_url(source), image_dir) + if result is not None: + new_url, nbytes = result + _set_image_source_url(source, new_url) + stats.images_rewritten += 1 + stats.bytes_written += nbytes + _require_file_image_url(source) + return stats + + +def offload_images_inplace( + value: Any, *, image_dir: Path | None = None +) -> ImageOffloadStats: + """Rewrite base64 image URLs reachable from ``value`` to run-asset refs. + + Handles OpenAI wire dicts/lists and the pydantic v1 message/content-part + models used by the trace. Non-image values and already-file-backed URLs are + left untouched. + """ + stats = ImageOffloadStats() + + if isinstance(value, dict): + if value.get("type") == "image_url": + source = value.get("image_url") + if source is not None: + stats.add(_rewrite_image_source(source, image_dir)) + for child in value.values(): + stats.add(offload_images_inplace(child, image_dir=image_dir)) + return stats + + if isinstance(value, list): + for child in value: + stats.add(offload_images_inplace(child, image_dir=image_dir)) + return stats + + if isinstance(value, tuple): + for child in value: + stats.add(offload_images_inplace(child, image_dir=image_dir)) + return stats + + if getattr(value, "type", None) == "image_url": + source = getattr(value, "image_url", None) + if source is not None: + stats.add(_rewrite_image_source(source, image_dir)) + return stats + + content = getattr(value, "content", None) + if isinstance(content, (list, tuple)): + stats.add(offload_images_inplace(content, image_dir=image_dir)) + + return stats From de37650b9b49ffd5541bd97cf4b0cd426c96c160 Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Sat, 20 Jun 2026 07:41:29 +0000 Subject: [PATCH 2/9] Enforce strict raw multimodal descriptors --- tests/v1/test_graph.py | 41 +++++++++++++++--------- tests/v1/test_train_client_multimodal.py | 20 ++++++++++-- verifiers/v1/graph.py | 15 ++++++++- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/tests/v1/test_graph.py b/tests/v1/test_graph.py index 6df5d4eaa..0d1cc1ede 100644 --- a/tests/v1/test_graph.py +++ b/tests/v1/test_graph.py @@ -9,6 +9,20 @@ from verifiers.v1.types import TurnTokens +def _qwen_item(grid, *, raw_image_id=None): + item = { + "kind": "prime_raw_mm_item", + "version": 1, + "modality": "image", + "family": "qwen_vl", + "layout_fingerprint": "f" * 32, + "payload": {"image_grid_thw": grid}, + } + if raw_image_id is not None: + item["raw_image_id"] = raw_image_id + return item + + def _response(message: vf.AssistantMessage) -> vf.Response: return vf.Response( id="", @@ -123,15 +137,7 @@ def test_raw_image_sidecar_attributed_round_trips_and_feeds_next_bridge(): mm = MultiModalData( mm_hashes={"image": ["abcd1234abcd1234"]}, mm_placeholders={"image": [PlaceholderRange(offset=2, length=4)]}, - mm_items={ - "image": [ - { - "image_grid_thw": [1, 2, 2], - "raw_image_id": "img.png", - "image_layout_fingerprint": "f" * 32, - } - ] - }, + mm_items={"image": [_qwen_item([1, 2, 2], raw_image_id="img.png")]}, ) graph.prepare_turn(trace, [user]).commit( @@ -182,7 +188,7 @@ def test_multimodal_sidecar_rejects_processed_image_payloads(): mm_items={ "image": [ { - "image_grid_thw": [1, 1, 1], + **_qwen_item([1, 1, 1]), "pixel_values": np.zeros((1, 2), dtype=np.float32), } ] @@ -198,12 +204,15 @@ def test_multimodal_sidecar_rejects_processed_image_payloads(): "mm_items": { "image": [ { - "image_grid_thw": { - "__nd__": True, - "dtype": "int64", - "shape": [3], - "data": b"\x00" * 24, - } + **_qwen_item([1, 1, 1]), + "payload": { + "image_grid_thw": { + "__nd__": True, + "dtype": "int64", + "shape": [3], + "data": b"\x00" * 24, + } + }, } ] }, diff --git a/tests/v1/test_train_client_multimodal.py b/tests/v1/test_train_client_multimodal.py index 30b33966e..ecf3ff0ca 100644 --- a/tests/v1/test_train_client_multimodal.py +++ b/tests/v1/test_train_client_multimodal.py @@ -15,6 +15,20 @@ DATA_URL = "data:image/png;base64,aGVsbG8=" +def _qwen_item(grid, *, raw_image_id=None): + item = { + "kind": "prime_raw_mm_item", + "version": 1, + "modality": "image", + "family": "qwen_vl", + "layout_fingerprint": "f" * 32, + "payload": {"image_grid_thw": grid}, + } + if raw_image_id is not None: + item["raw_image_id"] = raw_image_id + return item + + def test_offload_images_inplace_rewrites_wire_and_typed_messages(monkeypatch): def fake_offload(url, image_dir): assert image_dir is None @@ -88,7 +102,7 @@ async def test_train_client_bridges_multimodal_prompt_with_previous_sidecar( previous_mm = MultiModalData( mm_hashes={"image": ["a" * 16]}, mm_placeholders={"image": [PlaceholderRange(offset=1, length=2)]}, - mm_items={"image": [{"image_grid_thw": [1, 1, 2]}]}, + mm_items={"image": [_qwen_item([1, 1, 2])]}, ) trace = vf.Trace(task=vf.Task(idx=0, prompt="x")) graph.prepare_turn(trace, [image_msg]).commit( @@ -197,7 +211,7 @@ async def fake_generate(**kwargs): mm = MultiModalData( mm_hashes={"image": ["a" * 16]}, mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, - mm_items={"image": [{"image_grid_thw": [1, 1, 1]}]}, + mm_items={"image": [_qwen_item([1, 1, 1])]}, ) result = await _generate_with_image_ref_retry( @@ -231,7 +245,7 @@ async def fake_generate(**kwargs): mm = MultiModalData( mm_hashes={"image": ["a" * 16]}, mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, - mm_items={"image": [{"image_grid_thw": [1, 1, 1], "raw_image_id": "a.png"}]}, + mm_items={"image": [_qwen_item([1, 1, 1], raw_image_id="a.png")]}, ) with pytest.raises(MissingCache): diff --git a/verifiers/v1/graph.py b/verifiers/v1/graph.py index 122bb8329..945edd192 100644 --- a/verifiers/v1/graph.py +++ b/verifiers/v1/graph.py @@ -72,13 +72,26 @@ def _contains_array_payload(value: Any) -> bool: return False +_PROCESSED_MM_KEYS = {"pixel_values", "image_embeds", "image_features"} + + +def _contains_processed_payload_key(value: Any) -> bool: + if isinstance(value, dict): + return bool(_PROCESSED_MM_KEYS.intersection(value)) or any( + _contains_processed_payload_key(v) for v in value.values() + ) + if isinstance(value, (list, tuple)): + return any(_contains_processed_payload_key(v) for v in value) + return False + + def _validate_raw_mm_item(item: Any) -> dict[str, Any]: if not isinstance(item, dict): raise TypeError( "v1 multimodal sidecars must be raw image descriptor dicts, " f"got {type(item).__name__}" ) - if "pixel_values" in item: + if _contains_processed_payload_key(item): raise TypeError("v1 does not carry processed image payloads") if _contains_array_payload(item): raise TypeError( From e6b13dc2b64899c5178b6c2677d118d588b36b9f Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Sat, 27 Jun 2026 00:18:41 +0000 Subject: [PATCH 3/9] Simplify v1 raw multimodal: drop the cache-miss retry subsystem Every image carries its ref, so no cache miss can occur. Removes _generate_with_image_ref_retry / _has_descriptor_only_images / _retryable_mm_error_type / _json_error_type / _RETRYABLE_MM_ERROR_TYPES; rollouts call generate() directly. Obsolete retry tests removed. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/v1/test_train_client_multimodal.py | 68 ------------------ verifiers/v1/clients/train.py | 89 +++--------------------- 2 files changed, 9 insertions(+), 148 deletions(-) diff --git a/tests/v1/test_train_client_multimodal.py b/tests/v1/test_train_client_multimodal.py index ecf3ff0ca..99884c254 100644 --- a/tests/v1/test_train_client_multimodal.py +++ b/tests/v1/test_train_client_multimodal.py @@ -5,7 +5,6 @@ from verifiers.v1 import graph from verifiers.v1.clients.train import ( TrainClient, - _generate_with_image_ref_retry, ) from verifiers.v1.dialects import ChatDialect from verifiers.v1.types import TurnTokens @@ -187,74 +186,7 @@ async def fake_generate(**kwargs): assert bridged_mm.mm_hashes == previous_mm.mm_hashes assert bridged_mm.mm_placeholders["image"][0].length == 2 assert captured["generate_kwargs"]["multi_modal_data"] is bridged_mm - assert captured["generate_kwargs"]["materialize_all_image_refs"] is False -@pytest.mark.asyncio -async def test_generate_retries_missing_mm_cache_by_materializing_image_refs( - monkeypatch, -): - import renderers.client as renderer_client - - calls = [] - - class MissingCache(Exception): - body = {"error": {"type": "missing_mm_cache_item"}} - - async def fake_generate(**kwargs): - calls.append(kwargs["materialize_all_image_refs"]) - if len(calls) == 1: - raise MissingCache() - return {"ok": True} - - monkeypatch.setattr(renderer_client, "generate", fake_generate) - mm = MultiModalData( - mm_hashes={"image": ["a" * 16]}, - mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, - mm_items={"image": [_qwen_item([1, 1, 1])]}, - ) - - result = await _generate_with_image_ref_retry( - client=object(), - renderer=object(), - messages=[], - model="m", - multi_modal_data=mm, - ) - - assert result == {"ok": True} - assert calls == [False, True] -@pytest.mark.asyncio -async def test_generate_does_not_retry_missing_cache_for_raw_image_refs( - monkeypatch, -): - import renderers.client as renderer_client - - calls = [] - - class MissingCache(Exception): - body = {"error": {"type": "missing_mm_cache_item"}} - - async def fake_generate(**kwargs): - calls.append(kwargs["materialize_all_image_refs"]) - raise MissingCache() - - monkeypatch.setattr(renderer_client, "generate", fake_generate) - mm = MultiModalData( - mm_hashes={"image": ["a" * 16]}, - mm_placeholders={"image": [PlaceholderRange(offset=0, length=1)]}, - mm_items={"image": [_qwen_item([1, 1, 1], raw_image_id="a.png")]}, - ) - - with pytest.raises(MissingCache): - await _generate_with_image_ref_retry( - client=object(), - renderer=object(), - messages=[], - model="m", - multi_modal_data=mm, - ) - - assert calls == [False] diff --git a/verifiers/v1/clients/train.py b/verifiers/v1/clients/train.py index c19a69502..4f17cd8f4 100644 --- a/verifiers/v1/clients/train.py +++ b/verifiers/v1/clients/train.py @@ -174,87 +174,14 @@ def _is_valid_incremental_tail(messages: list[dict[str, Any]]) -> bool: return all(role == "tool" for role in roles) -_RETRYABLE_MM_ERROR_TYPES = frozenset({"missing_mm_cache_item"}) -def _json_error_type(value: Any) -> str | None: - if isinstance(value, str): - try: - value = json.loads(value) - except (TypeError, ValueError): - return None - if not isinstance(value, Mapping): - return None - error_type = value.get("error_type") - return error_type if isinstance(error_type, str) else None - - -def _retryable_mm_error_type(exc: Exception) -> str | None: - candidates: list[Any] = [] - body = getattr(exc, "body", None) - if body is not None: - candidates.append(body) - response = getattr(exc, "response", None) - if response is not None: - try: - candidates.append(response.json()) - except Exception: - text = getattr(response, "text", None) - if text is not None: - candidates.append(text) - - for payload in candidates: - if not isinstance(payload, Mapping): - error_type = _json_error_type(payload) - if error_type in _RETRYABLE_MM_ERROR_TYPES: - return error_type - continue - error = payload.get("error") - if isinstance(error, Mapping): - error_type = error.get("type") - if error_type in _RETRYABLE_MM_ERROR_TYPES: - return cast(str, error_type) - error_type = _json_error_type(error.get("message")) - if error_type in _RETRYABLE_MM_ERROR_TYPES: - return error_type - error_type = _json_error_type(payload) - if error_type in _RETRYABLE_MM_ERROR_TYPES: - return error_type - return None - - -def _has_descriptor_only_images(mm_data: MultiModalData | None) -> bool: - """True when a prompt carries prior image descriptors without raw refs.""" - if mm_data is None or mm_data.is_empty(): - return False - for item in mm_data.mm_items.get("image") or []: - if isinstance(item, Mapping) and not item.get("raw_image_id"): - return True - return False - - -async def _generate_with_image_ref_retry(**kwargs: Any) -> dict[str, Any]: - """Retry a missing-cache MM request by materializing all image refs once. - - The normal bridge path sends descriptor-only entries for prior images and - refs only for newly introduced images. If vLLM says its MM cache no longer - has a prior item, retry by asking the renderer to rebuild refs for every - image from the file-backed messages. This does not send processor outputs; - it only sends raw image refs again. - """ - has_descriptor_only = _has_descriptor_only_images(kwargs.get("multi_modal_data")) - try: - from renderers.client import generate - - return await generate(materialize_all_image_refs=False, **kwargs) - except Exception as exc: - if not has_descriptor_only or _retryable_mm_error_type(exc) is None: - raise - logger.warning( - "vLLM MM cache miss; retrying with all image refs materialized: %r", - exc, - ) - return await generate(materialize_all_image_refs=True, **kwargs) + + + + + + class TrainClient(Client): @@ -382,7 +309,9 @@ def bridge(): wire_messages = [message_to_wire(m) for m in prompt] try: - result = await _generate_with_image_ref_retry( + from renderers.client import generate + + result = await generate( client=self.openai, renderer=renderer, messages=wire_messages, From 4a7b37aca8035be424b4077e91bb7b94dc445f15 Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Sun, 28 Jun 2026 07:01:46 +0000 Subject: [PATCH 4/9] feat: support inline multimodal images --- tests/v1/test_train_client_multimodal.py | 42 +++++++++-- verifiers/v1/clients/train.py | 6 +- verifiers/v1/utils/multimodal.py | 88 +++++++++++++++++------- 3 files changed, 104 insertions(+), 32 deletions(-) diff --git a/tests/v1/test_train_client_multimodal.py b/tests/v1/test_train_client_multimodal.py index 99884c254..85db79926 100644 --- a/tests/v1/test_train_client_multimodal.py +++ b/tests/v1/test_train_client_multimodal.py @@ -84,6 +84,44 @@ def test_offload_images_inplace_rejects_non_file_image_urls(monkeypatch): multimodal.offload_images_inplace(body) +def test_prepare_images_inplace_inline_preserves_data_urls(): + body = { + "messages": [ + { + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": DATA_URL}}, + ], + } + ] + } + + stats = multimodal.prepare_images_inplace(body, storage="inline") + + assert stats.images_rewritten == 0 + assert stats.bytes_written == 0 + assert body["messages"][0]["content"][0]["image_url"]["url"] == DATA_URL + + +def test_prepare_images_inplace_inline_rejects_remote_urls(): + body = { + "messages": [ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": {"url": "https://example.com/image.png"}, + } + ], + } + ] + } + + with pytest.raises(RuntimeError, match="data:image"): + multimodal.prepare_images_inplace(body, storage="inline") + + @pytest.mark.asyncio async def test_train_client_bridges_multimodal_prompt_with_previous_sidecar( monkeypatch, @@ -186,7 +224,3 @@ async def fake_generate(**kwargs): assert bridged_mm.mm_hashes == previous_mm.mm_hashes assert bridged_mm.mm_placeholders["image"][0].length == 2 assert captured["generate_kwargs"]["multi_modal_data"] is bridged_mm - - - - diff --git a/verifiers/v1/clients/train.py b/verifiers/v1/clients/train.py index 0bee10a93..28ed2038c 100644 --- a/verifiers/v1/clients/train.py +++ b/verifiers/v1/clients/train.py @@ -35,7 +35,7 @@ TurnTokens, Usage, ) -from verifiers.v1.utils.multimodal import offload_images_inplace +from verifiers.v1.utils.multimodal import prepare_images_inplace logger = logging.getLogger(__name__) @@ -212,7 +212,7 @@ def _renderer_pool( async def prepare_request_body(self, dialect: Dialect, body: dict) -> dict: if isinstance(dialect, ChatDialect): - stats = await asyncio.to_thread(offload_images_inplace, body) + stats = await asyncio.to_thread(prepare_images_inplace, body) if stats.images_rewritten: logger.info( "offloaded %d image(s) to run assets (%.1f MiB)", @@ -223,7 +223,7 @@ async def prepare_request_body(self, dialect: Dialect, body: dict) -> dict: async def prepare_messages(self, dialect: Dialect, messages: list) -> list: if isinstance(dialect, ChatDialect): - stats = await asyncio.to_thread(offload_images_inplace, messages) + stats = await asyncio.to_thread(prepare_images_inplace, messages) if stats.images_rewritten: logger.info( "offloaded %d simulator image(s) to run assets (%.1f MiB)", diff --git a/verifiers/v1/utils/multimodal.py b/verifiers/v1/utils/multimodal.py index 83e9c4815..8e6b3d1e0 100644 --- a/verifiers/v1/utils/multimodal.py +++ b/verifiers/v1/utils/multimodal.py @@ -1,9 +1,4 @@ -"""Multimodal ingress helpers for v1 training. - -The env worker stores raw images as run assets before messages enter the trace. -Messages then carry cheap ``file://`` refs, and the renderer/vLLM path decides -which refs must be sent to inference. -""" +"""Multimodal ingress helpers for v1 training.""" from __future__ import annotations @@ -34,6 +29,20 @@ def _offload_image_url(url: object, image_dir: Path | None) -> tuple[str, int] | return offload_image_to_run_assets(url, image_dir=image_dir) +def _image_storage_mode(storage: str | None) -> str: + if storage is not None: + mode = storage + else: + from renderers.mm_store import image_storage_mode + + mode = image_storage_mode() + if mode not in ("offload", "inline"): + raise ValueError( + f"multimodal image storage must be 'offload' or 'inline', got {mode!r}" + ) + return mode + + def _image_source_url(source: Any) -> object: if isinstance(source, dict): return source.get("url") @@ -56,57 +65,86 @@ def _require_file_image_url(source: Any) -> None: ) -def _rewrite_image_source(source: Any, image_dir: Path | None) -> ImageOffloadStats: +def _require_inline_image_url(source: Any) -> None: + url = _image_source_url(source) + if not isinstance(url, str): + raise RuntimeError( + "v1 inline multimodal training requires image_url.url strings" + ) + if url.startswith("file://"): + return + if url.startswith("data:image/") and ";base64," in url: + return + raise RuntimeError( + "v1 inline multimodal training requires image_url entries to be " + "data:image/...;base64,... or file:// refs" + ) + + +def _prepare_image_source( + source: Any, *, storage: str, image_dir: Path | None +) -> ImageOffloadStats: stats = ImageOffloadStats() - result = _offload_image_url(_image_source_url(source), image_dir) - if result is not None: - new_url, nbytes = result - _set_image_source_url(source, new_url) - stats.images_rewritten += 1 - stats.bytes_written += nbytes - _require_file_image_url(source) + if storage == "offload": + result = _offload_image_url(_image_source_url(source), image_dir) + if result is not None: + new_url, nbytes = result + _set_image_source_url(source, new_url) + stats.images_rewritten += 1 + stats.bytes_written += nbytes + _require_file_image_url(source) + else: + _require_inline_image_url(source) return stats -def offload_images_inplace( - value: Any, *, image_dir: Path | None = None +def prepare_images_inplace( + value: Any, *, storage: str | None = None, image_dir: Path | None = None ) -> ImageOffloadStats: - """Rewrite base64 image URLs reachable from ``value`` to run-asset refs. + """Prepare image URLs reachable from ``value`` according to the storage mode. Handles OpenAI wire dicts/lists and the pydantic v1 message/content-part - models used by the trace. Non-image values and already-file-backed URLs are - left untouched. + models used by the trace. """ + mode = _image_storage_mode(storage) stats = ImageOffloadStats() if isinstance(value, dict): if value.get("type") == "image_url": source = value.get("image_url") if source is not None: - stats.add(_rewrite_image_source(source, image_dir)) + stats.add( + _prepare_image_source(source, storage=mode, image_dir=image_dir) + ) for child in value.values(): - stats.add(offload_images_inplace(child, image_dir=image_dir)) + stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) return stats if isinstance(value, list): for child in value: - stats.add(offload_images_inplace(child, image_dir=image_dir)) + stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) return stats if isinstance(value, tuple): for child in value: - stats.add(offload_images_inplace(child, image_dir=image_dir)) + stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) return stats if getattr(value, "type", None) == "image_url": source = getattr(value, "image_url", None) if source is not None: - stats.add(_rewrite_image_source(source, image_dir)) + stats.add(_prepare_image_source(source, storage=mode, image_dir=image_dir)) return stats content = getattr(value, "content", None) if isinstance(content, (list, tuple)): - stats.add(offload_images_inplace(content, image_dir=image_dir)) + stats.add(prepare_images_inplace(content, storage=mode, image_dir=image_dir)) return stats + + +def offload_images_inplace( + value: Any, *, image_dir: Path | None = None +) -> ImageOffloadStats: + return prepare_images_inplace(value, storage="offload", image_dir=image_dir) From 0b1d73fa1d148b48e2f8165a99c9b10a8777ef1c Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Mon, 29 Jun 2026 06:08:29 +0000 Subject: [PATCH 5/9] Simplify v1 raw image offload path --- tests/v1/test_graph.py | 116 ------------ tests/v1/test_train_client_multimodal.py | 226 ----------------------- verifiers/v1/clients/train.py | 20 +- verifiers/v1/graph.py | 29 +-- verifiers/v1/utils/multimodal.py | 103 ++--------- 5 files changed, 29 insertions(+), 465 deletions(-) delete mode 100644 tests/v1/test_train_client_multimodal.py diff --git a/tests/v1/test_graph.py b/tests/v1/test_graph.py index 0d1cc1ede..a115778a2 100644 --- a/tests/v1/test_graph.py +++ b/tests/v1/test_graph.py @@ -1,28 +1,12 @@ import base64 import numpy as np -import pytest -from renderers.base import MultiModalData, PlaceholderRange import verifiers.v1 as vf from verifiers.v1 import graph from verifiers.v1.types import TurnTokens -def _qwen_item(grid, *, raw_image_id=None): - item = { - "kind": "prime_raw_mm_item", - "version": 1, - "modality": "image", - "family": "qwen_vl", - "layout_fingerprint": "f" * 32, - "payload": {"image_grid_thw": grid}, - } - if raw_image_id is not None: - item["raw_image_id"] = raw_image_id - return item - - def _response(message: vf.AssistantMessage) -> vf.Response: return vf.Response( id="", @@ -123,106 +107,6 @@ def test_routed_experts_none_when_absent(): assert trace.branches[-1].routed_experts is None -def test_raw_image_sidecar_attributed_round_trips_and_feeds_next_bridge(): - trace = vf.Trace(task=vf.Task(idx=0, prompt="x")) - user = vf.UserMessage( - content=[ - vf.ImageUrlContentPart( - image_url=vf.ImageUrlSource( - url="file:///data/outputs/run_abc/assets/images/img.png" - ) - ) - ] - ) - mm = MultiModalData( - mm_hashes={"image": ["abcd1234abcd1234"]}, - mm_placeholders={"image": [PlaceholderRange(offset=2, length=4)]}, - mm_items={"image": [_qwen_item([1, 2, 2], raw_image_id="img.png")]}, - ) - - graph.prepare_turn(trace, [user]).commit( - vf.Response( - id="a", - created=0, - model="t", - message=vf.AssistantMessage(content="a1"), - finish_reason="stop", - tokens=TurnTokens( - prompt_ids=[10, 11, 12], - completion_ids=[20], - message_spans=[(0, 2)], - multi_modal_data=mm, - ), - ) - ) - - node_mm = trace.nodes[0].multi_modal_data - assert node_mm is not None - assert node_mm.mm_items["image"][0]["raw_image_id"] == "img.png" - assert node_mm.mm_placeholders["image"][0].offset == 2 - - restored = type(trace).model_validate(trace.model_dump()) - restored_mm = restored.nodes[0].multi_modal_data - assert restored_mm is not None - assert restored_mm.mm_items == node_mm.mm_items - assert restored_mm.mm_placeholders["image"][0].length == 4 - - turn = graph.prepare_turn( - trace, - [user, vf.AssistantMessage(content="a1"), vf.UserMessage(content="next")], - ) - prev_mm = turn.previous_multi_modal_data() - assert prev_mm is not None - assert prev_mm.mm_hashes == mm.mm_hashes - assert prev_mm.mm_items == mm.mm_items - assert prev_mm.mm_placeholders["image"][0].offset == 2 - assert trace.branches[-1].multi_modal_data is not None - - -def test_multimodal_sidecar_rejects_processed_image_payloads(): - with pytest.raises(TypeError, match="processed image payloads"): - graph.MessageNode( - message=vf.UserMessage(content="image"), - multi_modal_data=MultiModalData( - mm_hashes={"image": ["abcd1234abcd1234"]}, - mm_items={ - "image": [ - { - **_qwen_item([1, 1, 1]), - "pixel_values": np.zeros((1, 2), dtype=np.float32), - } - ] - }, - ), - ) - - old_wire_node = { - "message": {"role": "user", "content": "image"}, - "multi_modal_data": { - "mm_hashes": {"image": ["abcd1234abcd1234"]}, - "mm_placeholders": {}, - "mm_items": { - "image": [ - { - **_qwen_item([1, 1, 1]), - "payload": { - "image_grid_thw": { - "__nd__": True, - "dtype": "int64", - "shape": [3], - "data": b"\x00" * 24, - } - }, - } - ] - }, - }, - } - - with pytest.raises(TypeError, match="raw image descriptors"): - graph.MessageNode.model_validate(old_wire_node) - - def test_tool_call_hash_matches_v0_content_and_arguments_normalization(): left = vf.AssistantMessage( content=None, diff --git a/tests/v1/test_train_client_multimodal.py b/tests/v1/test_train_client_multimodal.py deleted file mode 100644 index 85db79926..000000000 --- a/tests/v1/test_train_client_multimodal.py +++ /dev/null @@ -1,226 +0,0 @@ -import pytest -from renderers.base import MultiModalData, PlaceholderRange, RenderedTokens - -import verifiers.v1 as vf -from verifiers.v1 import graph -from verifiers.v1.clients.train import ( - TrainClient, -) -from verifiers.v1.dialects import ChatDialect -from verifiers.v1.types import TurnTokens -from verifiers.v1.utils import multimodal - - -DATA_URL = "data:image/png;base64,aGVsbG8=" - - -def _qwen_item(grid, *, raw_image_id=None): - item = { - "kind": "prime_raw_mm_item", - "version": 1, - "modality": "image", - "family": "qwen_vl", - "layout_fingerprint": "f" * 32, - "payload": {"image_grid_thw": grid}, - } - if raw_image_id is not None: - item["raw_image_id"] = raw_image_id - return item - - -def test_offload_images_inplace_rewrites_wire_and_typed_messages(monkeypatch): - def fake_offload(url, image_dir): - assert image_dir is None - if url == DATA_URL: - return "file:///tmp/run/assets/images/hello.png", 5 - return None - - monkeypatch.setattr(multimodal, "_offload_image_url", fake_offload) - - body = { - "messages": [ - { - "role": "user", - "content": [ - {"type": "text", "text": "look"}, - {"type": "image_url", "image_url": {"url": DATA_URL}}, - ], - } - ] - } - typed = vf.UserMessage( - content=[vf.ImageUrlContentPart(image_url=vf.ImageUrlSource(url=DATA_URL))] - ) - - stats = multimodal.offload_images_inplace([body, typed]) - - assert stats.images_rewritten == 2 - assert stats.bytes_written == 10 - assert body["messages"][0]["content"][1]["image_url"]["url"] == ( - "file:///tmp/run/assets/images/hello.png" - ) - assert isinstance(typed.content, list) - assert typed.content[0].image_url.url == "file:///tmp/run/assets/images/hello.png" - - -def test_offload_images_inplace_rejects_non_file_image_urls(monkeypatch): - monkeypatch.setattr(multimodal, "_offload_image_url", lambda *_: None) - - body = { - "messages": [ - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": {"url": "https://example.com/image.png"}, - } - ], - } - ] - } - - with pytest.raises(RuntimeError, match="file:// run image assets"): - multimodal.offload_images_inplace(body) - - -def test_prepare_images_inplace_inline_preserves_data_urls(): - body = { - "messages": [ - { - "role": "user", - "content": [ - {"type": "image_url", "image_url": {"url": DATA_URL}}, - ], - } - ] - } - - stats = multimodal.prepare_images_inplace(body, storage="inline") - - assert stats.images_rewritten == 0 - assert stats.bytes_written == 0 - assert body["messages"][0]["content"][0]["image_url"]["url"] == DATA_URL - - -def test_prepare_images_inplace_inline_rejects_remote_urls(): - body = { - "messages": [ - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": {"url": "https://example.com/image.png"}, - } - ], - } - ] - } - - with pytest.raises(RuntimeError, match="data:image"): - multimodal.prepare_images_inplace(body, storage="inline") - - -@pytest.mark.asyncio -async def test_train_client_bridges_multimodal_prompt_with_previous_sidecar( - monkeypatch, -): - import renderers.client as renderer_client - - captured = {} - image_msg = vf.UserMessage( - content=[ - vf.ImageUrlContentPart( - image_url=vf.ImageUrlSource(url="file:///run/assets/images/a.png") - ) - ] - ) - previous_mm = MultiModalData( - mm_hashes={"image": ["a" * 16]}, - mm_placeholders={"image": [PlaceholderRange(offset=1, length=2)]}, - mm_items={"image": [_qwen_item([1, 1, 2])]}, - ) - trace = vf.Trace(task=vf.Task(idx=0, prompt="x")) - graph.prepare_turn(trace, [image_msg]).commit( - vf.Response( - id="a", - created=0, - model="t", - message=vf.AssistantMessage(content="a1"), - finish_reason="stop", - tokens=TurnTokens( - prompt_ids=[10, 11], - completion_ids=[20], - message_spans=[(0, 1)], - multi_modal_data=previous_mm, - ), - ) - ) - next_msg = vf.UserMessage(content="next") - turn = graph.prepare_turn( - trace, [image_msg, vf.AssistantMessage(content="a1"), next_msg] - ) - - class FakeRenderer: - is_multimodal = True - - def bridge_to_next_turn( - self, - previous_prompt_ids, - previous_completion_ids, - new_messages, - *, - tools=None, - previous_multi_modal_data=None, - ): - captured["previous_prompt_ids"] = previous_prompt_ids - captured["previous_completion_ids"] = previous_completion_ids - captured["new_messages"] = new_messages - captured["previous_multi_modal_data"] = previous_multi_modal_data - return RenderedTokens( - token_ids=[10, 11, 20, 30, 31], - message_indices=[-1, -1, -1, 0, -1], - sampled_mask=[False] * 5, - is_content=[False] * 5, - message_roles=["user"], - multi_modal_data=previous_multi_modal_data, - ) - - async def fake_maybe_offload(renderer, fn): - return fn() - - async def fake_generate(**kwargs): - captured["generate_kwargs"] = kwargs - return { - "request_id": "r", - "finish_reason": "stop", - "content": "done", - "prompt_ids": kwargs["prompt_ids"], - "completion_ids": [99], - "completion_logprobs": [-0.5], - "prompt_attribution": kwargs["prompt_attribution"], - "multi_modal_data": kwargs["multi_modal_data"], - } - - monkeypatch.setattr(renderer_client, "_maybe_offload", fake_maybe_offload) - monkeypatch.setattr(renderer_client, "generate", fake_generate) - - client = TrainClient(openai=object()) - client._pool = FakeRenderer() - response = await client.get_response( - ChatDialect(), - {"messages": []}, - "model", - vf.SamplingConfig(max_tokens=1), - session_id="trace", - turn=turn, - ) - - assert response.message.content == "done" - assert captured["previous_prompt_ids"] == [10, 11] - assert captured["previous_completion_ids"] == [20] - bridged_mm = captured["previous_multi_modal_data"] - assert bridged_mm.mm_hashes == previous_mm.mm_hashes - assert bridged_mm.mm_placeholders["image"][0].length == 2 - assert captured["generate_kwargs"]["multi_modal_data"] is bridged_mm diff --git a/verifiers/v1/clients/train.py b/verifiers/v1/clients/train.py index 28ed2038c..694393014 100644 --- a/verifiers/v1/clients/train.py +++ b/verifiers/v1/clients/train.py @@ -10,7 +10,6 @@ import asyncio import json -import logging from collections.abc import Mapping from typing import Any @@ -38,9 +37,6 @@ from verifiers.v1.utils.multimodal import prepare_images_inplace -logger = logging.getLogger(__name__) - - def tool_to_wire(tool: Tool) -> dict: """A vf tool -> the OpenAI chat wire dict (the renderer's generate request).""" function: dict = { @@ -212,24 +208,12 @@ def _renderer_pool( async def prepare_request_body(self, dialect: Dialect, body: dict) -> dict: if isinstance(dialect, ChatDialect): - stats = await asyncio.to_thread(prepare_images_inplace, body) - if stats.images_rewritten: - logger.info( - "offloaded %d image(s) to run assets (%.1f MiB)", - stats.images_rewritten, - stats.bytes_written / (1024.0 * 1024.0), - ) + await asyncio.to_thread(prepare_images_inplace, body) return body async def prepare_messages(self, dialect: Dialect, messages: list) -> list: if isinstance(dialect, ChatDialect): - stats = await asyncio.to_thread(prepare_images_inplace, messages) - if stats.images_rewritten: - logger.info( - "offloaded %d simulator image(s) to run assets (%.1f MiB)", - stats.images_rewritten, - stats.bytes_written / (1024.0 * 1024.0), - ) + await asyncio.to_thread(prepare_images_inplace, messages) return messages async def get_response( diff --git a/verifiers/v1/graph.py b/verifiers/v1/graph.py index 945edd192..59256a0f4 100644 --- a/verifiers/v1/graph.py +++ b/verifiers/v1/graph.py @@ -60,28 +60,16 @@ def _decode_ndarray(d: dict) -> np.ndarray: return np.frombuffer(d["data"], dtype=np.dtype(d["dtype"])).reshape(d["shape"]) -def _contains_array_payload(value: Any) -> bool: - if isinstance(value, np.ndarray): - return True - if isinstance(value, dict): - return bool(value.get("__nd__")) or any( - _contains_array_payload(v) for v in value.values() - ) - if isinstance(value, (list, tuple)): - return any(_contains_array_payload(v) for v in value) - return False - - -_PROCESSED_MM_KEYS = {"pixel_values", "image_embeds", "image_features"} +_PROCESSED_MM_KEYS = frozenset({"pixel_values", "image_embeds", "image_features"}) -def _contains_processed_payload_key(value: Any) -> bool: +def _contains_processed_mm_key(value: Any) -> bool: if isinstance(value, dict): return bool(_PROCESSED_MM_KEYS.intersection(value)) or any( - _contains_processed_payload_key(v) for v in value.values() + _contains_processed_mm_key(v) for v in value.values() ) if isinstance(value, (list, tuple)): - return any(_contains_processed_payload_key(v) for v in value) + return any(_contains_processed_mm_key(v) for v in value) return False @@ -91,12 +79,13 @@ def _validate_raw_mm_item(item: Any) -> dict[str, Any]: "v1 multimodal sidecars must be raw image descriptor dicts, " f"got {type(item).__name__}" ) - if _contains_processed_payload_key(item): - raise TypeError("v1 does not carry processed image payloads") - if _contains_array_payload(item): + if _contains_processed_mm_key(item): raise TypeError( - "v1 multimodal sidecars must be raw image descriptors, not arrays" + "v1 multimodal sidecars must be raw image descriptors, " + "not processed multimodal payloads" ) + if not isinstance(item.get("raw_image_id"), str) or not item["raw_image_id"]: + raise ValueError("v1 multimodal sidecars require raw_image_id") return dict(item) diff --git a/verifiers/v1/utils/multimodal.py b/verifiers/v1/utils/multimodal.py index 8e6b3d1e0..154904048 100644 --- a/verifiers/v1/utils/multimodal.py +++ b/verifiers/v1/utils/multimodal.py @@ -2,22 +2,11 @@ from __future__ import annotations -from dataclasses import dataclass from pathlib import Path from typing import Any -@dataclass -class ImageOffloadStats: - images_rewritten: int = 0 - bytes_written: int = 0 - - def add(self, other: "ImageOffloadStats") -> None: - self.images_rewritten += other.images_rewritten - self.bytes_written += other.bytes_written - - -def _offload_image_url(url: object, image_dir: Path | None) -> tuple[str, int] | None: +def _offload_image_url(url: object, image_dir: Path | None) -> str | None: try: from renderers.mm_store import offload_image_to_run_assets except ImportError as exc: # pragma: no cover - dependency-version guard @@ -29,20 +18,6 @@ def _offload_image_url(url: object, image_dir: Path | None) -> tuple[str, int] | return offload_image_to_run_assets(url, image_dir=image_dir) -def _image_storage_mode(storage: str | None) -> str: - if storage is not None: - mode = storage - else: - from renderers.mm_store import image_storage_mode - - mode = image_storage_mode() - if mode not in ("offload", "inline"): - raise ValueError( - f"multimodal image storage must be 'offload' or 'inline', got {mode!r}" - ) - return mode - - def _image_source_url(source: Any) -> object: if isinstance(source, dict): return source.get("url") @@ -65,86 +40,44 @@ def _require_file_image_url(source: Any) -> None: ) -def _require_inline_image_url(source: Any) -> None: - url = _image_source_url(source) - if not isinstance(url, str): - raise RuntimeError( - "v1 inline multimodal training requires image_url.url strings" - ) - if url.startswith("file://"): - return - if url.startswith("data:image/") and ";base64," in url: - return - raise RuntimeError( - "v1 inline multimodal training requires image_url entries to be " - "data:image/...;base64,... or file:// refs" - ) - - -def _prepare_image_source( - source: Any, *, storage: str, image_dir: Path | None -) -> ImageOffloadStats: - stats = ImageOffloadStats() - - if storage == "offload": - result = _offload_image_url(_image_source_url(source), image_dir) - if result is not None: - new_url, nbytes = result - _set_image_source_url(source, new_url) - stats.images_rewritten += 1 - stats.bytes_written += nbytes - _require_file_image_url(source) - else: - _require_inline_image_url(source) - return stats +def _prepare_image_source(source: Any, *, image_dir: Path | None) -> None: + result = _offload_image_url(_image_source_url(source), image_dir) + if result is not None: + _set_image_source_url(source, result) + _require_file_image_url(source) -def prepare_images_inplace( - value: Any, *, storage: str | None = None, image_dir: Path | None = None -) -> ImageOffloadStats: - """Prepare image URLs reachable from ``value`` according to the storage mode. +def prepare_images_inplace(value: Any, *, image_dir: Path | None = None) -> None: + """Offload image URLs reachable from ``value`` to run image assets. Handles OpenAI wire dicts/lists and the pydantic v1 message/content-part models used by the trace. """ - mode = _image_storage_mode(storage) - stats = ImageOffloadStats() - if isinstance(value, dict): if value.get("type") == "image_url": source = value.get("image_url") if source is not None: - stats.add( - _prepare_image_source(source, storage=mode, image_dir=image_dir) - ) + _prepare_image_source(source, image_dir=image_dir) for child in value.values(): - stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) - return stats + prepare_images_inplace(child, image_dir=image_dir) + return if isinstance(value, list): for child in value: - stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) - return stats + prepare_images_inplace(child, image_dir=image_dir) + return if isinstance(value, tuple): for child in value: - stats.add(prepare_images_inplace(child, storage=mode, image_dir=image_dir)) - return stats + prepare_images_inplace(child, image_dir=image_dir) + return if getattr(value, "type", None) == "image_url": source = getattr(value, "image_url", None) if source is not None: - stats.add(_prepare_image_source(source, storage=mode, image_dir=image_dir)) - return stats + _prepare_image_source(source, image_dir=image_dir) + return content = getattr(value, "content", None) if isinstance(content, (list, tuple)): - stats.add(prepare_images_inplace(content, storage=mode, image_dir=image_dir)) - - return stats - - -def offload_images_inplace( - value: Any, *, image_dir: Path | None = None -) -> ImageOffloadStats: - return prepare_images_inplace(value, storage="offload", image_dir=image_dir) + prepare_images_inplace(content, image_dir=image_dir) From 2d4969b8550ad37662c38771ad3dbc57851c2bab Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Mon, 29 Jun 2026 17:41:58 +0000 Subject: [PATCH 6/9] Preserve v1 node usage in trace dumps --- verifiers/v1/graph.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/verifiers/v1/graph.py b/verifiers/v1/graph.py index 59256a0f4..225a891cc 100644 --- a/verifiers/v1/graph.py +++ b/verifiers/v1/graph.py @@ -143,10 +143,10 @@ class MessageNode(StrictBaseModel): optional run-image refs), not image processor tensors. `Branch.multi_modal_data` concatenates them along the path for the trainer. Old processed-payload sidecars are rejected. """ - usage: Usage | None = Field(default=None, exclude=True) - """Provider-reported token usage for this message's response (assistant nodes). Transient - (excluded from wire/disk); lets the live dashboard show token counts even when the endpoint - returns no token ids (so `token_ids` is empty).""" + usage: Usage | None = None + """Provider-reported token usage for this message's response (assistant nodes). Preserved on + the wire and on disk so dashboards can show token counts and cost even when the endpoint + returns no token ids.""" routed_experts: np.ndarray | None = None """This node's slice of the MoE expert-routing array — uint8 `[len(token_ids), layers, top_k]`, the expert ids inference selected for exactly this node's tokens. Attributed from From 7ade0b2e5efb2f539eb8d591f44872582b160a02 Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Mon, 29 Jun 2026 17:52:04 +0000 Subject: [PATCH 7/9] Surface request preparation failures on traces --- verifiers/v1/interception/server.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/verifiers/v1/interception/server.py b/verifiers/v1/interception/server.py index 2a633d2a7..d391cb7f9 100644 --- a/verifiers/v1/interception/server.py +++ b/verifiers/v1/interception/server.py @@ -37,6 +37,7 @@ from verifiers.v1.dialects.base import is_sse_done_event from verifiers.v1 import graph from verifiers.v1.errors import ( + InterceptionError, OverlongPromptError, RolloutError, TasksetError, @@ -262,7 +263,18 @@ async def handle_request( # alias after parsing so the wire body does not survive model inference. request._read_bytes = None del raw - body = await session.ctx.client.prepare_request_body(dialect, body) + try: + body = await session.ctx.client.prepare_request_body(dialect, body) + except RolloutError as e: + return self._fail(session, dialect, e) + except Exception as e: + return self._fail( + session, + dialect, + InterceptionError( + f"request preparation failed: {type(e).__name__}: {e}" + ), + ) logger.debug( "intercept %s: id=%s stream=%s", request.path, @@ -289,9 +301,18 @@ async def handle_request( and session.trace.num_turns == 0 ): if session.opening is None: - session.opening = await session.ctx.client.prepare_messages( - dialect, await session.user("") - ) + try: + session.opening = await session.ctx.client.prepare_messages( + dialect, await session.user("") + ) + except RolloutError as e: + return self._fail(session, dialect, e) + except Exception as e: + return self._fail( + session, + dialect, + UserError(f"user simulator failed: {type(e).__name__}: {e}"), + ) body = dialect.extend(body, None, session.opening) prompt = [*prompt, *session.opening] # If the simulator ended at the open (its taskset's `@stop` now fires), the loop's From 0dc57a149390b11aa879fc3b7a0f69b4f8f59b21 Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Mon, 29 Jun 2026 19:43:15 +0000 Subject: [PATCH 8/9] Require raw image URIs in v1 sidecars --- verifiers/v1/graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/verifiers/v1/graph.py b/verifiers/v1/graph.py index 225a891cc..a8104d537 100644 --- a/verifiers/v1/graph.py +++ b/verifiers/v1/graph.py @@ -84,8 +84,8 @@ def _validate_raw_mm_item(item: Any) -> dict[str, Any]: "v1 multimodal sidecars must be raw image descriptors, " "not processed multimodal payloads" ) - if not isinstance(item.get("raw_image_id"), str) or not item["raw_image_id"]: - raise ValueError("v1 multimodal sidecars require raw_image_id") + if not isinstance(item.get("raw_image_uri"), str) or not item["raw_image_uri"]: + raise ValueError("v1 multimodal sidecars require raw_image_uri") return dict(item) From 18b0fbea5cc8ec37d447305f22ce8ec5c4b765ef Mon Sep 17 00:00:00 2001 From: eligotts <78387377+eligotts@users.noreply.github.com> Date: Mon, 29 Jun 2026 22:22:55 +0000 Subject: [PATCH 9/9] Share multimodal image preparation across clients --- verifiers/clients/renderer_client.py | 3 +++ verifiers/types.py | 4 ++-- verifiers/{v1 => }/utils/multimodal.py | 19 +++++++++++++------ verifiers/v1/clients/train.py | 2 +- verifiers/v1/legacy.py | 26 ++++++++++++++++++++++---- 5 files changed, 41 insertions(+), 13 deletions(-) rename verifiers/{v1 => }/utils/multimodal.py (80%) diff --git a/verifiers/clients/renderer_client.py b/verifiers/clients/renderer_client.py index d7a58d0e8..5881482f2 100644 --- a/verifiers/clients/renderer_client.py +++ b/verifiers/clients/renderer_client.py @@ -8,6 +8,7 @@ concurrent rollouts tokenize in parallel instead of blocking the event loop. """ +import asyncio import json import threading from collections.abc import Mapping @@ -56,6 +57,7 @@ UserMessage, ) from verifiers.utils.client_utils import setup_openai_client +from verifiers.utils.multimodal import prepare_images_inplace # Module-level bridge counters. Incremented by every RendererClient instance # that tries to stitch a multi-turn prompt; callers (e.g. prime-rl's @@ -472,6 +474,7 @@ def _get_renderer_or_pool( async def to_native_prompt( self, messages: Messages ) -> tuple[list[RendererMessage], dict]: + await asyncio.to_thread(prepare_images_inplace, messages) return ( _attach_tool_call_names([_to_renderer_message(m) for m in messages]), {}, diff --git a/verifiers/types.py b/verifiers/types.py index feac3b168..80ad8c137 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -213,7 +213,7 @@ class ResponseTokens(CustomBaseModel): completion_logprobs: list[float] routed_experts: RoutedExpertsPayload | None = None # Renderer-emitted multimodal sidecar (renderers.base.MultiModalData) - # carrying processed pixel_values / placeholder ranges per modality. + # carrying raw image descriptors / placeholder ranges per modality. # Populated by the renderer client when the rollout went through a # multimodal-aware renderer; ``None`` otherwise. Stored as ``Any`` to # avoid a hard import dependency on ``renderers`` at this layer. @@ -260,7 +260,7 @@ class TrajectoryStepTokens(TypedDict): is_truncated: bool routed_experts: RoutedExpertsPayload | None # Renderer-emitted multimodal sidecar (renderers.base.MultiModalData) - # carrying processed pixel_values / placeholder ranges per modality. + # carrying raw image descriptors / placeholder ranges per modality. # ``NotRequired`` because text-only rollouts (and non-renderer client # types) never populate it. multi_modal_data: NotRequired[Any] diff --git a/verifiers/v1/utils/multimodal.py b/verifiers/utils/multimodal.py similarity index 80% rename from verifiers/v1/utils/multimodal.py rename to verifiers/utils/multimodal.py index 154904048..d9b9c6939 100644 --- a/verifiers/v1/utils/multimodal.py +++ b/verifiers/utils/multimodal.py @@ -1,15 +1,22 @@ -"""Multimodal ingress helpers for v1 training.""" +"""Multimodal ingress helpers for renderer-backed training.""" from __future__ import annotations +from importlib import import_module from pathlib import Path from typing import Any def _offload_image_url(url: object, image_dir: Path | None) -> str | None: try: - from renderers.mm_store import offload_image_to_run_assets - except ImportError as exc: # pragma: no cover - dependency-version guard + offload_image_to_run_assets = getattr( + import_module("renderers.mm_store"), + "offload_image_to_run_assets", + ) + except ( + ImportError, + AttributeError, + ) as exc: # pragma: no cover - dependency-version guard raise RuntimeError( "Multimodal training requires a renderers version with raw image " "asset offload support." @@ -35,7 +42,7 @@ def _require_file_image_url(source: Any) -> None: url = _image_source_url(source) if not isinstance(url, str) or not url.startswith("file://"): raise RuntimeError( - "v1 multimodal training requires image_url entries to be offloaded " + "multimodal training requires image_url entries to be offloaded " "to file:// run image assets" ) @@ -50,8 +57,8 @@ def _prepare_image_source(source: Any, *, image_dir: Path | None) -> None: def prepare_images_inplace(value: Any, *, image_dir: Path | None = None) -> None: """Offload image URLs reachable from ``value`` to run image assets. - Handles OpenAI wire dicts/lists and the pydantic v1 message/content-part - models used by the trace. + Handles OpenAI wire dicts/lists and the pydantic v0/v1 message/content-part + models used by trajectories and traces. """ if isinstance(value, dict): if value.get("type") == "image_url": diff --git a/verifiers/v1/clients/train.py b/verifiers/v1/clients/train.py index 694393014..36b612240 100644 --- a/verifiers/v1/clients/train.py +++ b/verifiers/v1/clients/train.py @@ -34,7 +34,7 @@ TurnTokens, Usage, ) -from verifiers.v1.utils.multimodal import prepare_images_inplace +from verifiers.utils.multimodal import prepare_images_inplace def tool_to_wire(tool: Tool) -> dict: diff --git a/verifiers/v1/legacy.py b/verifiers/v1/legacy.py index 02410b012..370a2a059 100644 --- a/verifiers/v1/legacy.py +++ b/verifiers/v1/legacy.py @@ -13,6 +13,7 @@ v1 stays importable without the v0 package present. """ +import asyncio import contextlib import logging from pathlib import Path @@ -162,6 +163,7 @@ def _to_v1_tokens(raw: Any) -> TurnTokens | None: prompt_ids=list(raw.get("prompt_ids") or []), completion_ids=list(raw.get("completion_ids") or []), completion_logprobs=list(raw.get("completion_logprobs") or []), + multi_modal_data=raw.get("multi_modal_data"), ) @@ -360,6 +362,20 @@ def _v0_client(self, client_config: ClientConfig, model: str): self._clients[key] = resolve_client(v0_config) return self._clients[key] + async def _state_output_with_live_trajectory(self, state: Any) -> dict: + """Build v0 rollout output metadata while preserving live trajectory sidecars. + + The JSON save path deltas ``tokens.multi_modal_data`` to avoid repeated + cumulative multimodal sidecars. Trace reconstruction needs the live, + cumulative sidecar for each turn so image descriptors align with the + full prompt the renderer saw. + """ + from verifiers.utils.save_utils import state_to_output + + out = await asyncio.to_thread(state_to_output, state, []) + out["trajectory"] = state.get("trajectory", []) + return out + async def _run_v0( self, task_idx: int, @@ -368,13 +384,13 @@ async def _run_v0( sampling: SamplingConfig, ) -> dict: client = self._v0_client(client_config, model) - return await self.env.run_rollout( + state = await self.env._run_rollout_state( input=dict(self.dataset[task_idx]), client=client, model=model, sampling_args=sampling.model_dump(exclude_none=True), - state_columns=["trajectory"], ) + return await self._state_output_with_live_trajectory(state) async def _run_rollout(self, req: RunRolloutRequest) -> RunRolloutResponse: out = await self._run_v0(req.task_idx, req.client, req.model, req.sampling) @@ -385,12 +401,14 @@ async def _run_rollout(self, req: RunRolloutRequest) -> RunRolloutResponse: async def _run_group(self, req: RunGroupRequest) -> RunGroupResponse: client = self._v0_client(req.client, req.model) # run_group scores the rollouts together so group/preference reward funcs apply. - outs = await self.env.run_group( + states = await self.env._run_group_states( group_inputs=[dict(self.dataset[req.task_idx]) for _ in range(req.n)], client=client, model=req.model, sampling_args=req.sampling.model_dump(exclude_none=True), - state_columns=["trajectory"], + ) + outs = await asyncio.gather( + *(self._state_output_with_live_trajectory(state) for state in states) ) traces = [ rollout_output_to_trace(out, req.task_idx).model_dump() for out in outs