Skip to content

Commit 92861e5

Browse files
committed
chore: improve websocket handling
1 parent e36a64e commit 92861e5

4 files changed

Lines changed: 173 additions & 24 deletions

File tree

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package domain
22

33
import (
4+
"sync"
5+
46
"github.com/highcard-dev/daemon/internal/utils/logger"
57
)
68

@@ -10,31 +12,46 @@ type BroadcastChannel struct {
1012

1113
// Inbound messages from the clients.
1214
Broadcast chan []byte
15+
16+
// Mutex to protect concurrent access to Clients map
17+
mu sync.RWMutex
1318
}
1419

1520
func NewHub() *BroadcastChannel {
1621
return &BroadcastChannel{
17-
Broadcast: make(chan []byte),
22+
Broadcast: make(chan []byte, 100), // Buffered channel to handle bursts of file changes
1823
Clients: make(map[chan *[]byte]bool),
1924
}
2025
}
2126

2227
func (h *BroadcastChannel) Subscribe() chan *[]byte {
23-
client := make(chan *[]byte, 10) // buffered channel to avoid blocking
28+
h.mu.Lock()
29+
defer h.mu.Unlock()
30+
31+
client := make(chan *[]byte, 50) // Increased buffer size to handle message bursts
2432
h.Clients[client] = true
2533
return client
2634
}
2735

2836
func (h *BroadcastChannel) Unsubscribe(client chan *[]byte) {
29-
delete(h.Clients, client)
30-
close(client)
37+
h.mu.Lock()
38+
defer h.mu.Unlock()
39+
40+
if _, exists := h.Clients[client]; exists {
41+
delete(h.Clients, client)
42+
close(client)
43+
}
3144
}
3245

3346
func (h *BroadcastChannel) CloseChannel() {
47+
h.mu.Lock()
48+
defer h.mu.Unlock()
49+
3450
close(h.Broadcast)
3551
for client := range h.Clients {
36-
h.Unsubscribe(client)
52+
close(client)
3753
}
54+
h.Clients = make(map[chan *[]byte]bool)
3855
}
3956

4057
func (h *BroadcastChannel) Run() {
@@ -44,15 +61,39 @@ func (h *BroadcastChannel) Run() {
4461
logger.Log().Debug("Broadcast channel closed")
4562
return
4663
}
64+
65+
h.mu.RLock()
66+
clients := make([]chan *[]byte, 0, len(h.Clients))
4767
for client := range h.Clients {
68+
clients = append(clients, client)
69+
}
70+
h.mu.RUnlock()
71+
72+
// Track clients to remove (dead connections)
73+
var deadClients []chan *[]byte
74+
75+
for _, client := range clients {
4876
select {
49-
case client <- &message: // Try to send the message.
77+
case client <- &message:
78+
// Successfully sent the message
5079
default:
51-
// logger.Log().Warn("Channel seems to be closed, removing")
52-
logger.Log().Warn("Channel seems to be closed")
53-
// delete(h.Clients, client)
54-
// h.Unsubscribe(client)
80+
// Client channel is blocked or closed, mark for removal
81+
logger.Log().Debug("Client channel blocked or closed, marking for removal")
82+
deadClients = append(deadClients, client)
83+
}
84+
}
85+
86+
// Remove dead clients
87+
if len(deadClients) > 0 {
88+
h.mu.Lock()
89+
for _, deadClient := range deadClients {
90+
if _, exists := h.Clients[deadClient]; exists {
91+
delete(h.Clients, deadClient)
92+
// Don't close the channel here as it might be closed elsewhere
93+
logger.Log().Debug("Removed dead client from broadcast channel")
94+
}
5595
}
96+
h.mu.Unlock()
5697
}
5798
}
5899
}

internal/core/services/ui_dev_service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,12 @@ func (uds *UiDevService) handleFileEvent(event fsnotify.Event) {
252252
return
253253
}
254254

255-
// Broadcast the event to all subscribers
255+
// Broadcast the event to all subscribers with a timeout to prevent blocking
256256
select {
257257
case uds.broadcastChannel.Broadcast <- eventData:
258258
logger.Log().Debug("File change event broadcasted", zap.String("path", event.Name), zap.String("op", event.Op.String()))
259-
default:
260-
logger.Log().Warn("Failed to broadcast file change event - channel full")
259+
case <-time.After(100 * time.Millisecond): // Timeout after 100ms
260+
logger.Log().Warn("Failed to broadcast file change event - channel full (timed out)")
261261
}
262262

