Skip to content

Commit 0b36f8d

Browse files
committed
feat: integrate HITL functionality and enhance cloud command capabilities
- Added support for human-in-the-loop (HITL) interactions in cloud runs, allowing for real-time user input during task execution. - Introduced new cloud command `respond` to submit HITL answers for ongoing runs, improving user engagement and control. - Enhanced the `run` command to support live event streaming and clarification requests, providing a more interactive experience. - Updated CLI components to reflect new features and improve usability, including better handling of clarification events and task statuses. - Added new dependencies and updated configuration to support the integration of the DuckDuckGo search API.
1 parent 72797d7 commit 0b36f8d

69 files changed

Lines changed: 3174 additions & 184 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

devsper/agents/agent.py

Lines changed: 392 additions & 33 deletions
Large diffs are not rendered by default.

devsper/cli/commands/cloud.py

Lines changed: 166 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -468,34 +468,124 @@ def cmd_cloud_run(args: Any) -> int:
468468
)
469469
else:
470470
console.print(f"[green]Queued[/green] run_id={run_id}")
471+
console.print(f"Check status: [bold]devsper cloud status {run_id}[/bold]")
472+
console.print(f"View logs: [bold]devsper cloud logs {run_id}[/bold]")
471473
return 0
472474

473-
try:
474-
final = api.poll_run(
475-
run_id,
476-
interval_seconds=float(getattr(args, "interval", 2.0)),
477-
timeout_seconds=timeout_poll,
478-
terminal_statuses=("completed", "failed", "cancelled", "timeout"),
479-
)
480-
except TimeoutError as e:
481-
console.print(f"[yellow]{e}[/yellow]")
482-
return 1
483-
except PlatformAPIError as e:
484-
console.print(f"[red]poll failed:[/red] {e}")
485-
return 1
475+
use_live_view = not getattr(args, "quiet", False) and not json_out and sys.stdout.isatty()
486476

487-
status = str(final.get("status") or "")
488-
if json_out:
489-
print(
490-
json.dumps({"run_id": run_id, "status": status, "raw": final}, default=str)
491-
)
477+
if use_live_view:
478+
import threading
479+
import tempfile
480+
import time
481+
482+
with tempfile.NamedTemporaryFile("w+", suffix=".jsonl", delete=False) as f:
483+
log_path = f.name
484+
485+
stop_event = threading.Event()
486+
run_status_final = {"status": "pending"}
487+
488+
def stream_events():
489+
import httpx
490+
import json
491+
492+
url = f"{api.base_url}/orgs/{api.org_slug}/runs/{run_id}/stream"
493+
headers = api.build_headers()
494+
try:
495+
with httpx.stream("GET", url, headers=headers, timeout=None) as response:
496+
if response.status_code != 200:
497+
return
498+
for line in response.iter_lines():
499+
if stop_event.is_set():
500+
break
501+
if line.startswith("data: "):
502+
data = line[6:].strip()
503+
if data:
504+
try:
505+
json.loads(data)
506+
with open(log_path, "a", encoding="utf-8") as out_f:
507+
out_f.write(data + "\n")
508+
except Exception:
509+
pass
510+
except Exception:
511+
pass
512+
513+
def check_status():
514+
while not stop_event.is_set():
515+
try:
516+
payload = api.get_run(run_id)
517+
status = str(payload.get("status") or "")
518+
if status in ("completed", "failed", "cancelled", "timeout"):
519+
run_status_final["status"] = status
520+
return True
521+
except Exception:
522+
pass
523+
time.sleep(2.0)
524+
return False
525+
526+
t_stream = threading.Thread(target=stream_events, daemon=True)
527+
t_stream.start()
528+
529+
from devsper.cli.ui.run_view import run_live_view, print_run_summary
530+
531+
def submit_cloud_hitl(request_id: str, answers: dict[str, Any], skipped: bool) -> None:
532+
api.submit_run_input(
533+
run_id,
534+
request_id=request_id,
535+
answers=answers,
536+
skipped=skipped,
537+
)
538+
539+
try:
540+
state = run_live_view(
541+
log_path=log_path,
542+
run_id=run_id,
543+
worker_count=1,
544+
poll_interval=0.1,
545+
stop_check=check_status,
546+
clarification_queue=None,
547+
swarm=None,
548+
cloud_hitl_submit=submit_cloud_hitl,
549+
)
550+
finally:
551+
stop_event.set()
552+
t_stream.join(1.0)
553+
try:
554+
os.remove(log_path)
555+
except Exception:
556+
pass
557+
558+
final = api.get_run(run_id)
559+
status = str(final.get("status") or "")
560+
print_run_summary(state, final.get("result") or {})
561+
return 0 if status == "completed" else 1
492562
else:
493-
console.print(f"run_id={run_id} status={status}")
494-
result = final.get("result")
495-
if result is not None:
496-
console.print(json.dumps(result, indent=2, default=str))
563+
try:
564+
final = api.poll_run(
565+
run_id,
566+
interval_seconds=float(getattr(args, "interval", 2.0)),
567+
timeout_seconds=timeout_poll,
568+
terminal_statuses=("completed", "failed", "cancelled", "timeout"),
569+
)
570+
except TimeoutError as e:
571+
console.print(f"[yellow]{e}[/yellow]")
572+
return 1
573+
except PlatformAPIError as e:
574+
console.print(f"[red]poll failed:[/red] {e}")
575+
return 1
576+
577+
status = str(final.get("status") or "")
578+
if json_out:
579+
print(
580+
json.dumps({"run_id": run_id, "status": status, "raw": final}, default=str)
581+
)
582+
else:
583+
console.print(f"run_id={run_id} status={status}")
584+
result = final.get("result")
585+
if result is not None:
586+
console.print(json.dumps(result, indent=2, default=str))
497587

