diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 029e560a3b..e0648c4cc4 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -83,7 +83,8 @@ EventLoop: if !ok { break EventLoop } - cl.processEvent(event) + + go cl.processEvent(event) if cl.stopped.Load() { break EventLoop diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go index 97a211e215..b4b9b26fbf 100644 --- a/services/sse/pkg/service/service.go +++ b/services/sse/pkg/service/service.go @@ -49,17 +49,19 @@ func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ListenForEvents listens for events func (s SSE) ListenForEvents() { for e := range s.evChannel { - switch ev := e.Event.(type) { - default: - s.l.Error().Interface("event", ev).Msg("unhandled event") - case events.SendSSE: - for _, uid := range ev.UserIDs { - s.sse.Publish(uid, &sse.Event{ - Event: []byte(ev.Type), - Data: ev.Message, - }) + go func() { + switch ev := e.Event.(type) { + default: + s.l.Error().Interface("event", ev).Msg("unhandled event") + case events.SendSSE: + for _, uid := range ev.UserIDs { + s.sse.Publish(uid, &sse.Event{ + Event: []byte(ev.Type), + Data: ev.Message, + }) + } } - } + }() } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 957a84446f..ee59e7b40c 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -102,7 +102,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { for i := 0; i < ul.cfg.MaxConcurrency; i++ { go func(ch <-chan events.Event) { for event := range ch { - ul.processEvent(event) + go ul.processEvent(event) } }(ch) }