Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/arch/08-workloads-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ thv rm my-server

**Implementation**: `pkg/workloads/manager.go`

### Proxy termination

Stop and delete terminate the proxy using its recorded PID. When that PID is unavailable (for example, the status file is missing or records no PID), they fall back to port-based cleanup: the process holding the proxy port is terminated only after it is confirmed to be this workload's proxy. This prevents an orphaned proxy from continuing to hold the port after the container has been stopped or removed.

**Implementation**: `pkg/workloads/manager.go`

### List

Listing combines container workloads from the runtime with remote workloads from persisted state. The manager can filter workloads by label or group, and can optionally include stopped workloads.
Expand Down
102 changes: 75 additions & 27 deletions pkg/workloads/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type DefaultManager struct {
retryConfig *retryConfig
newRunner mcpRunnerFactory
detachedSpawner detachedProcessSpawner
portFreer portFreer
}

// mcpRunner is the subset of *runner.Runner that RunWorkload's retry loop
Expand All @@ -123,6 +124,11 @@ type mcpRunnerFactory func(*runner.RunConfig, statuses.StatusManager) mcpRunner
// itself.
type detachedProcessSpawner func(ctx context.Context, runConfig *runner.RunConfig) (pid int, err error)

// portFreer kills an orphaned ToolHive proxy still holding a workload's proxy
// port. In production it is freePortHolderIfNeeded; tests override it to observe
// the stop/delete fallback without making real OS calls.
type portFreer func(ctx context.Context, runConfig *runner.RunConfig)

// retryConfig bundles the RunWorkload retry loop's tunable parameters.
type retryConfig struct {
maxRetries int
Expand Down Expand Up @@ -160,6 +166,12 @@ func withDetachedSpawner(s detachedProcessSpawner) managerOption {
return func(d *DefaultManager) { d.detachedSpawner = s }
}

// withPortFreer overrides the function used to free an orphaned proxy port
// during stop/delete. Intended for tests so they do not make real OS calls.
func withPortFreer(f portFreer) managerOption {
return func(d *DefaultManager) { d.portFreer = f }
}

// retryConfigOrDefault returns the manager's retryConfig if set, otherwise defaults.
func (d *DefaultManager) retryConfigOrDefault() retryConfig {
if d.retryConfig == nil {
Expand Down Expand Up @@ -187,6 +199,15 @@ func (d *DefaultManager) detachedSpawnerOrDefault() detachedProcessSpawner {
return d.spawnDetached
}

// portFreerOrDefault returns the manager's port freer if set, otherwise the
// production default that frees the proxy port via freePortHolderIfNeeded.
func (d *DefaultManager) portFreerOrDefault() portFreer {
if d.portFreer != nil {
return d.portFreer
}
return d.freePortHolderIfNeeded
}

// ErrWorkloadNotRunning is returned when a container cannot be found by name.
var ErrWorkloadNotRunning = fmt.Errorf("workload not running")

Expand Down Expand Up @@ -407,9 +428,10 @@ func (d *DefaultManager) stopSingleWorkload(ctx context.Context, name string) er
// First, try to load the run configuration to check if it's a remote workload
runConfig, err := runner.LoadState(childCtx, name)
if err != nil {
// If we can't load the state, it might be a container workload or the workload doesn't exist
// Try to stop it as a container workload
return d.stopContainerWorkload(childCtx, name)
// If we can't load the state, it might be a container workload or the workload doesn't exist.
// Try to stop it as a container workload. Without a run config we cannot recover the proxy
// port for port-based cleanup, so pass nil.
return d.stopContainerWorkload(childCtx, name, nil)
}

// Check if this is a remote workload
Expand All @@ -418,7 +440,7 @@ func (d *DefaultManager) stopSingleWorkload(ctx context.Context, name string) er
}

// This is a container-based workload
return d.stopContainerWorkload(childCtx, name)
return d.stopContainerWorkload(childCtx, name, runConfig)
}

// stopRemoteWorkload stops a remote workload
Expand Down Expand Up @@ -467,8 +489,10 @@ func (d *DefaultManager) stopRemoteWorkload(ctx context.Context, name string, ru
return nil
}

// stopContainerWorkload stops a container-based workload
func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string) error {
// stopContainerWorkload stops a container-based workload. runConfig may be nil
// when the run config could not be loaded; it is used only for port-based proxy
// cleanup when the tracked PID is unavailable.
func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string, runConfig *runner.RunConfig) error {
container, err := d.runtime.GetWorkloadInfo(ctx, name)
if err != nil {
if errors.Is(err, rt.ErrWorkloadNotFound) {
Expand All @@ -492,7 +516,7 @@ func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string)
}

// Use the existing stopWorkloads method for container workloads
return d.stopSingleContainerWorkload(ctx, &container)
return d.stopSingleContainerWorkload(ctx, &container, runConfig)
}

// RunWorkload runs a workload in the foreground with automatic restart on container exit.
Expand Down Expand Up @@ -842,9 +866,10 @@ func (d *DefaultManager) deleteWorkload(ctx context.Context, name string) error
// First, check if this is a remote workload by trying to load its run configuration
runConfig, err := runner.LoadState(childCtx, name)
if err != nil {
// If we can't load the state, it might be a container workload or the workload doesn't exist
// Continue with the container-based deletion logic
return d.deleteContainerWorkload(childCtx, name)
// If we can't load the state, it might be a container workload or the workload doesn't exist.
// Continue with the container-based deletion logic. Without a run config we cannot recover the
// proxy port for port-based cleanup, so pass nil.
return d.deleteContainerWorkload(childCtx, name, nil)
}

