Skip to content

Commit c077938

Browse files
authored
Merge pull request #427 from trungutt/fix-session-deletion
Delete session properly
2 parents 9c709dd + 76df1d5 commit c077938

1 file changed

Lines changed: 47 additions & 14 deletions

File tree

pkg/server/server.go

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os"
1515
"path/filepath"
1616
"strings"
17+
"sync"
1718
"time"
1819

1920
"dario.cat/mergo"
@@ -37,12 +38,14 @@ import (
3738
)
3839

3940
type Server struct {
40-
e *echo.Echo
41-
runtimes map[string]runtime.Runtime
42-
sessionStore session.Store
43-
agentsDir string
44-
runConfig config.RuntimeConfig
45-
teams map[string]*team.Team
41+
e *echo.Echo
42+
runtimes map[string]runtime.Runtime
43+
runtimeCancels map[string]context.CancelFunc
44+
cancelsMu sync.RWMutex
45+
sessionStore session.Store
46+
agentsDir string
47+
runConfig config.RuntimeConfig
48+
teams map[string]*team.Team
4649
}
4750

4851
type Opt func(*Server) error
@@ -59,11 +62,12 @@ func New(sessionStore session.Store, runConfig config.RuntimeConfig, teams map[s
5962
e.Use(middleware.CORS())
6063
e.Use(middleware.Logger())
6164
s := &Server{
62-
e: e,
63-
runtimes: make(map[string]runtime.Runtime),
64-
sessionStore: sessionStore,
65-
runConfig: runConfig,
66-
teams: teams,
65+
e: e,
66+
runtimes: make(map[string]runtime.Runtime),
67+
runtimeCancels: make(map[string]context.CancelFunc),
68+
sessionStore: sessionStore,
69+
runConfig: runConfig,
70+
teams: teams,
6771
}
6872

6973
for _, opt := range opts {
@@ -943,8 +947,26 @@ func (s *Server) resumeSession(c echo.Context) error {
943947
}
944948

945949
func (s *Server) deleteSession(c echo.Context) error {
946-
if err := s.sessionStore.DeleteSession(c.Request().Context(), c.Param("id")); err != nil {
947-
slog.Error("Failed to delete session", "session_id", c.Param("id"), "error", err)
950+
sessionID := c.Param("id")
951+
952+
// Cancel the runtime context if it's still running
953+
s.cancelsMu.Lock()
954+
if cancel, exists := s.runtimeCancels[sessionID]; exists {
955+
slog.Debug("Cancelling runtime for session", "session_id", sessionID)
956+
cancel()
957+
delete(s.runtimeCancels, sessionID)
958+
}
959+
s.cancelsMu.Unlock()
960+
961+
// Clean up the runtime
962+
if _, exists := s.runtimes[sessionID]; exists {
963+
slog.Debug("Removing runtime for session", "session_id", sessionID)
964+
delete(s.runtimes, sessionID)
965+
}
966+
967+
// Delete the session from storage
968+
if err := s.sessionStore.DeleteSession(c.Request().Context(), sessionID); err != nil {
969+
slog.Error("Failed to delete session", "session_id", sessionID, "error", err)
948970
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to delete session"})
949971
}
950972

@@ -1027,7 +1049,18 @@ func (s *Server) runAgent(c echo.Context) error {
10271049
c.Response().Header().Set("Connection", "keep-alive")
10281050
c.Response().WriteHeader(http.StatusOK)
10291051

1030-
streamChan := rt.RunStream(c.Request().Context(), sess)
1052+
// Create a cancellable context for this stream
1053+
streamCtx, cancel := context.WithCancel(c.Request().Context())
1054+
s.cancelsMu.Lock()
1055+
s.runtimeCancels[sess.ID] = cancel
1056+
s.cancelsMu.Unlock()
1057+
defer func() {
1058+
s.cancelsMu.Lock()
1059+
delete(s.runtimeCancels, sess.ID)
1060+
s.cancelsMu.Unlock()
1061+
}()
1062+
1063+
streamChan := rt.RunStream(streamCtx, sess)
10311064
for event := range streamChan {
10321065
data, err := json.Marshal(event)
10331066
if err != nil {

0 commit comments

Comments
 (0)