498-
return 0 if status == "completed" else 1
588+
return 0 if status == "completed" else 1
499589

500590

501591
def cmd_cloud_status(args: Any) -> int:
@@ -530,6 +620,59 @@ def cmd_cloud_status(args: Any) -> int:
530620
return 0
531621

532622

623+
def cmd_cloud_respond(args: Any) -> int:
624+
"""POST a HITL answer for a cloud run (Redis devsper:inputs:{run_id} via platform API)."""
625+
run_id = (getattr(args, "run_id", None) or "").strip()
626+
if not run_id:
627+
console.print("[red]run_id required.[/red]")
628+
return 1
629+
630+
request_id = (getattr(args, "request_id", None) or "").strip()
631+
if not request_id:
632+
console.print("[red]--request-id is required.[/red]")
633+
return 1
634+
635+
api = _builder_from_args(
636+
getattr(args, "api_url", None),
637+
getattr(args, "org", None),
638+
getattr(args, "token", None),
639+
)
640+
if not api.enabled():
641+
console.print("[red]Platform not configured.[/red]")
642+
return 1
643+
644+
skipped = bool(getattr(args, "skipped", False))
645+
answers: dict[str, Any] = {}
646+
if not skipped:
647+
raw = (getattr(args, "answers_json", None) or "").strip()
648+
if not raw:
649+
console.print("[red]Pass --answers '{\"...\": \"...\"}' or use --skipped.[/red]")
650+
return 1
651+
try:
652+
parsed = json.loads(raw)
653+
if not isinstance(parsed, dict):
654+
console.print("[red]--answers must be a JSON object.[/red]")
655+
return 1
656+
answers = parsed
657+
except json.JSONDecodeError as e:
658+
console.print(f"[red]Invalid JSON in --answers:[/red] {e}")
659+
return 1
660+
661+
try:
662+
api.submit_run_input(
663+
run_id,
664+
request_id=request_id,
665+
answers=answers,
666+
skipped=skipped,
667+
)
668+
except PlatformAPIError as e:
669+
console.print(f"[red]submit_run_input failed:[/red] {e}")
670+
return 1
671+
672+
console.print("[green]Response sent.[/green] The run should resume if it is waiting for input.")
673+
return 0
674+
675+
533676
def cmd_cloud_logs(args: Any) -> int:
534677
run_id = (getattr(args, "run_id", None) or "").strip()
535678
if not run_id:

