diff --git a/docs/docs/concepts/fleets.md b/docs/docs/concepts/fleets.md index 01f5d39ac..b927e94d4 100644 --- a/docs/docs/concepts/fleets.md +++ b/docs/docs/concepts/fleets.md @@ -517,6 +517,20 @@ Fleet my-gcp-fleet deleted Alternatively, you can delete a fleet by passing the fleet name to `dstack fleet delete`. To terminate and delete specific instances from a fleet, pass `-i INSTANCE_NUM`. +### List offers + +To inspect offers available through a fleet, pass `--fleet` to `dstack offer`. + +
+ +```shell +$ dstack offer --gpu H100 --fleet my-fleet +``` + +
+ +Use `--group-by gpu,backend` to aggregate offers. + !!! info "What's next?" 1. Check [dev environments](dev-environments.md), [tasks](tasks.md), and [services](services.md) diff --git a/docs/docs/guides/protips.md b/docs/docs/guides/protips.md index dcf3fe196..f090fcf64 100644 --- a/docs/docs/guides/protips.md +++ b/docs/docs/guides/protips.md @@ -458,6 +458,9 @@ Getting offers... +By default, `dstack offer` ignores fleet configurations and shows all available offers that match the request. +To inspect offers available through a specific fleet, pass `--fleet NAME`. + ??? info "Grouping offers" Use `--group-by` to aggregate offers. Accepted values: `gpu`, `backend`, `region`, and `count`. diff --git a/docs/docs/guides/troubleshooting.md b/docs/docs/guides/troubleshooting.md index 87169081f..7b42bd73f 100644 --- a/docs/docs/guides/troubleshooting.md +++ b/docs/docs/guides/troubleshooting.md @@ -54,7 +54,15 @@ If you run `dstack apply` and don't see any instance offers, it means that `dstack` could not find instances that match the requirements in your configuration. Below are some of the reasons why this might happen. -> Feel free to use `dstack offer` to view available offers. +Feel free to use `dstack offer` to inspect available offers: + +```shell +# All matching offers, ignoring fleet configurations +$ dstack offer --gpu H100 + +# Offers available through a specific fleet +$ dstack offer --gpu H100 --fleet my-fleet +``` #### Cause 1: No backends diff --git a/docs/docs/reference/cli/dstack/offer.md b/docs/docs/reference/cli/dstack/offer.md index 8da816eda..793f13e03 100644 --- a/docs/docs/reference/cli/dstack/offer.md +++ b/docs/docs/reference/cli/dstack/offer.md @@ -4,9 +4,6 @@ Displays available offers (hardware configurations) from configured backends or The output shows backend, region, instance type, resources, spot availability, and pricing. -!!! info "Experimental" - `dstack offer` command is currently an experimental feature. 
Backward compatibility is not guaranteed across releases. - ## Usage This command accepts most of the same arguments as [`dstack apply`](apply.md). @@ -20,9 +17,28 @@ $ dstack offer --help +## Fleet offers + +By default, `dstack offer` ignores fleet configurations and shows all available offers that match the request. + +Use `--fleet` to inspect offers available through specific fleets. With one `--fleet`, +`dstack offer` shows offers available through that fleet. With multiple `--fleet`, it +combines offers available through the selected fleets. + +
+ +```shell +$ dstack offer --gpu H100 --fleet my-fleet +``` + +
+ +The same fleet filtering applies to `--group-by` output, e.g. `--group-by gpu,backend` +or `--group-by gpu,backend,region`. + ## Examples -### Filtering offers +### Filtering offers { #list-gpu-offers } The `--gpu` flag accepts the same specification format as the `gpu` property in [`dev environment`](../../../concepts/dev-environments.md), [`task`](../../../concepts/tasks.md), [`service`](../../../concepts/services.md), and [`fleet`](../../../concepts/fleets.md) configurations. diff --git a/skills/dstack/SKILL.md b/skills/dstack/SKILL.md index 8b720c23a..5acd09c54 100644 --- a/skills/dstack/SKILL.md +++ b/skills/dstack/SKILL.md @@ -459,7 +459,7 @@ dstack stop my-run-name --abort ### List offers -Offers represent available instance configurations available for provisioning across backends. `dstack offer` lists offers regardless of configured fleets. +Offers represent available instance configurations available for provisioning across backends. By default, `dstack offer` ignores fleet configurations and shows all available offers that match the request. Use `--fleet` to inspect offers available through specific fleets. ```bash # Filter by specific backend @@ -474,10 +474,18 @@ dstack offer --gpu 24GB..80GB # Combine filters dstack offer --backend aws --gpu A100:80GB +# Limit to a specific fleet +dstack offer --fleet my-fleet + +# Combine offers from multiple fleets +dstack offer --fleet my-fleet --fleet other-fleet + # JSON output (for troubleshooting/scripting) dstack offer --json ``` +With one `--fleet`, `dstack offer` shows offers available through that fleet. With multiple `--fleet`, it combines offers available through the selected fleets. Identical backend offers are shown once, while matching existing instances stay separate. + **Max offers:** By default, `dstack offer` returns first N offers (output also includes the total number). Use `--max-offers N` to increase the limit. 
**Grouping:** Prefer `--group-by gpu` (other supported values: `gpu,backend`, `gpu,backend,region`) for aggregated output across all offers, not `--max-offers`. diff --git a/src/dstack/_internal/cli/commands/offer.py b/src/dstack/_internal/cli/commands/offer.py index 1744b0bfb..92157ad9b 100644 --- a/src/dstack/_internal/cli/commands/offer.py +++ b/src/dstack/_internal/cli/commands/offer.py @@ -15,11 +15,8 @@ from dstack._internal.core.models.configurations import ApplyConfigurationType, TaskConfiguration from dstack._internal.core.models.gpus import GpuGroup from dstack._internal.core.models.runs import RunSpec -from dstack._internal.utils.logging import get_logger from dstack.api.utils import load_profile -logger = get_logger(__name__) - class OfferConfigurator(BaseRunConfigurator): TYPE = ApplyConfigurationType.TASK @@ -77,11 +74,6 @@ def _register(self): def _command(self, args: argparse.Namespace): super()._command(args) - if args.fleets: - logger.warning( - "Specifying `--fleet` in `dstack offer` has no defined effect" - " and may be disallowed in a future release" - ) # Set image and user so that the server (a) does not default gpu.vendor # to nvidia — `dstack offer` should show all vendors, and (b) does not # attempt to pull image config from the Docker registry. 
@@ -114,7 +106,11 @@ def _command(self, args: argparse.Namespace): run_spec, max_offers=args.max_offers, ) - print_run_plan(run_plan, include_run_properties=False) + print_run_plan( + run_plan, + include_run_properties=False, + show_offer_fleet_hint=run_spec.merged_profile.fleets is None, + ) else: if args.group_by: gpus = self._list_gpus(args, run_spec) diff --git a/src/dstack/_internal/cli/services/profile.py b/src/dstack/_internal/cli/services/profile.py index 6340719bd..e8086bb43 100644 --- a/src/dstack/_internal/cli/services/profile.py +++ b/src/dstack/_internal/cli/services/profile.py @@ -70,7 +70,7 @@ def register_profile_args(parser: argparse.ArgumentParser): action="append", metavar="NAME", dest="fleets", - help="Consider only instances from the specified fleet(s) for reuse", + help="Consider only the specified fleet(s)", ) fleets_group_exc = fleets_group.add_mutually_exclusive_group() fleets_group_exc.add_argument( diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index 35f9c8848..0095feae2 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -55,6 +55,12 @@ class RunWaitStatus(str, Enum): WAITING_FOR_SCHEDULE = "waiting for schedule" +_OFFER_FLEET_HINT = ( + "Hint: Existing fleets are ignored, and all available offers are shown." + " To filter by fleet, pass --fleet NAME." 
+) + + def print_offers_json(run_plan: RunPlan, run_spec): """Print offers information in JSON format.""" job_plan = run_plan.job_plans[0] @@ -92,6 +98,7 @@ def print_run_plan( include_run_properties: bool = True, no_fleets: bool = False, verbose: bool = False, + show_offer_fleet_hint: bool = False, ): run_spec = run_plan.get_effective_run_spec() job_plan = run_plan.job_plans[0] @@ -171,9 +178,9 @@ def th(s: str) -> str: offers.add_column("PRICE", style="grey58", ratio=1) offers.add_column() - job_plan.offers = job_plan.offers[:max_offers] if max_offers else job_plan.offers + displayed_offers = job_plan.offers[:max_offers] if max_offers else job_plan.offers - for i, offer in enumerate(job_plan.offers, start=1): + for i, offer in enumerate(displayed_offers, start=1): r = offer.instance.resources instance = offer.instance.name @@ -188,19 +195,32 @@ def th(s: str) -> str: format_instance_availability(offer.availability), style=None if i == 1 or not include_run_properties else "secondary", ) - if job_plan.total_offers > len(job_plan.offers): + if job_plan.total_offers > len(displayed_offers): offers.add_row("", "...", style="secondary") console.print(props) console.print() - if len(job_plan.offers) > 0: + if len(displayed_offers) > 0: + show_offer_fleet_hint_before_table = ( + show_offer_fleet_hint + and job_plan.total_offers <= len(displayed_offers) + and len(displayed_offers) < 3 + ) + show_offer_fleet_hint_after_table = ( + show_offer_fleet_hint and not show_offer_fleet_hint_before_table + ) + if show_offer_fleet_hint_before_table: + console.print(f"[secondary]{_OFFER_FLEET_HINT}[/]") + console.print() console.print(offers) - if job_plan.total_offers > len(job_plan.offers): + if job_plan.total_offers > len(displayed_offers): console.print( - f"[secondary] Shown {len(job_plan.offers)} of {job_plan.total_offers} offers, " + f"[secondary] Shown {len(displayed_offers)} of {job_plan.total_offers} offers, " f"${job_plan.max_price:3f}".rstrip("0").rstrip(".") + "max[/]" ) 
+ if show_offer_fleet_hint_after_table: + console.print(f"[secondary]{_OFFER_FLEET_HINT}[/]") console.print() else: console.print(NO_FLEETS_WARNING if no_fleets else NO_OFFERS_WARNING) diff --git a/src/dstack/_internal/server/routers/gpus.py b/src/dstack/_internal/server/routers/gpus.py index 3a701fb1e..83918c9c1 100644 --- a/src/dstack/_internal/server/routers/gpus.py +++ b/src/dstack/_internal/server/routers/gpus.py @@ -2,8 +2,10 @@ from fastapi import APIRouter, Depends from packaging.version import Version +from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.server.compatibility.gpus import patch_list_gpus_response +from dstack._internal.server.db import get_session from dstack._internal.server.models import ProjectModel, UserModel from dstack._internal.server.schemas.gpus import ListGpusRequest, ListGpusResponse from dstack._internal.server.security.permissions import ProjectMember @@ -23,10 +25,16 @@ @project_router.post("/list", response_model=ListGpusResponse, response_model_exclude_none=True) async def list_gpus( body: ListGpusRequest, + session: Annotated[AsyncSession, Depends(get_session)], client_version: Annotated[Optional[Version], Depends(get_client_version)], user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()), ) -> ListGpusResponse: _, project = user_project - resp = await list_gpus_grouped(project=project, run_spec=body.run_spec, group_by=body.group_by) + resp = await list_gpus_grouped( + session=session, + project=project, + run_spec=body.run_spec, + group_by=body.group_by, + ) patch_list_gpus_response(resp, client_version) return resp diff --git a/src/dstack/_internal/server/services/gpus.py b/src/dstack/_internal/server/services/gpus.py index 9c52523d7..1497c9536 100644 --- a/src/dstack/_internal/server/services/gpus.py +++ b/src/dstack/_internal/server/services/gpus.py @@ -1,5 +1,7 @@ from typing import Dict, List, Literal, Optional, Tuple +from sqlalchemy.ext.asyncio import AsyncSession + from 
dstack._internal.core.backends.base.backend import Backend from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.backends.base import BackendType @@ -10,17 +12,22 @@ from dstack._internal.core.models.runs import Requirements, RunSpec, get_policy_map from dstack._internal.server.models import ProjectModel from dstack._internal.server.schemas.gpus import ListGpusResponse +from dstack._internal.server.services.jobs import get_jobs_from_run_spec from dstack._internal.server.services.offers import get_offers_by_requirements +from dstack._internal.server.services.runs.plan import ( + get_backend_offers_in_run_candidate_fleets, +) from dstack._internal.utils.common import get_or_error async def list_gpus_grouped( + session: AsyncSession, project: ProjectModel, run_spec: RunSpec, group_by: Optional[List[Literal["backend", "region", "count"]]] = None, ) -> ListGpusResponse: """Retrieves available GPU specifications based on a run spec, with optional grouping.""" - offers = await _get_gpu_offers(project=project, run_spec=run_spec) + offers = await _get_gpu_offers(session=session, project=project, run_spec=run_spec) backend_gpus = _process_offers_into_backend_gpus(offers) group_by_set = set(group_by) if group_by else set() if "region" in group_by_set and "backend" not in group_by_set: @@ -47,10 +54,24 @@ async def list_gpus_grouped( async def _get_gpu_offers( - project: ProjectModel, run_spec: RunSpec + session: AsyncSession, + project: ProjectModel, + run_spec: RunSpec, ) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: """Fetches all available instance offers that match the run spec's GPU requirements.""" profile = run_spec.merged_profile + if profile.fleets is not None: + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + if len(jobs) == 0: + return [] + return await get_backend_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=jobs[0], + 
volumes=None, + max_offers_per_fleet=None, + ) requirements = Requirements( resources=run_spec.configuration.resources, max_price=profile.max_price, diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index ecf2f8bdd..5d05fcaa6 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -1,4 +1,6 @@ import math +from collections.abc import Hashable, Mapping +from enum import Enum from typing import Optional, Union from sqlalchemy import and_, exists, not_, or_, select @@ -119,13 +121,32 @@ async def get_job_plans( volumes=volumes, exclude_not_available=False, ) - if _should_force_non_fleet_offers(run_spec) or ( + if _should_force_non_fleet_offers(run_spec): + if profile.fleets is None: + instance_offers, backend_offers = await _get_non_fleet_offers( + session=session, + project=project, + profile=profile, + run_spec=run_spec, + job=jobs[0], + volumes=volumes, + ) + else: + instance_offers, backend_offers = await _get_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=jobs[0], + volumes=volumes, + ) + elif ( FeatureFlags.AUTOCREATED_FLEETS_ENABLED and profile.fleets is None and fleet_model is None ): - # Keep the old behavior returning all offers irrespective of fleets. - # Needed for supporting offers with autocreated fleets flow (and for `dstack offer`). + # Keep the old behavior returning all offers irrespective of fleets + # when no fleets are explicitly specified. Needed for supporting + # offers with autocreated fleets flow. 
instance_offers, backend_offers = await _get_non_fleet_offers( session=session, project=project, @@ -172,13 +193,32 @@ async def get_job_plans( volumes=volumes, exclude_not_available=False, ) - if _should_force_non_fleet_offers(run_spec) or ( + if _should_force_non_fleet_offers(run_spec): + if profile.fleets is None: + instance_offers, backend_offers = await _get_non_fleet_offers( + session=session, + project=project, + profile=profile, + run_spec=run_spec, + job=jobs[0], + volumes=volumes, + ) + else: + instance_offers, backend_offers = await _get_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=jobs[0], + volumes=volumes, + ) + elif ( FeatureFlags.AUTOCREATED_FLEETS_ENABLED and profile.fleets is None and fleet_model is None ): - # Keep the old behavior returning all offers irrespective of fleets. - # Needed for supporting offers with autocreated fleets flow (and for `dstack offer`). + # Keep the old behavior returning all offers irrespective of fleets + # when no fleets are explicitly specified. Needed for supporting + # offers with autocreated fleets flow. instance_offers, backend_offers = await _get_non_fleet_offers( session=session, project=project, @@ -672,6 +712,137 @@ async def _get_non_fleet_offers( return instance_offers, backend_offers +async def get_backend_offers_in_run_candidate_fleets( + session: AsyncSession, + project: ProjectModel, + run_spec: RunSpec, + job: Job, + volumes: Optional[list[list[Volume]]], + max_offers_per_fleet: Optional[int] = None, +) -> list[tuple[Backend, InstanceOfferWithAvailability]]: + """ + Returns backend offers across the run's selected candidate fleets. + + Used by `dstack offer --fleet ...` and `dstack offer --group-by ... --fleet ...`. + It resolves the selected fleets from `run_spec`, requests backend offers in each fleet, + merges them, and deduplicates identical backend offers across fleets. 
+ """ + candidate_fleet_models = await _select_candidate_fleet_models( + session=session, + project=project, + run_model=None, + run_spec=run_spec, + ) + deduplicated_backend_offers: dict[ + Hashable, + tuple[Backend, InstanceOfferWithAvailability], + ] = {} + for candidate_fleet_model in candidate_fleet_models: + for backend, offer in await _get_backend_offers_in_fleet( + project=project, + fleet_model=candidate_fleet_model, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers=max_offers_per_fleet, + ): + deduplicated_backend_offers.setdefault( + _get_backend_offer_identity(offer), + (backend, offer), + ) + backend_offers = list(deduplicated_backend_offers.values()) + backend_offers.sort(key=lambda offer: offer[1].price) + return backend_offers + + +async def _get_offers_in_run_candidate_fleets( + session: AsyncSession, + project: ProjectModel, + run_spec: RunSpec, + job: Job, + volumes: list[list[Volume]], +) -> tuple[ + list[tuple[InstanceModel, InstanceOfferWithAvailability]], + list[tuple[Backend, InstanceOfferWithAvailability]], +]: + """ + Returns existing-instance and backend offers across the run's candidate fleets. + + Used by plain/json `dstack offer --fleet ...`. Unlike normal `dstack apply`, it does not + choose a single best fleet. Instead, it gathers existing-instance and backend offers from + each selected fleet, keeps existing instances as separate reusable options, and deduplicates + identical backend offers across fleets. 
+ """ + candidate_fleet_models = await _select_candidate_fleet_models( + session=session, + project=project, + run_model=None, + run_spec=run_spec, + ) + instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] + for candidate_fleet_model in candidate_fleet_models: + instance_offers.extend( + get_instance_offers_in_fleet( + fleet_model=candidate_fleet_model, + run_spec=run_spec, + job=job, + volumes=volumes, + exclude_not_available=False, + ) + ) + instance_offers.sort(key=lambda offer: offer[1].price or 0) + # TODO: Intentionally pass `max_offers_per_fleet=None` here. `dstack offer --fleet ...` + # is expected to return the exact `total_offers`, so capping backend offers per selected + # fleet would make that total approximate. We already deduplicate identical backend offers + # while merging selected fleets via `_get_backend_offer_identity()`. Revisit adding a cap + # only if this path causes real performance or memory problems. + backend_offers = await get_backend_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers_per_fleet=None, + ) + return instance_offers, backend_offers + + +def _get_backend_offer_identity(offer: InstanceOfferWithAvailability) -> Hashable: + """ + Returns a hashable identity for a backend offer using the full offer payload. + + Needed to deduplicate identical backend offers when merging offers from multiple fleets for + `dstack offer --fleet ...`. 
+ """ + return _freeze_offer_identity_value(offer.dict()) + + +def _freeze_offer_identity_value(value: object) -> Hashable: + """Converts nested offer payload values into a deterministic hashable form.""" + if isinstance(value, Mapping): + return tuple( + sorted( + ( + ( + _freeze_offer_identity_value(key), + _freeze_offer_identity_value(nested_value), + ) + for key, nested_value in value.items() + ), + key=repr, + ) + ) + if isinstance(value, Enum): + return value.value + if isinstance(value, (list, tuple)): + return tuple(_freeze_offer_identity_value(item) for item in value) + if isinstance(value, (set, frozenset)): + return tuple(sorted((_freeze_offer_identity_value(item) for item in value), key=repr)) + if not isinstance(value, Hashable): + raise TypeError(f"Unsupported backend offer identity value: {type(value)!r}") + return value + + def _get_job_plan( instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]], diff --git a/src/tests/_internal/cli/utils/test_offer.py b/src/tests/_internal/cli/utils/test_offer.py new file mode 100644 index 000000000..394354715 --- /dev/null +++ b/src/tests/_internal/cli/utils/test_offer.py @@ -0,0 +1,96 @@ +import asyncio + +from dstack._internal.cli.utils.common import console +from dstack._internal.cli.utils.run import print_run_plan +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.common import ApplyAction +from dstack._internal.core.models.instances import ( + InstanceAvailability, + InstanceOfferWithAvailability, + InstanceType, + Resources, +) +from dstack._internal.core.models.runs import JobPlan, RunPlan +from dstack._internal.server.services.jobs import get_jobs_from_run_spec +from dstack._internal.server.testing.common import get_run_spec + +_OFFER_FLEET_HINT = ( + "Hint: Existing fleets are ignored, and all available offers are shown." + " To filter by fleet, pass --fleet NAME." 
+) +_OFFER_FLEET_HINT_START = "Hint: Existing fleets are ignored" + + +def _get_offer(index: int) -> InstanceOfferWithAvailability: + return InstanceOfferWithAvailability( + backend=BackendType.AWS, + instance=InstanceType( + name=f"instance-{index}", + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), + ), + region="us-east-1", + price=float(index), + availability=InstanceAvailability.AVAILABLE, + ) + + +def _get_run_plan(*, offers: list[InstanceOfferWithAvailability], total_offers: int) -> RunPlan: + run_spec = get_run_spec(repo_id="test-repo") + # Keep this helper's asyncio state isolated. `asyncio.run()` clears the current event loop, + # which breaks later Python 3.9 tests that still construct asyncio primitives via + # `get_event_loop()` on the main thread. + loop = asyncio.new_event_loop() + try: + job = loop.run_until_complete( + get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + )[0] + finally: + loop.close() + return RunPlan( + project_name="test-project", + user="test-user", + run_spec=run_spec, + effective_run_spec=run_spec, + job_plans=[ + JobPlan( + job_spec=job.job_spec, + offers=offers, + total_offers=total_offers, + max_price=max((offer.price for offer in offers), default=None), + ) + ], + action=ApplyAction.CREATE, + ) + + +class TestPrintRunPlanOfferHint: + def test_prints_hint_before_short_offer_table(self): + run_plan = _get_run_plan(offers=[_get_offer(1), _get_offer(2)], total_offers=2) + + with console.capture() as capture: + print_run_plan( + run_plan, + include_run_properties=False, + show_offer_fleet_hint=True, + ) + + output = capture.get() + assert " ".join(_OFFER_FLEET_HINT.split()) in " ".join(output.split()) + assert output.index(_OFFER_FLEET_HINT_START) < output.index("1 aws (us-east-1)") + + def test_prints_hint_after_truncated_offer_table(self): + offers = [_get_offer(index) for index in range(1, 4)] + run_plan = _get_run_plan(offers=offers, total_offers=10) + + with console.capture() as 
capture: + print_run_plan( + run_plan, + include_run_properties=False, + show_offer_fleet_hint=True, + ) + + output = capture.get() + shown_footer = "Shown 3 of 10 offers, $3max" + assert shown_footer in output + assert " ".join(_OFFER_FLEET_HINT.split()) in " ".join(output.split()) + assert output.index(shown_footer) < output.index(_OFFER_FLEET_HINT_START) diff --git a/src/tests/_internal/server/routers/test_gpus.py b/src/tests/_internal/server/routers/test_gpus.py index 32c862231..a09b99e4e 100644 --- a/src/tests/_internal/server/routers/test_gpus.py +++ b/src/tests/_internal/server/routers/test_gpus.py @@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.configurations import TaskConfiguration from dstack._internal.core.models.instances import ( Gpu, InstanceAvailability, @@ -14,14 +15,17 @@ InstanceType, Resources, ) +from dstack._internal.core.models.profiles import Profile from dstack._internal.core.models.runs import RunSpec from dstack._internal.core.models.users import GlobalRole, ProjectRole from dstack._internal.server.services.projects import add_project_member from dstack._internal.server.testing.common import ( + create_fleet, create_project, create_repo, create_user, get_auth_headers, + get_fleet_spec, get_run_spec, ) @@ -149,6 +153,70 @@ async def test_returns_gpus_without_group_by( assert isinstance(response_data["gpus"], list) assert len(response_data["gpus"]) >= 1 + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_gpus_by_multiple_specified_fleets( + self, test_db, session: AsyncSession, client: AsyncClient + ): + user, project, repo, _ = await gpu_test_setup(session) + await create_fleet( + session=session, + project=project, + spec=get_fleet_spec(profile=Profile(backends=[BackendType.AWS])), + name="aws-fleet", + ) + await create_fleet( + session=session, + 
project=project, + spec=get_fleet_spec(profile=Profile(backends=[BackendType.RUNPOD])), + name="runpod-fleet", + ) + await create_fleet( + session=session, + project=project, + spec=get_fleet_spec(profile=Profile(backends=[BackendType.VASTAI])), + name="vastai-fleet", + ) + run_spec = get_run_spec( + run_name="test-run", + repo_id=repo.name, + configuration=TaskConfiguration( + commands=[":"], + image="scratch", + user="root", + fleets=["aws-fleet", "runpod-fleet"], + ), + ) + + offers_by_backend = { + BackendType.AWS: [create_gpu_offer(BackendType.AWS, "T4", 16384, 0.50)], + BackendType.RUNPOD: [ + create_gpu_offer( + BackendType.RUNPOD, + "RTX4090", + 24576, + 0.35, + region="us-east-1", + ) + ], + BackendType.VASTAI: [create_gpu_offer(BackendType.VASTAI, "A100", 81920, 1.20)], + } + mocked_backends = create_mock_backends_with_offers(offers_by_backend) + + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + m.return_value = mocked_backends + response = await call_gpus_api( + client, + project.name, + user.token, + run_spec, + group_by=["backend"], + ) + + assert response.status_code == 200 + response_data = response.json() + assert {gpu["backend"] for gpu in response_data["gpus"]} == {"aws", "runpod"} + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_empty_gpus_when_no_offers( diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 0eaba1619..8fb902a01 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -1854,6 +1854,283 @@ async def test_returns_no_offers_if_imported_fleet_specified_without_project_pre assert response_json["project_name"] == "importer" assert len(response_json["job_plans"][0]["offers"]) == 0 + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def 
@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_offer_cli_deduplicates_identical_backend_offers_across_specified_fleets(
    self,
    test_db,
    session: AsyncSession,
    client: AsyncClient,
) -> None:
    """Check that one identical AWS offer is listed once for two requested fleets.

    Creates ``fleet-a`` and ``fleet-b`` with a profile listing only the AWS
    backend, patches the project backends to return a single AWS offer, and
    asserts that the ``get_plan`` response contains exactly one offer.
    """
    owner = await create_user(session=session, global_role=GlobalRole.USER)
    project = await create_project(session=session, owner=owner)
    await add_project_member(
        session=session,
        project=project,
        user=owner,
        project_role=ProjectRole.USER,
    )
    repo = await create_repo(session=session, project_id=project.id)
    # Both fleets are created with the same kind of spec; only the name differs.
    for fleet_name in ("fleet-a", "fleet-b"):
        await create_fleet(
            session=session,
            project=project,
            name=fleet_name,
            spec=get_fleet_spec(profile=Profile(backends=[BackendType.AWS])),
        )

    configuration = TaskConfiguration(
        commands=[":"],
        image="scratch",
        user="root",
        fleets=["fleet-a", "fleet-b"],
    )
    run_spec = get_run_spec(repo_id=repo.name, configuration=configuration)
    payload = {"run_spec": run_spec.dict()}

    # The only offer the mocked AWS backend returns.
    aws_offer = InstanceOfferWithAvailability(
        backend=BackendType.AWS,
        instance=InstanceType(
            name="instance-aws",
            resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
        ),
        region="us",
        price=1.0,
        backend_data={"provider_data": {"zone": "us-a"}, "labels": ["gpu"]},
        availability=InstanceAvailability.AVAILABLE,
        availability_zones=["us-a"],
    )
    aws_backend = Mock()
    aws_backend.TYPE = BackendType.AWS
    aws_backend.compute.return_value.get_offers.return_value = [aws_offer]

    with patch(
        "dstack._internal.server.services.backends.get_project_backends"
    ) as backends_patch:
        backends_patch.return_value = [aws_backend]
        response = await client.post(
            f"/api/project/{project.name}/runs/get_plan",
            headers=get_auth_headers(owner.token),
            json=payload,
        )

    assert response.status_code == 200, response.json()
    plan = response.json()["job_plans"][0]
    assert plan["total_offers"] == 1
    assert len(plan["offers"]) == 1
    assert plan["offers"][0]["price"] == 1.0

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_offer_cli_keeps_identical_existing_instances_from_specified_fleets(
    self,
    test_db,
    session: AsyncSession,
    client: AsyncClient,
) -> None:
    """Check that equally priced instances in two requested fleets both appear.

    Creates SSH fleets ``fleet-a`` and ``fleet-b``, each with one ``REMOTE``
    instance priced at 1.0, and asserts that the ``get_plan`` response lists
    two offers.
    """
    owner = await create_user(session=session, global_role=GlobalRole.USER)
    project = await create_project(session=session, owner=owner)
    await add_project_member(
        session=session,
        project=project,
        user=owner,
        project_role=ProjectRole.USER,
    )
    repo = await create_repo(session=session, project_id=project.id)

    # Fleet first, then its instance, in the same order for each fleet name.
    for fleet_name in ("fleet-a", "fleet-b"):
        fleet = await create_fleet(
            session=session,
            project=project,
            spec=get_fleet_spec(get_ssh_fleet_configuration(name=fleet_name)),
        )
        await create_instance(
            session=session,
            project=project,
            fleet=fleet,
            backend=BackendType.REMOTE,
            price=1.0,
        )

    configuration = TaskConfiguration(
        commands=[":"],
        image="scratch",
        user="root",
        fleets=["fleet-a", "fleet-b"],
    )
    run_spec = get_run_spec(repo_id=repo.name, configuration=configuration)
    response = await client.post(
        f"/api/project/{project.name}/runs/get_plan",
        headers=get_auth_headers(owner.token),
        json={"run_spec": run_spec.dict()},
    )

    assert response.status_code == 200, response.json()
    plan = response.json()["job_plans"][0]
    assert plan["total_offers"] == 2
    assert len(plan["offers"]) == 2
    prices = [entry["price"] for entry in plan["offers"]]
    assert prices == [1.0, 1.0]

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_offer_cli_without_fleet_keeps_global_offers(
    self,
    test_db,
    session: AsyncSession,
    client: AsyncClient,
) -> None:
    """Check that a run spec without ``fleets`` gets offers from every backend.

    Patches the project backends with one AWS and one RunPod mock, each
    returning a single offer, and asserts that the ``get_plan`` response
    lists the ``aws`` offer followed by the ``runpod`` offer.
    """
    owner = await create_user(session=session, global_role=GlobalRole.USER)
    project = await create_project(session=session, owner=owner)
    await add_project_member(
        session=session,
        project=project,
        user=owner,
        project_role=ProjectRole.USER,
    )
    repo = await create_repo(session=session, project_id=project.id)
    configuration = TaskConfiguration(
        commands=[":"],
        image="scratch",
        user="root",
    )
    run_spec = get_run_spec(repo_id=repo.name, configuration=configuration)
    payload = {"run_spec": run_spec.dict()}

    # One mocked backend per (type, instance name, price) row.
    backend_mocks = []
    for backend_type, instance_name, price in (
        (BackendType.AWS, "instance-aws", 1.0),
        (BackendType.RUNPOD, "instance-runpod", 2.0),
    ):
        backend_mock = Mock()
        backend_mock.TYPE = backend_type
        backend_mock.compute.return_value.get_offers.return_value = [
            InstanceOfferWithAvailability(
                backend=backend_type,
                instance=InstanceType(
                    name=instance_name,
                    resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
                ),
                region="us",
                price=price,
                availability=InstanceAvailability.AVAILABLE,
            )
        ]
        backend_mocks.append(backend_mock)

    with patch(
        "dstack._internal.server.services.backends.get_project_backends"
    ) as backends_patch:
        backends_patch.return_value = backend_mocks
        response = await client.post(
            f"/api/project/{project.name}/runs/get_plan",
            headers=get_auth_headers(owner.token),
            json=payload,
        )

    assert response.status_code == 200, response.json()
    listed = response.json()["job_plans"][0]["offers"]
    assert [entry["backend"] for entry in listed] == ["aws", "runpod"]
class TestFreezeOfferIdentityValue:
    """Tests for the offer-identity helpers imported from ``runs.plan``."""

    def test_normalizes_nested_mappings_and_sets(self) -> None:
        """Equal nested payloads freeze to equal values with equal hashes.

        The two payloads hold the same data but differ in dict key order and
        in set literal order.
        """
        payload_one = {
            "b": [1, {"y": InstanceAvailability.IDLE, "x": {3, 2}}],
            "a": ("z", None),
        }
        payload_two = {
            "a": ("z", None),
            "b": [1, {"x": {2, 3}, "y": InstanceAvailability.IDLE}],
        }

        frozen_one = _freeze_offer_identity_value(payload_one)
        frozen_two = _freeze_offer_identity_value(payload_two)

        assert frozen_one == frozen_two
        assert hash(frozen_one) == hash(frozen_two)

    def test_get_backend_offer_identity_uses_full_offer_payload(self) -> None:
        """Identity ignores ``backend_data`` key order but reflects its values."""
        base_offer = get_instance_offer_with_availability(
            availability=InstanceAvailability.UNKNOWN
        )
        base_offer.backend_data = {
            "region_hint": {"b": 2, "a": 1},
            "azs": ["us-east-1b", "us-east-1a"],
        }

        # Same content as the base offer, keys inserted in a different order.
        reordered_offer = copy.deepcopy(base_offer)
        reordered_offer.backend_data = {
            "azs": ["us-east-1b", "us-east-1a"],
            "region_hint": {"a": 1, "b": 2},
        }

        # Differs from the base offer in one nested value ("a": 3 instead of 1).
        changed_offer = copy.deepcopy(base_offer)
        changed_offer.backend_data = {
            "azs": ["us-east-1b", "us-east-1a"],
            "region_hint": {"a": 3, "b": 2},
        }

        assert _get_backend_offer_identity(base_offer) == _get_backend_offer_identity(
            reordered_offer
        )
        assert _get_backend_offer_identity(base_offer) != _get_backend_offer_identity(
            changed_offer
        )