Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
from dstack._internal.cli.services.resources import apply_resources_args, register_resources_args
from dstack._internal.cli.utils.common import confirm_ask, console
from dstack._internal.cli.utils.rich import MultiItemStatus
from dstack._internal.cli.utils.run import get_runs_table, print_run_plan
from dstack._internal.cli.utils.run import (
RunWaitStatus,
get_run_wait_status,
get_runs_table,
print_run_plan,
)
from dstack._internal.core.errors import (
CLIError,
ConfigurationError,
Expand Down Expand Up @@ -192,10 +197,14 @@ def apply_configuration(
try:
# We can attach to run multiple times if it goes from running to pending (retried).
while True:
with MultiItemStatus(f"Launching [code]{run.name}[/]...", console=console) as live:
with MultiItemStatus(_get_apply_status(run), console=console) as live:
while not _is_ready_to_attach(run):
table = get_runs_table([run])
live.update(table)
live.update(
table,
*_get_apply_wait_renderables(run),
status=_get_apply_status(run),
)
time.sleep(5)
run.refresh()

Expand Down Expand Up @@ -793,14 +802,38 @@ def _detect_windsurf_version(exe: str = "windsurf") -> Optional[str]:
def _print_service_urls(run: Run) -> None:
if run._run.run_spec.configuration.type != RunConfigurationType.SERVICE.value:
return
console.print(f"Service is published at:\n [link={run.service_url}]{run.service_url}[/]")
console.print(_get_service_url_renderable(run))
if model := run.service_model:
console.print(
f"Model [code]{model.name}[/] is published at:\n [link={model.url}]{model.url}[/]"
)
console.print()


def _get_apply_status(run: Run) -> str:
wait_status = get_run_wait_status(run._run)
if wait_status is None:
return f"Launching [code]{run.name}[/]..."
return f"[code]{run.name}[/] is {wait_status.value}..."


def _get_apply_wait_renderables(run: Run) -> list[str]:
wait_status = get_run_wait_status(run._run)
if wait_status is RunWaitStatus.WAITING_FOR_REQUESTS and run._run.service is not None:
return [_get_service_url_renderable(run)]
if (
wait_status is RunWaitStatus.WAITING_FOR_SCHEDULE
and run._run.next_triggered_at is not None
):
next_run = run._run.next_triggered_at.astimezone().strftime("%Y-%m-%d %H:%M %Z")
return [f"Next run: {next_run}"]
return []


def _get_service_url_renderable(run: Run) -> str:
return f"Service is published at:\n [link={run.service_url}]{run.service_url}[/]"


def _print_dev_environment_connection_info(run: Run) -> None:
if not FeatureFlags.CLI_PRINT_JOB_CONNECTION_INFO:
return
Expand Down
6 changes: 5 additions & 1 deletion src/dstack/_internal/cli/utils/rich.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ def __init__(self, status: "RenderableType", *, console: Optional["Console"] = N
transient=True,
)

def update(self, *renderables: "RenderableType") -> None:
def update(
self, *renderables: "RenderableType", status: Optional["RenderableType"] = None
) -> None:
if status is not None:
self._spinner.update(text=status)
self._live.update(renderable=Group(self._spinner, *renderables))

def __enter__(self) -> "MultiItemStatus":
Expand Down
40 changes: 40 additions & 0 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import shutil
from enum import Enum
from typing import Any, Dict, List, Optional

from rich.markup import escape
Expand Down Expand Up @@ -49,6 +50,11 @@
from dstack.api import Run


class RunWaitStatus(str, Enum):
WAITING_FOR_REQUESTS = "waiting for requests"
WAITING_FOR_SCHEDULE = "waiting for schedule"


def print_offers_json(run_plan: RunPlan, run_spec):
"""Print offers information in JSON format."""
job_plan = run_plan.job_plans[0]
Expand Down Expand Up @@ -200,6 +206,40 @@ def th(s: str) -> str:
console.print(NO_FLEETS_WARNING if no_fleets else NO_OFFERS_WARNING)


def get_run_wait_status(run: CoreRun) -> Optional[RunWaitStatus]:
# Only synthesize a CLI-specific waiting state when the server did not provide
# a more specific run-level message such as "retrying".
if run.status_message not in ("", run.status.value):
return None

if run.status == RunStatus.PENDING and run.next_triggered_at is not None:
return RunWaitStatus.WAITING_FOR_SCHEDULE

if _is_waiting_for_requests(run):
return RunWaitStatus.WAITING_FOR_REQUESTS

return None


def _is_waiting_for_requests(run: CoreRun) -> bool:
if run.run_spec.configuration.type != "service":
return False
if run.service is None or run.next_triggered_at is not None:
return False
if run.status not in (RunStatus.SUBMITTED, RunStatus.PENDING):
return False
return not any(_is_job_active(job.job_submissions[-1].status) for job in run.jobs)


def _is_job_active(status: JobStatus) -> bool:
return status in (
JobStatus.SUBMITTED,
JobStatus.PROVISIONING,
JobStatus.PULLING,
JobStatus.RUNNING,
)


def _format_run_status(run) -> str:
status_text = (
run.latest_job_submission.status_message
Expand Down
Loading