From 06365d97397a889cd60d06d549a15ce843e2d77c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 18 Jun 2026 15:05:49 +0200 Subject: [PATCH] Handle events asynchronously That should help to keep up with the stream of messages and prevent services from being flagged as slow consumers. --- services/clientlog/pkg/service/service.go | 3 ++- services/sse/pkg/service/service.go | 22 ++++++++++++---------- services/userlog/pkg/service/service.go | 2 +- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 029e560a3b..e0648c4cc4 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -83,7 +83,8 @@ EventLoop: if !ok { break EventLoop } - cl.processEvent(event) + + go cl.processEvent(event) if cl.stopped.Load() { break EventLoop diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go index 97a211e215..b4b9b26fbf 100644 --- a/services/sse/pkg/service/service.go +++ b/services/sse/pkg/service/service.go @@ -49,17 +49,19 @@ func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ListenForEvents listens for events func (s SSE) ListenForEvents() { for e := range s.evChannel { - switch ev := e.Event.(type) { - default: - s.l.Error().Interface("event", ev).Msg("unhandled event") - case events.SendSSE: - for _, uid := range ev.UserIDs { - s.sse.Publish(uid, &sse.Event{ - Event: []byte(ev.Type), - Data: ev.Message, - }) + go func() { + switch ev := e.Event.(type) { + default: + s.l.Error().Interface("event", ev).Msg("unhandled event") + case events.SendSSE: + for _, uid := range ev.UserIDs { + s.sse.Publish(uid, &sse.Event{ + Event: []byte(ev.Type), + Data: ev.Message, + }) + } } - } + }() } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 957a84446f..ee59e7b40c 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -102,7 +102,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { for i := 0; i < ul.cfg.MaxConcurrency; i++ { go func(ch <-chan events.Event) { for event := range ch { - ul.processEvent(event) + go ul.processEvent(event) } }(ch) }