devsper/cli/main.py

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -445,10 +445,19 @@ def _run_swarm(args: object) -> int:
445445
summary_only = getattr(args, "summary", False)
446446
json_output = getattr(args, "json_output", False)
447447
plain = getattr(args, "plain", False) or not sys.stdout.isatty()
448-
use_live_view = not quiet and not plain and sys.stdout.isatty()
448+
reporter = getattr(args, "reporter", False)
449+
use_live_view = not quiet and not plain and not reporter and sys.stdout.isatty()
449450

450451
cfg = get_config()
451-
event_log = EventLog(events_folder_path=cfg.events_dir)
452+
from devsper.platform.redis_results_sink import build_reporter_sinks_chain
453+
454+
_prid = (os.environ.get("DEVSPER_PLATFORM_RUN_ID") or "").strip()
455+
_chain = build_reporter_sinks_chain(cfg.events_dir)
456+
event_log = EventLog(
457+
events_folder_path=cfg.events_dir,
458+
run_id=_prid if _prid else None,
459+
platform_sink=_chain,
460+
)
452461
log_path = getattr(event_log, "log_path", None)
453462
memory_router = MemoryRouter(
454463
store=get_default_store(),
@@ -460,7 +469,7 @@ def _run_swarm(args: object) -> int:
460469
)
461470
workers = getattr(cfg.swarm, "workers", 2)
462471
clarification_queue = None
463-
if use_live_view:
472+
if use_live_view or reporter:
464473
import queue
465474

466475
clarification_queue = queue.Queue()
@@ -476,6 +485,35 @@ def _run_swarm(args: object) -> int:
476485
results_holder: list[dict] = []
477486
run_id = getattr(event_log, "run_id", "") or ""
478487

488+
# Headless HITL bridge: publish clarification_requested and wait for answers.
489+
stop_event = None
490+
bridge_thread = None
491+
bridge_event_sink = None
492+
if reporter and run_id:
493+
try:
494+
import redis
495+
496+
redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")
497+
r = redis.Redis.from_url(redis_url, decode_responses=True)
498+
499+
from devsper.platform.reporter import hitl_bridge_thread
500+
501+
stop_event = threading.Event()
502+
# `event_log` uses `platform_sink=_chain` when reporter is enabled.
503+
bridge_event_sink = _chain
504+
bridge_thread = threading.Thread(
505+
target=hitl_bridge_thread,
506+
args=(r, run_id, clarification_queue, swarm, stop_event, bridge_event_sink),
507+
daemon=True,
508+
)
509+
bridge_thread.start()
510+
except Exception:
511+
from devsper.cli.ui import console
512+
513+
console.print(
514+
"[yellow]Reporter HITL bridge could not start; run may not pause for user input.[/yellow]"
515+
)
516+
479517
hitl_resolver = None
480518
if (
481519
getattr(getattr(cfg, "hitl", None), "enabled", False)
@@ -570,6 +608,10 @@ def _run() -> None:
570608
sys.stderr.write("\n")
571609
sys.stderr.flush()
572610
thread.join()
611+
if stop_event is not None:
612+
stop_event.set()
613+
if bridge_thread is not None:
614+
bridge_thread.join(timeout=1.0)
573615
results = results_holder[0] if results_holder else {}
574616
if json_output:
575617
import json
@@ -2725,6 +2767,7 @@ def _run_cloud_dispatch(args: object) -> int:
27252767
cmd_cloud_run,
27262768
cmd_cloud_logs,
27272769
cmd_cloud_status,
2770+
cmd_cloud_respond,
27282771
cmd_cloud_import_keys,
27292772
)
27302773

