Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion core/app/biz/task_runtime/executors/skill_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@

from __future__ import annotations

import logging
import os
import shutil
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any

from app.llmhubs.hub import DEFAULT_MODEL_KEY

from ..artifact_store import ArtifactStore
from ..models import ArtifactRef, ErrorClass, SandboxLeaseRef, TaskResult, TaskRun, TaskStatus
from ..naming import sanitize_dns_label
Expand All @@ -70,6 +74,11 @@
_RESULT_DIR_NAME = "skill-results"
_STDOUT_HEAD = 1000
_STDERR_HEAD = 500
_LLMHUB_MODEL_PARAMETER_NAME = "llmhub_model"
_ANDROID_TESTER_SKILL_NAME = "android-tester"
_ANDROID_TESTER_BLOCKED_EXIT_CODE = 2

_LOGGER = logging.getLogger(__name__)

_ANDROID_SANDBOX_PARAMETER_ALIASES = ("device_id", "android_device_id", "adb_endpoint")
_TASK_CONTEXT_PARAMETER_NAMES = frozenset(
Expand Down Expand Up @@ -188,6 +197,8 @@ def _build_result(
timed_out = outcome.return_code == -1 and "timed out" in outcome.system_error.lower()
if timed_out:
status = TaskStatus.TIMED_OUT
elif _is_android_tester_blocked_exit(run, outcome):
status = TaskStatus.BLOCKED
elif outcome.system_error or outcome.return_code != 0:
status = TaskStatus.FAILED
else:
Expand All @@ -201,6 +212,9 @@ def _build_result(
elif status == TaskStatus.FAILED:
error_class = ErrorClass.TRANSIENT if outcome.system_error else ErrorClass.SKILL_RUNTIME
error_message = outcome.system_error or stderr or f"{label} exited with {outcome.return_code}"
elif status == TaskStatus.BLOCKED:
error_class = ErrorClass.SKILL_RUNTIME
error_message = stderr or f"{label} exited blocked"

artifacts = self._collect_artifacts(run.run_id, result_root, _workspace_dir(run))
summary = self._build_summary(label, status, stdout, stderr)
Expand All @@ -225,7 +239,8 @@ def _build_result(
def _build_summary(label: str, status: TaskStatus, stdout: str, stderr: str) -> str:
# Fold the action's output into the summary so the caller sees it via the
# batch digest (which only carries ``summary``), not just a status label.
lines = [f"{label} {'finished' if status == TaskStatus.COMPLETED else 'failed'}"]
outcome = "finished" if status == TaskStatus.COMPLETED else status.value
lines = [f"{label} {outcome}"]
trimmed_stdout = truncate_stream(stdout, _STDOUT_HEAD)
if trimmed_stdout:
lines.append(f"stdout:\n{trimmed_stdout}")
Expand All @@ -248,6 +263,10 @@ def _put_artifact(self, run_id: str, path: Path, workspace: Path) -> ArtifactRef
return artifact


def _is_android_tester_blocked_exit(run: TaskRun, outcome: CommandResult) -> bool:
    """Tell whether a command outcome is the android-tester "blocked" exit.

    True only when the run's skill name equals ``_ANDROID_TESTER_SKILL_NAME``
    and the command's return code equals ``_ANDROID_TESTER_BLOCKED_EXIT_CODE``.
    """
    if run.spec.skill_name != _ANDROID_TESTER_SKILL_NAME:
        return False
    return outcome.return_code == _ANDROID_TESTER_BLOCKED_EXIT_CODE


def _prepare_parameters(action: ResolvedAction, run: TaskRun) -> dict[str, Any]:
"""Project the run's args + sandbox lease into validated action parameters.

Expand Down Expand Up @@ -323,6 +342,7 @@ def _normalize_invocation_parameters(
for alias in _ANDROID_SANDBOX_PARAMETER_ALIASES:
if alias in parameter_names:
_setdefault_nonempty(normalized, alias, endpoint)
_normalize_llmhub_model_parameter(normalized, parameter_names)
if filter_task_context:
known = parameter_names | set(action.infra_requirements)
for name in list(normalized):
Expand All @@ -331,6 +351,43 @@ def _normalize_invocation_parameters(
return normalized


def _normalize_llmhub_model_parameter(parameters: dict[str, Any], parameter_names: set[str]) -> None:
    """Replace an unrecognised ``llmhub_model`` value with the default model key.

    ``parameters`` is mutated in place. Nothing happens when the action does
    not declare the parameter, or when the supplied value is not a non-blank
    string. The value is compared after stripping and lower-casing; a value
    that matches an available key is left exactly as it was supplied.
    """
    if _LLMHUB_MODEL_PARAMETER_NAME not in parameter_names:
        return
    requested = parameters.get(_LLMHUB_MODEL_PARAMETER_NAME)
    if not isinstance(requested, str):
        return
    candidate = requested.strip().lower()
    if not candidate:
        return
    known_keys = _available_llmhub_model_keys()
    if candidate in known_keys:
        return
    # Unknown model: fall back to the default and record the substitution.
    replacement = _default_llmhub_model_key()
    _LOGGER.info(
        "normalizing skill llmhub_model from=%s to=%s available=%s",
        candidate,
        replacement,
        sorted(known_keys),
    )
    parameters[_LLMHUB_MODEL_PARAMETER_NAME] = replacement


@cache
def _available_llmhub_model_keys() -> frozenset[str]:
    """Return the lower-cased model keys known to the llmhub configuration.

    The default model key is always part of the result. If loading the
    configuration raises, a warning is logged (with traceback) and only the
    default key is returned.

    NOTE(review): because of ``@cache`` the first result is memoized, and that
    includes the fallback produced by a failed load. Confirm this is intended.
    """
    try:
        # Imported lazily, inside the guarded region, so an import failure is
        # handled the same way as a load failure.
        from app.llmhubs.config_loader import ModelConfigLoader

        definitions = ModelConfigLoader().load()
    except Exception:
        _LOGGER.warning("failed to load llmhub model keys for skill parameter normalization", exc_info=True)
        return frozenset({_default_llmhub_model_key()})
    cleaned = (raw_key.strip().lower() for raw_key in definitions)
    known_keys = {model_key for model_key in cleaned if model_key}
    return frozenset(known_keys | {_default_llmhub_model_key()})


def _default_llmhub_model_key() -> str:
    """Return the default llmhub model key, stripped and lower-cased.

    The first non-empty value wins, in this order: the
    ``CORE_DEFAULT_MODEL_KEY`` environment variable, the
    ``CORE_DEFAULT_LLM_MODEL`` environment variable, then ``DEFAULT_MODEL_KEY``.
    """
    for env_name in ("CORE_DEFAULT_MODEL_KEY", "CORE_DEFAULT_LLM_MODEL"):
        configured = os.getenv(env_name)
        if configured:
            return configured.strip().lower()
    return DEFAULT_MODEL_KEY.strip().lower()


def _setdefault_nonempty(parameters: dict[str, Any], name: str, value: Any) -> None:
if name not in parameters or not _is_nonempty_value(parameters.get(name)):
if _is_nonempty_value(value):
Expand Down
41 changes: 39 additions & 2 deletions core/tests/task_runtime/test_skill_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import pytest

from app.biz.task_runtime.executors import skill_executor as skill_executor_module
from app.biz.task_runtime.executors.base import DispatchRouter
from app.biz.task_runtime.executors.command_backend import (
CommandResult,
Expand Down Expand Up @@ -183,11 +184,17 @@ def _write_skill(workspace: Path, *, skill_id: int = 100, name: str, steps: list
)


def _skill_run(workspace: Path, *, args: dict | None = None, timeout_seconds: int = 600) -> TaskRun:
def _skill_run(
workspace: Path,
*,
args: dict | None = None,
timeout_seconds: int = 600,
skill_name: str = "android-test",
) -> TaskRun:
spec = TaskSpec(
task_id="t-skill",
title="Run a skill",
dispatch=SkillDispatch(skill_name="android-test", action_name="run"),
dispatch=SkillDispatch(skill_name=skill_name, action_name="run"),
args={"greeting": "hello", **(args or {})},
)
return TaskRun(
Expand Down Expand Up @@ -342,6 +349,20 @@ async def test_skill_timeout_maps_to_timed_out(tmp_path) -> None:
assert result.error_class == ErrorClass.TIMEOUT


@pytest.mark.asyncio
async def test_android_tester_exit_code_two_maps_to_blocked(tmp_path) -> None:
    """Exit code 2 from the android-tester skill is reported as BLOCKED."""
    skill_name = "android-tester"
    workspace = tmp_path / "workspace"
    _write_skill(workspace, name=skill_name, steps=[{"argv": ["android-tester", "{greeting}"]}])
    blocked_outcome = CommandResult(return_code=2, stdout='{"event":"task_result","status":"blocked"}\n')
    backend = _FakeBackend(blocked_outcome)
    executor = SkillExecutor(
        SkillLoader(workspace),
        artifact_store=_artifact_store(tmp_path),
        sandbox_backend=backend,
    )
    run = _skill_run(workspace, skill_name=skill_name)

    result = await executor.run(run, _FakeStore())

    assert result.status == TaskStatus.BLOCKED
    assert result.error_class == ErrorClass.SKILL_RUNTIME
    assert "android-tester.run blocked" in result.summary


@pytest.mark.asyncio
async def test_skill_unknown_action_is_user_input_failure(tmp_path) -> None:
workspace = tmp_path / "workspace"
Expand Down Expand Up @@ -626,6 +647,22 @@ def test_normalize_invocation_parameters_fills_android_context() -> None:
}


def test_normalize_invocation_parameters_replaces_unknown_llmhub_model(monkeypatch: pytest.MonkeyPatch) -> None:
    """An ``llmhub_model`` that is not an available key is swapped for the fallback key."""
    only_known_key = "gpt5.4"
    monkeypatch.setattr(
        skill_executor_module,
        "_available_llmhub_model_keys",
        lambda: frozenset({"gpt5.4"}),
    )
    action = _android_action("device_id", "instructions", "llmhub_model")
    raw_arguments = {
        "sandbox.android": "127.0.0.1:16416",
        "instructions": "Open https://bing.com.",
        "llmhub_model": "gpt-4o",
    }

    parameters = _normalize_invocation_parameters(action, raw_arguments)

    assert parameters["llmhub_model"] == only_known_key


def test_prepare_parameters_injects_lease_and_normalizes_context() -> None:
action = _android_action("device_id", "instructions", "task_name")
lease = SandboxLeaseRef(sandbox_id="sandbox-1", type="emulator", endpoint="127.0.0.1:16416", acquired_at=1)
Expand Down
7 changes: 6 additions & 1 deletion deploy/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,14 @@ services:
- EVENT_BUS_TOPIC=core-backend
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- MEM0_QDRANT_HOST=qdrant
- MEM0_AZURE_OPENAI_ENDPOINT=${MEM0_AZURE_OPENAI_ENDPOINT:-}
- MEM0_AZURE_OPENAI_API_KEY=${MEM0_AZURE_OPENAI_API_KEY:-}
- EXPERIENCES_ENABLED=${EXPERIENCES_ENABLED:-true}
- RUN_PYTHON_TOOL_SANDBOX_LOCAL_MODE=${RUN_PYTHON_TOOL_SANDBOX_LOCAL_MODE:-false}
- CHAT_AGENT_MAX_ITERATIONS=${CHAT_AGENT_MAX_ITERATIONS:-1000}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY:-}
- AZURE_OPENAI_GPT54_ENDPOINT=${AZURE_OPENAI_GPT54_ENDPOINT:-}
- AZURE_OPENAI_GPT54_API_KEY=${AZURE_OPENAI_GPT54_API_KEY:-}
- TASK_RUNTIME_RUN_STORE=${TASK_RUNTIME_RUN_STORE:-backend}
- TASK_RUNTIME_ARTIFACT_STORE=${TASK_RUNTIME_ARTIFACT_STORE:-seaweedfs}
- TASK_RUNTIME_ARTIFACT_PUBLIC_BASE_URL=${TASK_RUNTIME_ARTIFACT_PUBLIC_BASE_URL:-/storage}
Expand All @@ -164,7 +169,7 @@ services:
qdrant:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import grpc; ch = grpc.insecure_channel('localhost:50053'); grpc.channel_ready_future(ch).result(timeout=2)"]
test: ["CMD", "python", "-c", "import socket; s = socket.create_connection(('127.0.0.1', 50053), timeout=2); s.close()"]
interval: 10s
timeout: 5s
retries: 5
Expand Down
49 changes: 49 additions & 0 deletions sandbox/emulator/app/routers/emulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@
_DEFAULT_MUMU_RESOLUTION_MODE = "phone"
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024
_DOWNLOAD_TIMEOUT_SECONDS = 60
# Package that the reset preflight force-stops.
_CHROME_PACKAGE = "com.android.chrome"

# Shell commands issued, in order, by _run_android_reset_preflight. Each one is
# run through `adb shell` independently; a failure is recorded in the returned
# error list and does not stop the remaining commands.
# NOTE(review): presumably these turn off captive-portal checks and private
# DNS, bounce Wi-Fi and stop Chrome so each run starts from a known network
# state — confirm against the emulator image.
_ANDROID_RESET_PREFLIGHT_COMMANDS = (
("settings", "put", "global", "captive_portal_mode", "0"),
("settings", "put", "global", "captive_portal_detection_enabled", "0"),
("settings", "put", "global", "private_dns_mode", "off"),
("svc", "wifi", "disable"),
("svc", "wifi", "enable"),
("am", "force-stop", _CHROME_PACKAGE),
)

# Issued by _run_android_reset_preflight only after its `su 0 date ...` command
# has succeeded; skipped entirely when that command fails.
_ANDROID_FREEZE_TIME_COMMANDS = (
("settings", "put", "global", "auto_time", "0"),
("settings", "put", "global", "auto_time_zone", "0"),
)

router = APIRouter(prefix="/emulators", tags=["emulators"])

Expand Down Expand Up @@ -828,6 +843,8 @@ def _soft_reset_and_collect(mumu, device_map, index: int) -> dict:

closed_packages: list[str] = []
errors: list[str] = []
preflight = _run_android_reset_preflight(mumu, serial)
errors.extend(preflight["errors"])

code, out, _err = mumu._run_adb(
["-s", serial, "shell", "pm", "list", "packages", "-3"]
Expand All @@ -851,10 +868,42 @@ def _soft_reset_and_collect(mumu, device_map, index: int) -> dict:

return {
"closed_packages": closed_packages,
"preflight": preflight,
"errors": errors if errors else None,
}


def _run_android_reset_preflight(mumu, serial: str) -> dict[str, list[str]]:
    """Run the best-effort preflight shell commands against one device.

    Every entry of ``_ANDROID_RESET_PREFLIGHT_COMMANDS`` is attempted, then a
    ``su 0 date <now>`` command. The ``_ANDROID_FREEZE_TIME_COMMANDS`` are
    attempted only when that date command succeeds.

    Returns a dict with two lists: ``"applied"`` (the commands that exited 0)
    and ``"errors"`` (one message per failed command).
    """
    succeeded: list[str] = []
    failures: list[str] = []

    def attempt(shell_command: tuple[str, ...]) -> bool:
        return _run_best_effort_shell(mumu, serial, shell_command, succeeded, failures)

    for shell_command in _ANDROID_RESET_PREFLIGHT_COMMANDS:
        attempt(shell_command)

    clock_was_set = attempt(("su", "0", "date", _android_date_arg()))
    if clock_was_set:
        for shell_command in _ANDROID_FREEZE_TIME_COMMANDS:
            attempt(shell_command)

    return {"applied": succeeded, "errors": failures}


def _run_best_effort_shell(
    mumu,
    serial: str,
    command: tuple[str, ...],
    applied: list[str],
    errors: list[str],
) -> bool:
    """Run one ``adb -s <serial> shell`` command and record how it went.

    On exit code 0 the space-joined command is appended to ``applied`` and
    True is returned. Otherwise a ``"<command>: <detail>"`` message is appended
    to ``errors`` and False is returned; the detail is stderr, else stdout,
    else ``"exited <code>"``.
    """
    exit_code, stdout, stderr = mumu._run_adb(["-s", serial, "shell", *command])
    rendered = " ".join(command)
    if exit_code != 0:
        detail = stderr or stdout or f"exited {exit_code}"
        errors.append(f"{rendered}: {detail}")
        return False
    applied.append(rendered)
    return True


def _android_date_arg(timestamp: float | None = None) -> str:
return time.strftime("%m%d%H%M%Y.%S", time.localtime(time.time() if timestamp is None else timestamp))


@router.post("/{index}/restart")
def restart_emulator(index: int, mumu=Depends(get_mumu), device_map=Depends(get_device_index_map)):
_ensure_index(mumu, index)
Expand Down
72 changes: 72 additions & 0 deletions sandbox/emulator/tests/test_emulators_reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright (c) 2026 Sico Authors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import annotations

import time

from app.routers import emulators


class FakeMuMu:
    """Test double for the MuMu client that records adb calls and fakes results.

    The ``private_dns_mode off`` setting always fails. The root ``date``
    command for ``062004162026.00`` fails only when ``fail_root_date`` is set.
    Every other command succeeds with empty output.
    """

    def __init__(self, *, fail_root_date: bool = False) -> None:
        self.fail_root_date = fail_root_date
        self.calls: list[list[str]] = []

    def _run_adb(self, args: list[str]) -> tuple[int, str, str]:
        """Record ``args`` and return a canned ``(rc, stdout, stderr)`` triple."""
        self.calls.append(args)
        private_dns_off = ["settings", "put", "global", "private_dns_mode", "off"]
        root_date = ["su", "0", "date", "062004162026.00"]
        if args[-5:] == private_dns_off:
            return 1, "", "private dns unsupported"
        if self.fail_root_date and args[-4:] == root_date:
            return 1, "", "su unavailable"
        return 0, "", ""


def test_android_date_arg_uses_android_format(monkeypatch) -> None:
    """The date argument is rendered as ``MMDDhhmmYYYY.SS``."""
    # 2026-06-20 04:16:00, returned regardless of the timestamp passed in.
    frozen = time.struct_time((2026, 6, 20, 4, 16, 0, 5, 171, -1))
    monkeypatch.setattr(emulators.time, "localtime", lambda timestamp: frozen)

    rendered = emulators._android_date_arg(123)

    assert rendered == "062004162026.00"


def test_run_android_reset_preflight_is_best_effort(monkeypatch) -> None:
    """A failing preflight command is reported while the other commands still run."""
    serial = "127.0.0.1:5555"
    frozen_date = "062004162026.00"
    fake = FakeMuMu()
    monkeypatch.setattr(emulators, "_android_date_arg", lambda: frozen_date)

    outcome = emulators._run_android_reset_preflight(fake, serial)

    expected_shell_commands = (
        ["settings", "put", "global", "captive_portal_mode", "0"],
        ["svc", "wifi", "disable"],
        ["svc", "wifi", "enable"],
        ["su", "0", "date", frozen_date],
        ["settings", "put", "global", "auto_time", "0"],
    )
    for shell_command in expected_shell_commands:
        assert ["-s", serial, "shell", *shell_command] in fake.calls
    assert "settings put global private_dns_mode off: private dns unsupported" in outcome["errors"]
    assert "settings put global captive_portal_mode 0" in outcome["applied"]


def test_run_android_reset_preflight_keeps_auto_time_when_root_date_fails(monkeypatch) -> None:
    """When the root ``date`` command fails, the ``auto_time`` setting is not touched."""
    serial = "127.0.0.1:5555"
    frozen_date = "062004162026.00"
    adb_prefix = ["-s", serial, "shell"]
    fake = FakeMuMu(fail_root_date=True)
    monkeypatch.setattr(emulators, "_android_date_arg", lambda: frozen_date)

    outcome = emulators._run_android_reset_preflight(fake, serial)

    date_call = [*adb_prefix, "su", "0", "date", frozen_date]
    auto_time_call = [*adb_prefix, "settings", "put", "global", "auto_time", "0"]
    assert date_call in fake.calls
    assert auto_time_call not in fake.calls
    assert "su 0 date 062004162026.00: su unavailable" in outcome["errors"]
Loading