// If this is a remote workload (has RemoteURL), handle it differently
Expand All @@ -853,7 +878,7 @@ func (d *DefaultManager) deleteWorkload(ctx context.Context, name string) error
}

// This is a container-based workload, use the existing logic
return d.deleteContainerWorkload(childCtx, name)
return d.deleteContainerWorkload(childCtx, name, runConfig)
}

// deleteRemoteWorkload handles deletion of a remote workload
Expand Down Expand Up @@ -883,8 +908,10 @@ func (d *DefaultManager) deleteRemoteWorkload(ctx context.Context, name string,
return nil
}

// deleteContainerWorkload handles deletion of a container-based workload (existing logic)
func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name string) error {
// deleteContainerWorkload handles deletion of a container-based workload (existing logic).
// runConfig may be nil; when present it lets delete fall back to port-based proxy cleanup
// if the tracked PID is unavailable (e.g. a missing status file).
func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name string, runConfig *runner.RunConfig) error {

// Find and validate the container
container, err := d.getWorkloadContainer(ctx, name)
Expand Down Expand Up @@ -921,7 +948,11 @@ func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name strin
// Stop proxy-runner process AFTER container removal to prevent recreation
// Skip for auxiliary workloads like inspector that don't use proxy processes
if !isAuxiliary {
d.stopProxyIfNeeded(ctx, name, baseName)
if !d.stopProxyIfNeeded(ctx, name, baseName) {
// PID-based stop found no tracked proxy (e.g. the status file is missing);
// fall back to port-based cleanup.
d.portFreerOrDefault()(ctx, runConfig)
}
} else {
slog.Debug("skipping proxy-runner stop for auxiliary workload", "workload", name)
}
Expand Down Expand Up @@ -976,40 +1007,51 @@ func (d *DefaultManager) isSupervisorProcessAlive(ctx context.Context, name stri
return true
}

// stopProcess stops the proxy process associated with the container
func (d *DefaultManager) stopProcess(ctx context.Context, name string) {
// stopProcess stops the proxy process associated with the container. It reports
// whether a tracked proxy process was actually stopped: true only when a valid
// PID was found and killed. A false return (no PID recorded, e.g. a missing
// status file, or the kill failed) signals callers to fall back to port-based
// cleanup so an orphaned proxy does not keep holding the workload's port.
func (d *DefaultManager) stopProcess(ctx context.Context, name string) bool {
if name == "" {
slog.Warn("could not find base container name in labels")
return
return false
}

// Try to read the PID and kill the process
pid, err := d.statuses.GetWorkloadPID(ctx, name)
if err != nil {
slog.Debug("no PID found, proxy may not be running in detached mode", "workload", name)
return
return false
}

// PID found, try to kill the process
slog.Debug("stopping proxy process", "pid", pid)
stopped := false
if err := process.KillProcess(pid); err != nil {
slog.Debug("failed to kill proxy process", "error", err)
} else {
slog.Debug("proxy process stopped")
stopped = true
}

// Remove the PID of the terminated process
if err := d.statuses.ResetWorkloadPID(ctx, name); err != nil {
slog.Warn("failed to reset workload PID", "workload", name, "error", err)
}

return stopped
}

// stopProxyIfNeeded stops the proxy process if the workload has a base name
func (d *DefaultManager) stopProxyIfNeeded(ctx context.Context, name, baseName string) {
// stopProxyIfNeeded stops the proxy process if the workload has a base name. It
// reports whether a tracked proxy process was stopped (see stopProcess); false
// when there is no base name or no proxy was killed.
func (d *DefaultManager) stopProxyIfNeeded(ctx context.Context, name, baseName string) bool {
slog.Debug("removing proxy process", "workload", name)
if baseName != "" {
d.stopProcess(ctx, baseName)
if baseName == "" {
return false
}
return d.stopProcess(ctx, baseName)
}

// freePortHolderIfNeeded kills the process holding the proxy port if it is in use.
Expand Down Expand Up @@ -1324,7 +1366,7 @@ func (d *DefaultManager) maybeSetupRemoteWorkload(

// Ensure port is free before spawning. Kill the process holding the port if bound.
// This prevents "address already in use" when the new child tries to bind.
d.freePortHolderIfNeeded(ctx, mcpRunner.Config)
d.portFreerOrDefault()(ctx, mcpRunner.Config)

slog.Debug("loaded configuration from state", "workload", runConfig.BaseName)
return mcpRunner, nil
Expand Down Expand Up @@ -1562,17 +1604,23 @@ func (*DefaultManager) cleanupTempPermissionProfile(ctx context.Context, baseNam
return nil
}

// stopSingleContainerWorkload stops a single container workload
func (d *DefaultManager) stopSingleContainerWorkload(ctx context.Context, workload *rt.ContainerInfo) error {
// stopSingleContainerWorkload stops a single container workload. runConfig may
// be nil; when present it lets stop fall back to port-based proxy cleanup if the
// tracked PID is unavailable (e.g. a missing status file).
func (d *DefaultManager) stopSingleContainerWorkload(
ctx context.Context, workload *rt.ContainerInfo, runConfig *runner.RunConfig,
) error {
childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout)
defer cancel()

name := labels.GetContainerBaseName(workload.Labels)
// Stop the proxy process (skip for auxiliary workloads like inspector)
if labels.IsAuxiliaryWorkload(workload.Labels) {
slog.Debug("skipping proxy stop for auxiliary workload", "workload", name)
} else {
d.stopProcess(ctx, name)
} else if !d.stopProcess(ctx, name) {
// PID-based stop found no tracked proxy (e.g. the status file is missing);
// fall back to port-based cleanup.
d.portFreerOrDefault()(ctx, runConfig)
}

slog.Debug("stopping containers", "workload", name)
Expand Down
Loading
Loading