@@ -2733,6 +2776,7 @@ def _run_cloud_dispatch(args: object) -> int:
27332776
"logout": cmd_cloud_logout,
27342777
"run": cmd_cloud_run,
27352778
"status": cmd_cloud_status,
2779+
"respond": cmd_cloud_respond,
27362780
"logs": cmd_cloud_logs,
27372781
"import-keys": cmd_cloud_import_keys,
27382782
}
@@ -2826,6 +2870,7 @@ def main() -> int:
28262870
Examples:
28272871
devsper run "Summarize swarm intelligence in one paragraph"
28282872
devsper run "Analyze diffusion models" -q
2873+
devsper run --reporter "task" # headless; set DEVSPER_PLATFORM_RUN_ID + REDIS_URL for live platform SSE
28292874
""",
28302875
formatter_class=argparse.RawDescriptionHelpFormatter,
28312876
)
@@ -2872,6 +2917,12 @@ def main() -> int:
28722917
default=180.0,
28732918
help="Platform run polling timeout seconds.",
28742919
)
2920+
run_parser.add_argument(
2921+
"--reporter",
2922+
action="store_true",
2923+
help="Headless integration: skip TUI; stream events via DEVSPER_PLATFORM_RUN_ID + REDIS_URL "
2924+
"and/or DEVSPER_PLATFORM_RUNTIME_EVENTS (see docs).",
2925+
)
28752926
run_parser.set_defaults(func=lambda a: _run_swarm(a))
28762927

28772928
meta_parser = subparsers.add_parser(
@@ -3679,6 +3730,7 @@ def main() -> int:
36793730
devsper cloud login --api-url http://localhost:8080 --email you@example.com
36803731
devsper cloud run "Summarize the platform README in three bullets."
36813732
devsper cloud status <run_id>
3733+
devsper cloud respond <run_id> --request-id <uuid> --answers '{"Q":"A"}'
36823734
devsper cloud logs <run_id>
36833735
""",
36843736
formatter_class=argparse.RawDescriptionHelpFormatter,
@@ -3749,7 +3801,7 @@ def main() -> int:
37493801
help="Optional x-devsper-run-manifest-version header",
37503802
)
37513803
cloud_run_p.add_argument(
3752-
"--no-wait", action="store_true", help="Print run_id only; do not poll"
3804+
"--no-wait", "--detach", action="store_true", help="Print run_id only; do not poll"
37533805
)
37543806
cloud_run_p.add_argument(
37553807
"--timeout", type=float, default=300.0, help="Poll timeout seconds"
@@ -3777,6 +3829,34 @@ def main() -> int:
37773829
)
37783830
cloud_status_p.set_defaults(cloud_cmd="status")
37793831

3832+
cloud_respond_p = cloud_sub.add_parser(
3833+
"respond",
3834+
help="Send a human-in-the-loop answer for a waiting cloud run",
3835+
)
3836+
cloud_respond_p.add_argument("run_id", help="Run UUID")
3837+
cloud_respond_p.add_argument(
3838+
"--request-id",
3839+
required=True,
3840+
help="Clarification request_id from run stream or logs",
3841+
)
3842+
cloud_respond_p.add_argument(
3843+
"--answers",
3844+
dest="answers_json",
3845+
default="",
3846+
help='JSON object of answers, keys = field questions (e.g. \'{"Which API?":"REST"}\')',
3847+
)
3848+
cloud_respond_p.add_argument(
3849+
"--skipped",
3850+
action="store_true",
3851+
help="Tell the worker to proceed with defaults / without user answers",
3852+
)
3853+
cloud_respond_p.add_argument(
3854+
"--api-url", default=None, help="Override platform API URL"
3855+
)
3856+
cloud_respond_p.add_argument("--org", default=None, help="Override org slug")
3857+
cloud_respond_p.add_argument("--token", default=None, help="Override JWT")
3858+
cloud_respond_p.set_defaults(cloud_cmd="respond")
3859+
37803860
cloud_logs_p = cloud_sub.add_parser("logs", help="List run events (history)")
37813861
cloud_logs_p.add_argument("run_id", help="Run UUID")
37823862
cloud_logs_p.add_argument(

0 commit comments

Comments
 (0)