From 5c9ff56483960b546bd5006691eadc0532d7f92a Mon Sep 17 00:00:00 2001 From: Greg Katz Date: Thu, 28 May 2026 15:28:25 -0700 Subject: [PATCH] Free orphaned proxy port on stop and rm When a workload's status file is missing, thv stop and thv rm left the proxy process running and holding the workload's port. The proxy-stop path terminates the proxy by the PID recorded in the status file, so with the file gone nothing was killed. On stop the surviving supervisor then restarted the container, so the workload would not stay stopped; on rm the orphaned proxy kept the port, so it could not be reused without killing the process by hand. Fixes #5393 Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Greg Katz --- docs/arch/08-workloads-lifecycle.md | 6 + pkg/workloads/manager.go | 102 ++++++++---- pkg/workloads/manager_test.go | 242 ++++++++++++++++++++++++++++ 3 files changed, 323 insertions(+), 27 deletions(-) diff --git a/docs/arch/08-workloads-lifecycle.md b/docs/arch/08-workloads-lifecycle.md index 01daf676e2..031f44b00e 100644 --- a/docs/arch/08-workloads-lifecycle.md +++ b/docs/arch/08-workloads-lifecycle.md @@ -96,6 +96,12 @@ thv rm my-server **Implementation**: `pkg/workloads/manager.go` +### Proxy termination + +Stop and delete terminate the proxy using its recorded PID. When that PID is unavailable (for example, the status file is missing or records no PID), they fall back to port-based cleanup: the process holding the proxy port is terminated only after it is confirmed to be this workload's proxy. This prevents an orphaned proxy from continuing to hold the port after the container has been stopped or removed. + +**Implementation**: `pkg/workloads/manager.go` + ### List Listing combines container workloads from the runtime with remote workloads from persisted state. The manager can filter workloads by label or group, and can optionally include stopped workloads. diff --git a/pkg/workloads/manager.go b/pkg/workloads/manager.go index aa2e0eb442..65ebde0251 100644 --- a/pkg/workloads/manager.go +++ b/pkg/workloads/manager.go @@ -101,6 +101,7 @@ type DefaultManager struct { retryConfig *retryConfig newRunner mcpRunnerFactory detachedSpawner detachedProcessSpawner + portFreer portFreer } // mcpRunner is the subset of *runner.Runner that RunWorkload's retry loop @@ -123,6 +124,11 @@ type mcpRunnerFactory func(*runner.RunConfig, statuses.StatusManager) mcpRunner // itself. type detachedProcessSpawner func(ctx context.Context, runConfig *runner.RunConfig) (pid int, err error) +// portFreer kills an orphaned ToolHive proxy still holding a workload's proxy +// port. In production it is freePortHolderIfNeeded; tests override it to observe +// the stop/delete fallback without making real OS calls. +type portFreer func(ctx context.Context, runConfig *runner.RunConfig) + // retryConfig bundles the RunWorkload retry loop's tunable parameters. type retryConfig struct { maxRetries int @@ -160,6 +166,12 @@ func withDetachedSpawner(s detachedProcessSpawner) managerOption { return func(d *DefaultManager) { d.detachedSpawner = s } } +// withPortFreer overrides the function used to free an orphaned proxy port +// during stop/delete. Intended for tests so they do not make real OS calls. +func withPortFreer(f portFreer) managerOption { + return func(d *DefaultManager) { d.portFreer = f } +} + // retryConfigOrDefault returns the manager's retryConfig if set, otherwise defaults. func (d *DefaultManager) retryConfigOrDefault() retryConfig { if d.retryConfig == nil { @@ -187,6 +199,15 @@ func (d *DefaultManager) detachedSpawnerOrDefault() detachedProcessSpawner { return d.spawnDetached } +// portFreerOrDefault returns the manager's port freer if set, otherwise the +// production default that frees the proxy port via freePortHolderIfNeeded. +func (d *DefaultManager) portFreerOrDefault() portFreer { + if d.portFreer != nil { + return d.portFreer + } + return d.freePortHolderIfNeeded +} + // ErrWorkloadNotRunning is returned when a container cannot be found by name. var ErrWorkloadNotRunning = fmt.Errorf("workload not running") @@ -407,9 +428,10 @@ func (d *DefaultManager) stopSingleWorkload(ctx context.Context, name string) er // First, try to load the run configuration to check if it's a remote workload runConfig, err := runner.LoadState(childCtx, name) if err != nil { - // If we can't load the state, it might be a container workload or the workload doesn't exist - // Try to stop it as a container workload - return d.stopContainerWorkload(childCtx, name) + // If we can't load the state, it might be a container workload or the workload doesn't exist. + // Try to stop it as a container workload. Without a run config we cannot recover the proxy + // port for port-based cleanup, so pass nil. + return d.stopContainerWorkload(childCtx, name, nil) } // Check if this is a remote workload @@ -418,7 +440,7 @@ func (d *DefaultManager) stopSingleWorkload(ctx context.Context, name string) er } // This is a container-based workload - return d.stopContainerWorkload(childCtx, name) + return d.stopContainerWorkload(childCtx, name, runConfig) } // stopRemoteWorkload stops a remote workload @@ -467,8 +489,10 @@ func (d *DefaultManager) stopRemoteWorkload(ctx context.Context, name string, ru return nil } -// stopContainerWorkload stops a container-based workload -func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string) error { +// stopContainerWorkload stops a container-based workload. runConfig may be nil +// when the run config could not be loaded; it is used only for port-based proxy +// cleanup when the tracked PID is unavailable. +func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string, runConfig *runner.RunConfig) error { container, err := d.runtime.GetWorkloadInfo(ctx, name) if err != nil { if errors.Is(err, rt.ErrWorkloadNotFound) { @@ -492,7 +516,7 @@ func (d *DefaultManager) stopContainerWorkload(ctx context.Context, name string) } // Use the existing stopWorkloads method for container workloads - return d.stopSingleContainerWorkload(ctx, &container) + return d.stopSingleContainerWorkload(ctx, &container, runConfig) } // RunWorkload runs a workload in the foreground with automatic restart on container exit. @@ -842,9 +866,10 @@ func (d *DefaultManager) deleteWorkload(ctx context.Context, name string) error // First, check if this is a remote workload by trying to load its run configuration runConfig, err := runner.LoadState(childCtx, name) if err != nil { - // If we can't load the state, it might be a container workload or the workload doesn't exist - // Continue with the container-based deletion logic - return d.deleteContainerWorkload(childCtx, name) + // If we can't load the state, it might be a container workload or the workload doesn't exist. + // Continue with the container-based deletion logic. Without a run config we cannot recover the + // proxy port for port-based cleanup, so pass nil. + return d.deleteContainerWorkload(childCtx, name, nil) } // If this is a remote workload (has RemoteURL), handle it differently @@ -853,7 +878,7 @@ func (d *DefaultManager) deleteWorkload(ctx context.Context, name string) error } // This is a container-based workload, use the existing logic - return d.deleteContainerWorkload(childCtx, name) + return d.deleteContainerWorkload(childCtx, name, runConfig) } // deleteRemoteWorkload handles deletion of a remote workload @@ -883,8 +908,10 @@ func (d *DefaultManager) deleteRemoteWorkload(ctx context.Context, name string, return nil } -// deleteContainerWorkload handles deletion of a container-based workload (existing logic) -func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name string) error { +// deleteContainerWorkload handles deletion of a container-based workload (existing logic). +// runConfig may be nil; when present it lets delete fall back to port-based proxy cleanup +// if the tracked PID is unavailable (e.g. a missing status file). +func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name string, runConfig *runner.RunConfig) error { // Find and validate the container container, err := d.getWorkloadContainer(ctx, name) @@ -921,7 +948,11 @@ func (d *DefaultManager) deleteContainerWorkload(ctx context.Context, name strin // Stop proxy-runner process AFTER container removal to prevent recreation // Skip for auxiliary workloads like inspector that don't use proxy processes if !isAuxiliary { - d.stopProxyIfNeeded(ctx, name, baseName) + if !d.stopProxyIfNeeded(ctx, name, baseName) { + // PID-based stop found no tracked proxy (e.g. the status file is missing); + // fall back to port-based cleanup. + d.portFreerOrDefault()(ctx, runConfig) + } } else { slog.Debug("skipping proxy-runner stop for auxiliary workload", "workload", name) } @@ -976,40 +1007,51 @@ func (d *DefaultManager) isSupervisorProcessAlive(ctx context.Context, name stri return true } -// stopProcess stops the proxy process associated with the container -func (d *DefaultManager) stopProcess(ctx context.Context, name string) { +// stopProcess stops the proxy process associated with the container. It reports +// whether a tracked proxy process was actually stopped: true only when a valid +// PID was found and killed. A false return (no PID recorded, e.g. a missing +// status file, or the kill failed) signals callers to fall back to port-based +// cleanup so an orphaned proxy does not keep holding the workload's port. +func (d *DefaultManager) stopProcess(ctx context.Context, name string) bool { if name == "" { slog.Warn("could not find base container name in labels") - return + return false } // Try to read the PID and kill the process pid, err := d.statuses.GetWorkloadPID(ctx, name) if err != nil { slog.Debug("no PID found, proxy may not be running in detached mode", "workload", name) - return + return false } // PID found, try to kill the process slog.Debug("stopping proxy process", "pid", pid) + stopped := false if err := process.KillProcess(pid); err != nil { slog.Debug("failed to kill proxy process", "error", err) } else { slog.Debug("proxy process stopped") + stopped = true } // Remove the PID of the terminated process if err := d.statuses.ResetWorkloadPID(ctx, name); err != nil { slog.Warn("failed to reset workload PID", "workload", name, "error", err) } + + return stopped } -// stopProxyIfNeeded stops the proxy process if the workload has a base name -func (d *DefaultManager) stopProxyIfNeeded(ctx context.Context, name, baseName string) { +// stopProxyIfNeeded stops the proxy process if the workload has a base name. It +// reports whether a tracked proxy process was stopped (see stopProcess); false +// when there is no base name or no proxy was killed. +func (d *DefaultManager) stopProxyIfNeeded(ctx context.Context, name, baseName string) bool { slog.Debug("removing proxy process", "workload", name) - if baseName != "" { - d.stopProcess(ctx, baseName) + if baseName == "" { + return false } + return d.stopProcess(ctx, baseName) } // freePortHolderIfNeeded kills the process holding the proxy port if it is in use. @@ -1324,7 +1366,7 @@ func (d *DefaultManager) maybeSetupRemoteWorkload( // Ensure port is free before spawning. Kill the process holding the port if bound. // This prevents "address already in use" when the new child tries to bind. - d.freePortHolderIfNeeded(ctx, mcpRunner.Config) + d.portFreerOrDefault()(ctx, mcpRunner.Config) slog.Debug("loaded configuration from state", "workload", runConfig.BaseName) return mcpRunner, nil @@ -1562,8 +1604,12 @@ func (*DefaultManager) cleanupTempPermissionProfile(ctx context.Context, baseNam return nil } -// stopSingleContainerWorkload stops a single container workload -func (d *DefaultManager) stopSingleContainerWorkload(ctx context.Context, workload *rt.ContainerInfo) error { +// stopSingleContainerWorkload stops a single container workload. runConfig may +// be nil; when present it lets stop fall back to port-based proxy cleanup if the +// tracked PID is unavailable (e.g. a missing status file). +func (d *DefaultManager) stopSingleContainerWorkload( + ctx context.Context, workload *rt.ContainerInfo, runConfig *runner.RunConfig, +) error { childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout) defer cancel() @@ -1571,8 +1617,10 @@ func (d *DefaultManager) stopSingleContainerWorkload(ctx context.Context, worklo // Stop the proxy process (skip for auxiliary workloads like inspector) if labels.IsAuxiliaryWorkload(workload.Labels) { slog.Debug("skipping proxy stop for auxiliary workload", "workload", name) - } else { - d.stopProcess(ctx, name) + } else if !d.stopProcess(ctx, name) { + // PID-based stop found no tracked proxy (e.g. the status file is missing); + // fall back to port-based cleanup. + d.portFreerOrDefault()(ctx, runConfig) } slog.Debug("stopping containers", "workload", name) diff --git a/pkg/workloads/manager_test.go b/pkg/workloads/manager_test.go index ebbf383d10..d5e378214d 100644 --- a/pkg/workloads/manager_test.go +++ b/pkg/workloads/manager_test.go @@ -7,6 +7,8 @@ import ( "context" "errors" "fmt" + "os/exec" + goruntime "runtime" "testing" "time" @@ -2147,3 +2149,243 @@ func TestDefaultManager_ListWorkloadsUsingSecret(t *testing.T) { assert.NotNil(t, listFunc, "ListWorkloadsUsingSecret method should exist with correct signature") }) } + +// startKillableProcess starts a real, long-lived child process and returns its +// PID. The process is killed and reaped on test cleanup. It is used to exercise +// the path where stopProcess finds a valid PID and KillProcess succeeds. The +// helper relies on the Unix `sleep` command, so it skips on Windows (the CI unit +// test matrix is Linux-only) and if the process cannot be started. +func startKillableProcess(t *testing.T) int { + t.Helper() + if goruntime.GOOS == "windows" { + t.Skip("startKillableProcess relies on the Unix sleep command") + } + cmd := exec.Command("sleep", "300") + if err := cmd.Start(); err != nil { + t.Skipf("could not start helper process: %v", err) + } + t.Cleanup(func() { + _ = cmd.Process.Kill() + _ = cmd.Wait() + }) + return cmd.Process.Pid +} + +// TestDefaultManager_stopProcess verifies the boolean contract that the +// stop/delete port-cleanup fallback relies on: stopProcess reports true only +// when it found a tracked PID and killed it. A false return (no PID recorded — +// e.g. a missing status file — or a failed kill) is what triggers the +// port-based cleanup fallback in stopSingleContainerWorkload/deleteContainerWorkload. +func TestDefaultManager_stopProcess(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + workload string + setupMocks func(t *testing.T, sm *statusMocks.MockStatusManager) + want bool + }{ + { + name: "empty name returns false", + workload: "", + setupMocks: func(_ *testing.T, _ *statusMocks.MockStatusManager) { + // No status manager calls expected for an empty name. + }, + want: false, + }, + { + name: "GetWorkloadPID error returns false", + workload: "test-workload", + setupMocks: func(_ *testing.T, sm *statusMocks.MockStatusManager) { + sm.EXPECT().GetWorkloadPID(gomock.Any(), "test-workload").Return(0, errors.New("boom")) + }, + want: false, + }, + { + name: "missing status file (pid 0) returns false", + workload: "test-workload", + setupMocks: func(_ *testing.T, sm *statusMocks.MockStatusManager) { + // A missing status file yields PID 0 with no error; KillProcess(0) fails, + // so no proxy is stopped. The PID is still reset. + sm.EXPECT().GetWorkloadPID(gomock.Any(), "test-workload").Return(0, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), "test-workload").Return(nil) + }, + want: false, + }, + { + name: "valid running PID returns true", + workload: "test-workload", + setupMocks: func(t *testing.T, sm *statusMocks.MockStatusManager) { + t.Helper() + pid := startKillableProcess(t) + sm.EXPECT().GetWorkloadPID(gomock.Any(), "test-workload").Return(pid, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), "test-workload").Return(nil) + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + sm := statusMocks.NewMockStatusManager(ctrl) + tt.setupMocks(t, sm) + + manager := &DefaultManager{statuses: sm} + + got := manager.stopProcess(context.Background(), tt.workload) + assert.Equal(t, tt.want, got) + }) + } +} + +// TestDefaultManager_orphanProxyPortCleanup verifies that stop and rm fall back +// to port-based proxy cleanup when the tracked PID is unavailable (the missing +// status-file scenario), and that the fallback does not run on the normal path +// where the proxy was stopped by PID. +func TestDefaultManager_orphanProxyPortCleanup(t *testing.T) { + t.Parallel() + + const workloadName = "test-workload" + containerInfo := func() runtime.ContainerInfo { + return runtime.ContainerInfo{ + Name: workloadName, + State: "running", + Labels: map[string]string{"toolhive-basename": workloadName}, + } + } + + t.Run("stop frees orphan proxy port when status file missing", func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + sm := statusMocks.NewMockStatusManager(ctrl) + rtMock := runtimeMocks.NewMockRuntime(ctrl) + + // Missing status file: no tracked PID, so the proxy is not killed by PID. + sm.EXPECT().GetWorkloadPID(gomock.Any(), workloadName).Return(0, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), workloadName).Return(nil) + rtMock.EXPECT().StopWorkload(gomock.Any(), workloadName).Return(nil) + sm.EXPECT().SetWorkloadStatus(gomock.Any(), workloadName, runtime.WorkloadStatusStopped, "").Return(nil) + + var freed []*runner.RunConfig + runConfig := &runner.RunConfig{BaseName: workloadName, Port: 54321} + manager := &DefaultManager{statuses: sm, runtime: rtMock} + withPortFreer(func(_ context.Context, rc *runner.RunConfig) { + freed = append(freed, rc) + })(manager) + + ci := containerInfo() + err := manager.stopSingleContainerWorkload(context.Background(), &ci, runConfig) + require.NoError(t, err) + + require.Len(t, freed, 1, "port-cleanup fallback should run exactly once") + assert.Equal(t, workloadName, freed[0].BaseName, "fallback should receive the workload's run config") + assert.Equal(t, 54321, freed[0].Port, "fallback should receive the workload's proxy port") + }) + + t.Run("stop does not free port on normal shutdown", func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + sm := statusMocks.NewMockStatusManager(ctrl) + rtMock := runtimeMocks.NewMockRuntime(ctrl) + + // A valid, killable PID: the proxy is stopped by PID, so no fallback. + pid := startKillableProcess(t) + sm.EXPECT().GetWorkloadPID(gomock.Any(), workloadName).Return(pid, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), workloadName).Return(nil) + rtMock.EXPECT().StopWorkload(gomock.Any(), workloadName).Return(nil) + sm.EXPECT().SetWorkloadStatus(gomock.Any(), workloadName, runtime.WorkloadStatusStopped, "").Return(nil) + + freedCalls := 0 + manager := &DefaultManager{statuses: sm, runtime: rtMock} + withPortFreer(func(_ context.Context, _ *runner.RunConfig) { + freedCalls++ + })(manager) + + ci := containerInfo() + err := manager.stopSingleContainerWorkload( + context.Background(), &ci, &runner.RunConfig{BaseName: workloadName, Port: 54321}, + ) + require.NoError(t, err) + + assert.Zero(t, freedCalls, "port-cleanup fallback must not run when the proxy was stopped by PID") + }) + + t.Run("delete frees orphan proxy port when status file missing", func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + sm := statusMocks.NewMockStatusManager(ctrl) + rtMock := runtimeMocks.NewMockRuntime(ctrl) + + // Container is found and removed cleanly. + rtMock.EXPECT().GetWorkloadInfo(gomock.Any(), workloadName).Return(containerInfo(), nil) + sm.EXPECT().SetWorkloadStatus(gomock.Any(), workloadName, runtime.WorkloadStatusRemoving, "").Return(nil) + rtMock.EXPECT().RemoveWorkload(gomock.Any(), workloadName).Return(nil) + // removeContainer waits for the container to disappear from the runtime. + rtMock.EXPECT().GetWorkloadInfo(gomock.Any(), workloadName).Return(runtime.ContainerInfo{}, runtime.ErrWorkloadNotFound) + // Missing status file: no tracked PID, so the proxy is not killed by PID. + sm.EXPECT().GetWorkloadPID(gomock.Any(), workloadName).Return(0, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), workloadName).Return(nil) + sm.EXPECT().DeleteWorkloadStatus(gomock.Any(), workloadName).Return(nil) + + var freed []*runner.RunConfig + runConfig := &runner.RunConfig{BaseName: workloadName, Port: 54321} + manager := &DefaultManager{statuses: sm, runtime: rtMock} + withPortFreer(func(_ context.Context, rc *runner.RunConfig) { + freed = append(freed, rc) + })(manager) + + err := manager.deleteContainerWorkload(context.Background(), workloadName, runConfig) + require.NoError(t, err) + + require.Len(t, freed, 1, "port-cleanup fallback should run exactly once") + assert.Equal(t, workloadName, freed[0].BaseName, "fallback should receive the workload's run config") + assert.Equal(t, 54321, freed[0].Port, "fallback should receive the workload's proxy port") + }) + + t.Run("delete does not free port on normal shutdown", func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + sm := statusMocks.NewMockStatusManager(ctrl) + rtMock := runtimeMocks.NewMockRuntime(ctrl) + + rtMock.EXPECT().GetWorkloadInfo(gomock.Any(), workloadName).Return(containerInfo(), nil) + sm.EXPECT().SetWorkloadStatus(gomock.Any(), workloadName, runtime.WorkloadStatusRemoving, "").Return(nil) + rtMock.EXPECT().RemoveWorkload(gomock.Any(), workloadName).Return(nil) + rtMock.EXPECT().GetWorkloadInfo(gomock.Any(), workloadName).Return(runtime.ContainerInfo{}, runtime.ErrWorkloadNotFound) + // A valid, killable PID: the proxy is stopped by PID, so no fallback. + pid := startKillableProcess(t) + sm.EXPECT().GetWorkloadPID(gomock.Any(), workloadName).Return(pid, nil) + sm.EXPECT().ResetWorkloadPID(gomock.Any(), workloadName).Return(nil) + sm.EXPECT().DeleteWorkloadStatus(gomock.Any(), workloadName).Return(nil) + + freedCalls := 0 + manager := &DefaultManager{statuses: sm, runtime: rtMock} + withPortFreer(func(_ context.Context, _ *runner.RunConfig) { + freedCalls++ + })(manager) + + err := manager.deleteContainerWorkload( + context.Background(), workloadName, &runner.RunConfig{BaseName: workloadName, Port: 54321}, + ) + require.NoError(t, err) + + assert.Zero(t, freedCalls, "port-cleanup fallback must not run when the proxy was stopped by PID") + }) +}