Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 62 additions & 9 deletions src/dstack/_internal/cli/services/configurators/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
InstanceGroupPlacement,
)
from dstack._internal.core.models.instances import InstanceStatus, SSHKey
from dstack._internal.core.services.diff import diff_models
from dstack._internal.core.services.diff import copy_model, diff_models
from dstack._internal.utils.common import local_time
from dstack._internal.utils.logging import get_logger
from dstack._internal.utils.nested_list import NestedList, NestedListItem
from dstack._internal.utils.ssh import convert_ssh_key_to_pem, generate_public_key, pkey_from_str
from dstack.api.utils import load_profile

Expand Down Expand Up @@ -85,14 +86,10 @@ def _apply_plan(self, plan: FleetPlan, command_args: argparse.Namespace):
)
confirm_message += "Create the fleet?"
else:
effective_spec = plan.get_effective_spec()
diff = _render_fleet_spec_diff(plan.current_resource.spec, effective_spec)
action_message += f"Found fleet [code]{plan.spec.configuration.name}[/]."
if plan.action == ApplyAction.CREATE:
delete_fleet_name = plan.current_resource.name
action_message += (
" Configuration changes detected. Cannot update the fleet in-place"
)
confirm_message += "Re-create the fleet?"
elif plan.current_resource.spec == plan.effective_spec:
if plan.current_resource.spec == effective_spec:
if command_args.yes and not command_args.force:
# --force is required only with --yes,
# otherwise we may ask for force apply interactively.
Expand All @@ -103,8 +100,26 @@ def _apply_plan(self, plan: FleetPlan, command_args: argparse.Namespace):
delete_fleet_name = plan.current_resource.name
action_message += " No configuration changes detected."
confirm_message += "Re-create the fleet?"
elif plan.action == ApplyAction.CREATE:
delete_fleet_name = plan.current_resource.name
if diff is not None:
# TODO: Highlight only the fields that block in-place update instead of
# showing the full detected diff here.
action_message += (
f" Detected changes that [error]cannot[/] be updated in-place:\n{diff}"
)
else:
action_message += (
" Configuration changes detected. Cannot update the fleet in-place."
)
confirm_message += "Re-create the fleet?"
else:
action_message += " Configuration changes detected."
if diff is not None:
action_message += (
f" Detected changes that [code]can[/] be updated in-place:\n{diff}"
)
else:
action_message += " Configuration changes detected."
confirm_message += "Update the fleet in-place?"

console.print(action_message)
Expand Down Expand Up @@ -357,6 +372,44 @@ def _resolve_ssh_key(ssh_key_path: Optional[str]) -> Optional[SSHKey]:
exit()


def _render_fleet_spec_diff(old_spec: FleetSpec, new_spec: FleetSpec) -> Optional[str]:
    """Render the fields that differ between two fleet specs as a nested list.

    Both specs are copied with ``copy_model`` before being compared, and the
    comparison itself is delegated to ``diff_models``.

    Args:
        old_spec: The spec of the existing fleet.
        new_spec: The spec that is about to be applied.

    Returns:
        The text produced by ``NestedList.render()``, or ``None`` when there is
        nothing to display (no differences, or ``merged_profile`` is the only
        top-level field that differs).
    """
    old_copy = copy_model(old_spec)
    new_copy = copy_model(new_spec)
    top_level_changes = list(diff_models(old_copy, new_copy))
    if not top_level_changes:
        return None

    def _section(title: str, old_model, new_model) -> NestedListItem:
        # One titled entry whose children are the names of the differing sub-fields.
        return NestedListItem(
            title,
            children=[NestedListItem(name) for name in diff_models(old_model, new_model)],
        )

    rendered = NestedList()
    for changed in top_level_changes:
        if changed == "merged_profile":
            # Never listed on its own.
            # NOTE(review): presumably because it is derived from other fields — confirm.
            continue
        if changed == "configuration":
            entry = _section(
                "Configuration properties:", old_copy.configuration, new_copy.configuration
            )
        elif changed == "profile":
            entry = _section("Profile properties:", old_copy.profile, new_copy.profile)
        elif changed == "configuration_path":
            entry = NestedListItem("Configuration path")
        else:
            # Any other field: turn the snake_case name into a readable label.
            entry = NestedListItem(changed.replace("_", " ").capitalize())
        rendered.children.append(entry)

    return rendered.render() if rendered.children else None


def _print_plan_header(plan: FleetPlan):
def th(s: str) -> str:
return f"[bold]{s}[/bold]"
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def apply_configuration(
confirm_message = "Stop and override the run?"
elif not run_plan.current_resource.status.is_finished():
stop_run_name = run_plan.current_resource.run_spec.run_name
# TODO: Highlight only the fields that block in-place update instead of
# showing the full detected diff here.
console.print(
f"Active run [code]{conf.name}[/] already exists."
f" Detected changes that [error]cannot[/] be updated in-place:\n{diff}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(
workers_num: int = 10,
queue_lower_limit_factor: float = 0.5,
queue_upper_limit_factor: float = 2.0,
min_processing_interval: timedelta = timedelta(seconds=30),
min_processing_interval: timedelta = timedelta(seconds=15),
lock_timeout: timedelta = timedelta(seconds=20),
heartbeat_trigger: timedelta = timedelta(seconds=10),
*,
Expand Down
38 changes: 24 additions & 14 deletions src/dstack/_internal/server/services/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1219,26 +1219,36 @@ def _check_can_update_inner(current: M, new: M, updatable_fields: tuple[str, ...
return diff


@_check_can_update("configuration", "configuration_path")
@_check_can_update("configuration", "configuration_path", "merged_profile")
def _check_can_update_fleet_spec(current: FleetSpec, new: FleetSpec, diff: ModelDiff):
# Allow `merged_profile` only to absorb derived changes from supported configuration updates
# such as `configuration.reservation` and `configuration.tags`.
# Direct `profile` updates are still not in-place updatable.
if "configuration" in diff:
_check_can_update_fleet_configuration(current.configuration, new.configuration)


@_check_can_update("ssh_config")
def _check_can_update_fleet_configuration(
current: FleetConfiguration, new: FleetConfiguration, diff: ModelDiff
):
def _check_can_update_fleet_configuration(current: FleetConfiguration, new: FleetConfiguration):
diff = diff_models(current, new)
current_ssh_config = current.ssh_config
new_ssh_config = new.ssh_config
if current_ssh_config is None:
if new_ssh_config is not None:
raise ServerClientError("Fleet type changed from Cloud to SSH, cannot update")
# TODO: Support best-effort `nodes.target` apply semantics:
# create missing instances and terminate extra idle instances.
# Current in-place update only persists `target`; FleetPipeline reconciles `min`/`max`.
#
# For `reservation` and `tags`, update affects only future provisioning.
_check_can_update_inner(current, new, ("nodes", "reservation", "tags"))
return

if new_ssh_config is None:
raise ServerClientError("Fleet type changed from SSH to Cloud, cannot update")

_check_can_update_inner(current, new, ("ssh_config",))
if "ssh_config" in diff:
current_ssh_config = current.ssh_config
new_ssh_config = new.ssh_config
if current_ssh_config is None:
if new_ssh_config is not None:
raise ServerClientError("Fleet type changed from Cloud to SSH, cannot update")
elif new_ssh_config is None:
raise ServerClientError("Fleet type changed from SSH to Cloud, cannot update")
else:
_check_can_update_ssh_config(current_ssh_config, new_ssh_config)
_check_can_update_ssh_config(current_ssh_config, new_ssh_config)


@_check_can_update("hosts")
Expand Down
Loading
Loading