263263
// Handle directory creation - add new directories to watcher

internal/handler/ui_dev_handler.go

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package handler
22

33
import (
44
"encoding/json"
5+
"net"
56
"path/filepath"
7+
"strings"
8+
"syscall"
69
"time"
710

811
"github.com/gofiber/contrib/websocket"
@@ -42,6 +45,36 @@ func NewUiDevHandler(uiDevService ports.UiDevServiceInterface, scrollService por
4245
}
4346
}
4447

48+
// isConnectionError checks if the error is related to a broken connection
49+
func isConnectionError(err error) bool {
50+
if err == nil {
51+
return false
52+
}
53+
54+
// Check for common connection error patterns
55+
errStr := strings.ToLower(err.Error())
56+
if strings.Contains(errStr, "broken pipe") ||
57+
strings.Contains(errStr, "connection reset") ||
58+
strings.Contains(errStr, "connection refused") ||
59+
strings.Contains(errStr, "use of closed network connection") {
60+
return true
61+
}
62+
63+
// Check for specific error types
64+
if netErr, ok := err.(*net.OpError); ok {
65+
if netErr.Op == "write" {
66+
return true
67+
}
68+
}
69+
70+
// Check for syscall errors
71+
if errno, ok := err.(syscall.Errno); ok {
72+
return errno == syscall.EPIPE || errno == syscall.ECONNRESET
73+
}
74+
75+
return false
76+
}
77+
4578
// @Summary Enable development mode
4679
// @ID enableDev
4780
// @Tags ui, dev, druid, daemon
@@ -148,19 +181,23 @@ func (udh *UiDevHandler) Status(ctx *fiber.Ctx) error {
148181

149182
// NotifyChange handles WebSocket connections for real-time file change notifications
150183
func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) {
151-
defer func() {
152-
if err := c.Close(); err != nil {
153-
logger.Log().Error("Error closing WebSocket connection", zap.Error(err))
154-
}
155-
}()
156-
157184
// Set connection timeouts
158185
const (
159186
writeWait = 10 * time.Second
160187
pongWait = 60 * time.Second
161188
pingPeriod = (pongWait * 9) / 10
162189
)
163190

191+
// Create a done channel to signal when the connection should be closed
192+
done := make(chan struct{})
193+
194+
defer func() {
195+
close(done)
196+
if err := c.Close(); err != nil {
197+
logger.Log().Debug("Error closing WebSocket connection", zap.Error(err))
198+
}
199+
}()
200+
164201
c.SetReadDeadline(time.Now().Add(pongWait))
165202
c.SetPongHandler(func(string) error {
166203
c.SetReadDeadline(time.Now().Add(pongWait))
@@ -199,17 +236,50 @@ func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) {
199236
"watchedPaths": udh.uiDevService.GetWatchedPaths(),
200237
"timestamp": time.Now(),
201238
}
202-
c.WriteJSON(connectMsg)
239+
240+
c.SetWriteDeadline(time.Now().Add(writeWait))
241+
if err := c.WriteJSON(connectMsg); err != nil {
242+
logger.Log().Error("Failed to send initial connection message", zap.Error(err))
243+
return
244+
}
203245

204246
logger.Log().Info("WebSocket client connected for file change notifications")
205247

206248
// Start ping ticker
207249
ticker := time.NewTicker(pingPeriod)
208250
defer ticker.Stop()
209251

252+
// Start a goroutine to read messages (to handle pong responses and detect broken connections)
253+
go func() {
254+
defer func() {
255+
select {
256+
case <-done:
257+
// Connection is already being closed
258+
default:
259+
close(done)
260+
}
261+
}()
262+
263+
for {
264+
_, _, err := c.ReadMessage()
265+
if err != nil {
266+
if isConnectionError(err) {
267+
logger.Log().Debug("WebSocket client disconnected", zap.Error(err))
268+
} else {
269+
logger.Log().Warn("WebSocket read error", zap.Error(err))
270+
}
271+
return
272+
}
273+
}
274+
}()
275+
210276
// Handle messages and file changes
211277
for {
212278
select {
279+
case <-done:
280+
logger.Log().Debug("WebSocket connection done signal received")
281+
return
282+
213283
case data := <-changesChan:
214284
if data == nil {
215285
logger.Log().Info("File change channel closed")
@@ -232,7 +302,11 @@ func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) {
232302

233303
c.SetWriteDeadline(time.Now().Add(writeWait))
234304
if err := c.WriteJSON(changeMessage); err != nil {
235-
logger.Log().Error("Failed to write file change to WebSocket", zap.Error(err))
305+
if isConnectionError(err) {
306+
logger.Log().Debug("WebSocket client disconnected while sending file change", zap.Error(err))
307+
} else {
308+
logger.Log().Error("Failed to write file change to WebSocket", zap.Error(err))
309+
}
236310
return
237311
}
238312

@@ -243,7 +317,11 @@ func (udh *UiDevHandler) NotifyChange(c *websocket.Conn) {
243317
case <-ticker.C:
244318
c.SetWriteDeadline(time.Now().Add(writeWait))
245319
if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
246-
logger.Log().Error("Failed to send ping", zap.Error(err))
320+
if isConnectionError(err) {
321+
logger.Log().Debug("WebSocket client disconnected during ping", zap.Error(err))
322+
} else {
323+
logger.Log().Error("Failed to send ping", zap.Error(err))
324+
}
247325
return
248326
}
249327
}

internal/handler/websocket_handler.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"github.com/gofiber/fiber/v2"
88
"github.com/highcard-dev/daemon/internal/core/domain"
99
"github.com/highcard-dev/daemon/internal/core/ports"
10+
"github.com/highcard-dev/daemon/internal/utils/logger"
11+
"go.uber.org/zap"
1012
)
1113

1214
type WebsocketHandler struct {
@@ -82,8 +84,12 @@ func (ah WebsocketHandler) Consoles(c *fiber.Ctx) error {
8284
func (wh WebsocketHandler) HandleProcess(c *websocket.Conn) {
8385
param := c.Params("console")
8486

87+
// Create a done channel to signal when the connection should be closed
88+
done := make(chan struct{})
89+
8590
ticker := time.NewTicker(pingPeriod)
8691
defer func() {
92+
close(done)
8793
ticker.Stop()
8894
c.Close()
8995
}()
@@ -94,31 +100,55 @@ func (wh WebsocketHandler) HandleProcess(c *websocket.Conn) {
94100
}
95101

96102
subscriptionChannel := channel.Channel.Subscribe()
97-
98103
defer channel.Channel.Unsubscribe(subscriptionChannel)
99104

100105
c.SetReadLimit(maxMessageSize)
101106
c.SetReadDeadline(time.Now().Add(pongWait))
102107
c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil })
103108

109+
// Start a goroutine to read messages (to handle pong responses and detect broken connections)
110+
go func() {
111+
defer func() {
112+
select {
113+
case <-done:
114+
// Connection is already being closed
115+
default:
116+
close(done)
117+
}
118+
}()
119+
120+
for {
121+
_, _, err := c.ReadMessage()
122+
if err != nil {
123+
logger.Log().Debug("WebSocket client disconnected", zap.Error(err))
124+
return
125+
}
126+
}
127+
}()
128+
104129
//fetch channel and send to websocket
105130
for {
106131
select {
132+
case <-done:
133+
logger.Log().Debug("WebSocket connection done signal received")
134+
return
135+
107136
//send 1024 bytes at a time
108137
case buffer, ok := <-subscriptionChannel:
109-
110138
c.SetWriteDeadline(time.Now().Add(writeWait))
111139
//if nil is send, assume the channel is closed
112140
if buffer == nil || !ok {
113141
return
114142
}
115143
err := c.WriteMessage(websocket.TextMessage, *buffer)
116144
if err != nil {
145+
logger.Log().Debug("WebSocket client disconnected while sending message", zap.Error(err))
117146
return
118147
}
119148
case <-ticker.C:
120149
c.SetWriteDeadline(time.Now().Add(writeWait))
121150
if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
151+
logger.Log().Debug("WebSocket client disconnected during ping", zap.Error(err))
122152
return
123153
}
124154
}

0 commit comments

Comments
 (0)