From 787e2debeb33de8cae0a83552d65d66d3b224a07 Mon Sep 17 00:00:00 2001 From: Wangzy Date: Sun, 21 Sep 2025 19:10:46 +0800 Subject: [PATCH 1/5] fix issue --- transport/session.go | 18 ++++++++++++++---- util/logger.go | 33 ++++++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/transport/session.go b/transport/session.go index 61159d19..577e4178 100644 --- a/transport/session.go +++ b/transport/session.go @@ -142,6 +142,9 @@ type session struct { // callbacks closeCallback callbacks closeCallbackMutex sync.RWMutex + + // wait + closeWait chan struct{} } func newSession(endPoint EndPoint, conn Connection) *session { @@ -155,10 +158,11 @@ func newSession(endPoint EndPoint, conn Connection) *session { period: period, - once: &sync.Once{}, - done: make(chan struct{}), - wait: pendingDuration, - attrs: gxcontext.NewValuesContext(context.Background()), + once: &sync.Once{}, + done: make(chan struct{}), + wait: pendingDuration, + attrs: gxcontext.NewValuesContext(context.Background()), + closeWait: make(chan struct{}), } ss.Connection.SetSession(ss) @@ -603,6 +607,9 @@ func (s *session) handlePackage() { } grNum := s.grNum.Add(-1) log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) + if grNum == 0 { + close(s.closeWait) + } s.stop() if err != nil { log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err)) @@ -914,6 +921,9 @@ func (s *session) gc() { // Close will be invoked by NewSessionCallback(if return error is not nil) // or (session)handleLoop automatically. It's thread safe. func (s *session) Close() { + if s.IsClosed() { + return + } s.stop() log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), s.grNum.Load()) } diff --git a/util/logger.go b/util/logger.go index bb163bf2..f5b74cea 100644 --- a/util/logger.go +++ b/util/logger.go @@ -26,14 +26,29 @@ import ( type Logger interface { Info(args ...any) Warn(args ...any) - Error(args ...any) + Error(args ...any) error Debug(args ...any) Infof(fmt string, args ...any) Warnf(fmt string, args ...any) - Errorf(fmt string, args ...any) + Errorf(fmt string, args ...any) error Debugf(fmt string, args ...any) } +// zapLoggerAdapter adapts zap.SugaredLogger to the Logger interface +type zapLoggerAdapter struct { + *zap.SugaredLogger +} + +func (l *zapLoggerAdapter) Error(args ...any) error { + l.SugaredLogger.Error(args...) + return nil +} + +func (l *zapLoggerAdapter) Errorf(fmt string, args ...any) error { + l.SugaredLogger.Errorf(fmt, args...) + return nil +} + type LoggerLevel int8 const ( @@ -79,7 +94,7 @@ var ( func init() { zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig zapLogger, _ = zapLoggerConfig.Build() - log = zapLogger.Sugar() + log = &zapLoggerAdapter{zapLogger.Sugar()} // todo: flushes buffer when redirect log to file. // var exitSignal = make(chan os.Signal) @@ -114,7 +129,7 @@ func SetLoggerLevel(level LoggerLevel) error { if err != nil { return err } - log = zapLogger.Sugar() + log = &zapLoggerAdapter{zapLogger.Sugar()} return nil } @@ -128,7 +143,7 @@ func SetLoggerCallerDisable() error { if err != nil { return err } - log = zapLogger.Sugar() + log = &zapLoggerAdapter{zapLogger.Sugar()} return nil } @@ -163,11 +178,11 @@ func Warnf(template string, args ...any) { } // Error -func Error(args ...any) { - log.Error(args...) +func Error(args ...any) error { + return log.Error(args...) } // Errorf -func Errorf(template string, args ...any) { - log.Errorf(template, args...) +func Errorf(template string, args ...any) error { + return log.Errorf(template, args...) } From eb0c902de2f2b951fca698c29eb0291fd57739cf Mon Sep 17 00:00:00 2001 From: Wangzy Date: Mon, 22 Sep 2025 22:02:13 +0800 Subject: [PATCH 2/5] fix issue --- transport/callback.go | 4 ++-- transport/connection.go | 2 +- transport/server.go | 4 ++-- transport/session.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index 4ea94c0d..ba6a0bab 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -71,7 +71,7 @@ func (t *callbacks) Add(handler, key any, callback func()) { // Guard: avoid runtime panic on non-comparable types if !isComparable(handler) || !isComparable(key) { - log.Error(perrors.New(fmt.Sprintf("callbacks.Add: non-comparable handler/key: %T, %T; ignored", handler, key))) + _ = log.Error(perrors.New(fmt.Sprintf("callbacks.Add: non-comparable handler/key: %T, %T; ignored", handler, key))) return } @@ -109,7 +109,7 @@ func (t *callbacks) Add(handler, key any, callback func()) { func (t *callbacks) Remove(handler, key any) { // Guard: avoid runtime panic on non-comparable types if !isComparable(handler) || !isComparable(key) { - log.Error(perrors.New(fmt.Sprintf("callbacks.Remove: non-comparable handler/key: %T, %T; ignored", handler, key))) + _ = log.Error(perrors.New(fmt.Sprintf("callbacks.Remove: non-comparable handler/key: %T, %T; ignored", handler, key))) return } diff --git a/transport/connection.go b/transport/connection.go index 9f3ff501..592fc770 100644 --- a/transport/connection.go +++ b/transport/connection.go @@ -340,7 +340,7 @@ func (t *gettyTCPConn) CloseConn(waitSec int) { if t.conn != nil { if writer, ok := t.writer.(*snappy.Writer); ok { if err := writer.Close(); err != nil { - log.Errorf("snappy.Writer.Close() = error:%+v", err) + _ = log.Errorf("snappy.Writer.Close() = error:%+v", err) } } if conn, ok := t.conn.(*net.TCPConn); ok { diff --git a/transport/server.go b/transport/server.go index 1528a951..700a0750 100644 --- a/transport/server.go +++ b/transport/server.go @@ -155,7 +155,7 @@ func (s *server) stop() { if err := s.server.Shutdown(ctx); err != nil { // if the log output is "shutdown ctx: context deadline exceeded", it means that // there are still some active connections. - log.Errorf("server shutdown ctx:%s error:%v", ctx, err) + _ = log.Errorf("server shutdown ctx:%s error:%v", ctx, err) } cancel() } @@ -424,7 +424,7 @@ func (s *server) runWSEventLoop(newSession NewSessionCallback) { s.lock.Unlock() err = server.Serve(s.streamListener) if err != nil { - log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err)) + _ = log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err)) } }() } diff --git a/transport/session.go b/transport/session.go index 577e4178..d9974078 100644 --- a/transport/session.go +++ b/transport/session.go @@ -397,7 +397,7 @@ func (s *session) WritePkg(pkg any, timeout time.Duration) (pkgBytesLenth int, s rBuf := make([]byte, size) rBuf = rBuf[:runtime.Stack(rBuf, false)] err = perrors.WithStack(fmt.Errorf("[session.WritePkg] panic session %s: err=%v\n%s", s.sessionToken(), r, rBuf)) - log.Error(err) + _ = log.Error(err) } }() From 3f948008d29e72c348a000ca9b4920da74bfa1c8 Mon Sep 17 00:00:00 2001 From: Wangzy Date: Tue, 23 Sep 2025 22:22:10 +0800 Subject: [PATCH 3/5] fix issue --- transport/server.go | 2 +- transport/session.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/transport/server.go b/transport/server.go index 700a0750..f85d6c85 100644 --- a/transport/server.go +++ b/transport/server.go @@ -484,7 +484,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { s.lock.Unlock() err = server.Serve(tls.NewListener(s.streamListener, config)) if err != nil { - log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err)) + _ = log.Errorf("http.server.Serve(addr{%s}) = err:%+v", s.addr, perrors.WithStack(err)) panic(err) } }() diff --git a/transport/session.go b/transport/session.go index d9974078..074c3190 100644 --- a/transport/session.go +++ b/transport/session.go @@ -557,14 +557,14 @@ func (s *session) run() { if s.Connection == nil || s.listener == nil || s.writer == nil { errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}", s.name, s.Connection, s.listener, s.writer) - log.Error(errStr) + _ = log.Error(errStr) panic(errStr) } // call session opened s.UpdateActive() if err := s.listener.OnOpen(s); err != nil { - log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err) + _ = log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err) s.Close() return } @@ -582,7 +582,7 @@ func (s *session) addTask(pkg any) { f := func() { // If the session is closed, there is no need to perform CPU-intensive operations. if s.IsClosed() { - log.Errorf("[Id:%d, name=%s, endpoint=%s] Session is closed", s.ID(), s.name, s.EndPoint()) + _ = log.Errorf("[Id:%d, name=%s, endpoint=%s] Session is closed", s.ID(), s.name, s.EndPoint()) return } s.listener.OnMessage(s, pkg) @@ -625,7 +625,7 @@ func (s *session) handlePackage() { if _, ok := s.Connection.(*gettyTCPConn); ok { if s.reader == nil { errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader) - log.Error(errStr) + _ = log.Error(errStr) panic(errStr) } @@ -884,7 +884,7 @@ func (s *session) stop() { rBuf = rBuf[:runtime.Stack(rBuf, false)] err := perrors.WithStack(fmt.Errorf("[session.invokeCloseCallbacks] panic session %s: err=%v\n%s", sessionToken, r, rBuf)) - log.Error(err) + _ = log.Error(err) } }() From 6974818002898f49260762cf09a50eb4859af2b3 Mon Sep 17 00:00:00 2001 From: Wangzy Date: Fri, 26 Sep 2025 21:03:39 +0800 Subject: [PATCH 4/5] fix issue --- transport/session.go | 14 +++++++------- transport/tls.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/transport/session.go b/transport/session.go index 074c3190..a9c2a159 100644 --- a/transport/session.go +++ b/transport/session.go @@ -603,7 +603,7 @@ func (s *session) handlePackage() { const size = 64 << 10 rBuf := make([]byte, size) rBuf = rBuf[:runtime.Stack(rBuf, false)] - log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) + _ = log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } grNum := s.grNum.Add(-1) log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) @@ -612,7 +612,7 @@ func (s *session) handlePackage() { } s.stop() if err != nil { - log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err)) + _ = log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err)) if s != nil || s.listener != nil { s.listener.OnError(s, err) } @@ -665,7 +665,7 @@ func (s *session) handleTCPPackage() error { ctx, cancel := context.WithTimeout(context.Background(), tlsHandshaketime) defer cancel() if err := tlsConn.HandshakeContext(ctx); err != nil { - log.Errorf("[tlsConn.HandshakeContext] = error:%+v", err) + _ = log.Errorf("[tlsConn.HandshakeContext] = error:%+v", err) return perrors.Wrap(err, "tlsConn.HandshakeContext") } } @@ -702,7 +702,7 @@ func (s *session) handleTCPPackage() error { } break } - log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err)) + _ = log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err)) exit = true } break @@ -775,14 +775,14 @@ func (s *session) handleUDPPackage() error { continue } if err != nil { - log.Errorf("%s, [session.handleUDPPackage] = len:%d, error:%+v", + _ = log.Errorf("%s, [session.handleUDPPackage] = len:%d, error:%+v", s.sessionToken(), bufLen, perrors.WithStack(err)) err = perrors.Wrapf(err, "conn.read()") break } if bufLen == 0 { - log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err)) + _ = log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err)) continue } @@ -802,7 +802,7 @@ func (s *session) handleUDPPackage() error { continue } if pkgLen == 0 { - log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err)) + _ = log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err)) continue } diff --git a/transport/tls.go b/transport/tls.go index 86323eb3..3669431f 100644 --- a/transport/tls.go +++ b/transport/tls.go @@ -55,7 +55,7 @@ func (s *ServerTlsConfigBuilder) BuildTlsConfig() (*tls.Config, error) { config *tls.Config ) if certificate, err = tls.LoadX509KeyPair(s.ServerKeyCertChainPath, s.ServerPrivateKeyPath); err != nil { - log.Error(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v", + _ = log.Error(fmt.Sprintf("tls.LoadX509KeyPair(certs{%s}, privateKey{%s}) = err:%+v", s.ServerKeyCertChainPath, s.ServerPrivateKeyPath, perrors.WithStack(err))) return nil, err } @@ -68,12 +68,12 @@ func (s *ServerTlsConfigBuilder) BuildTlsConfig() (*tls.Config, error) { if s.ServerTrustCertCollectionPath != "" { certPem, err = os.ReadFile(s.ServerTrustCertCollectionPath) if err != nil { - log.Error(fmt.Errorf("os.ReadFile(certFile{%s}) = err:%+v", s.ServerTrustCertCollectionPath, perrors.WithStack(err))) + _ = log.Error(fmt.Errorf("os.ReadFile(certFile{%s}) = err:%+v", s.ServerTrustCertCollectionPath, perrors.WithStack(err))) return nil, err } certPool = x509.NewCertPool() if ok := certPool.AppendCertsFromPEM(certPem); !ok { - log.Error("failed to parse root certificate file") + _ = log.Error("failed to parse root certificate file") return nil, err } config.ClientCAs = certPool @@ -95,18 +95,18 @@ type ClientTlsConfigBuilder struct { func (c *ClientTlsConfigBuilder) BuildTlsConfig() (*tls.Config, error) { cert, err := tls.LoadX509KeyPair(c.ClientKeyCertChainPath, c.ClientPrivateKeyPath) if err != nil { - log.Error(fmt.Sprintf("Unable to load X509 Key Pair %v", err)) + _ = log.Error(fmt.Sprintf("Unable to load X509 Key Pair %v", err)) return nil, err } certBytes, err := os.ReadFile(c.ClientTrustCertCollectionPath) if err != nil { - log.Error(fmt.Sprintf("Unable to read pem file: %s", c.ClientTrustCertCollectionPath)) + _ = log.Error(fmt.Sprintf("Unable to read pem file: %s", c.ClientTrustCertCollectionPath)) return nil, err } clientCertPool := x509.NewCertPool() ok := clientCertPool.AppendCertsFromPEM(certBytes) if !ok { - log.Error("failed to parse root certificate") + _ = log.Error("failed to parse root certificate") return nil, err } return &tls.Config{ From f61e2cbd11b16a031984a8df571cf6716c7bdf35 Mon Sep 17 00:00:00 2001 From: Wangzy Date: Sat, 27 Sep 2025 18:03:52 +0800 Subject: [PATCH 5/5] fix issue --- examples/echo/tcp-echo/client/app/echo.go | 2 +- examples/echo/tcp-echo/client/app/handler.go | 4 ++-- examples/echo/tcp-echo/client/app/readwriter.go | 2 +- examples/echo/tcp-echo/server/app/echo.go | 2 +- examples/echo/tcp-echo/server/app/handler.go | 4 ++-- examples/echo/tcp-echo/server/app/readwriter.go | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/echo/tcp-echo/client/app/echo.go b/examples/echo/tcp-echo/client/app/echo.go index fffbb418..8302cad3 100644 --- a/examples/echo/tcp-echo/client/app/echo.go +++ b/examples/echo/tcp-echo/client/app/echo.go @@ -132,7 +132,7 @@ func (p *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, err } if p.H.Magic != echoPkgMagic { - log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, echoPkgMagic) + _ = log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, echoPkgMagic) return 0, ErrIllegalMagic } if buf.Len() < (int)(p.H.Len) { diff --git a/examples/echo/tcp-echo/client/app/handler.go b/examples/echo/tcp-echo/client/app/handler.go index 32f41d16..6b264356 100644 --- a/examples/echo/tcp-echo/client/app/handler.go +++ b/examples/echo/tcp-echo/client/app/handler.go @@ -66,7 +66,7 @@ func (h *EchoMessageHandler) OnClose(session getty.Session) { func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg any) { p, ok := pkg.(*EchoPackage) if !ok { - log.Error("illegal packge{%#v}", pkg) + _ = log.Error("illegal packge{%#v}", pkg) return } @@ -77,7 +77,7 @@ func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg any) { func (h *EchoMessageHandler) OnCron(session getty.Session) { clientEchoSession, err := client.getClientEchoSession(session) if err != nil { - log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err) + _ = log.Error("client.getClientSession(session{%s}) = error{%#v}", session.Stat(), err) return } if conf.sessionTimeout.Nanoseconds() < time.Since(session.GetActive()).Nanoseconds() { diff --git a/examples/echo/tcp-echo/client/app/readwriter.go b/examples/echo/tcp-echo/client/app/readwriter.go index e6505845..b81ef959 100644 --- a/examples/echo/tcp-echo/client/app/readwriter.go +++ b/examples/echo/tcp-echo/client/app/readwriter.go @@ -68,7 +68,7 @@ func (h *EchoPackageHandler) Write(ss getty.Session, pkg any) ([]byte, error) { startTime = time.Now() if echoPkg, ok = pkg.(*EchoPackage); !ok { - log.Error("illegal pkg:%+v\n", pkg) + _ = log.Error("illegal pkg:%+v\n", pkg) return nil, errors.New("invalid echo package") } diff --git a/examples/echo/tcp-echo/server/app/echo.go b/examples/echo/tcp-echo/server/app/echo.go index a79270b8..c2eab8f6 100644 --- a/examples/echo/tcp-echo/server/app/echo.go +++ b/examples/echo/tcp-echo/server/app/echo.go @@ -132,7 +132,7 @@ func (p *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, err } if p.H.Magic != echoPkgMagic { - log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, echoPkgMagic) + _ = log.Error("@p.H.Magic{%x}, right magic{%x}", p.H.Magic, echoPkgMagic) return 0, ErrIllegalMagic } if buf.Len() < (int)(p.H.Len) { diff --git a/examples/echo/tcp-echo/server/app/handler.go b/examples/echo/tcp-echo/server/app/handler.go index 56c5ef1b..dc47df7e 100644 --- a/examples/echo/tcp-echo/server/app/handler.go +++ b/examples/echo/tcp-echo/server/app/handler.go @@ -133,13 +133,13 @@ func (h *EchoMessageHandler) OnClose(session getty.Session) { func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg any) { p, ok := pkg.(*EchoPackage) if !ok { - log.Error("illegal packge{%#v}", pkg) + _ = log.Error("illegal packge{%#v}", pkg) return } handler, ok := h.handlers[p.H.Command] if !ok { - log.Error("illegal command{%d}", p.H.Command) + _ = log.Error("illegal command{%d}", p.H.Command) return } err := handler.Handle(session, p) diff --git a/examples/echo/tcp-echo/server/app/readwriter.go b/examples/echo/tcp-echo/server/app/readwriter.go index b190c0e3..6c8d11ba 100644 --- a/examples/echo/tcp-echo/server/app/readwriter.go +++ b/examples/echo/tcp-echo/server/app/readwriter.go @@ -68,7 +68,7 @@ func (h *EchoPackageHandler) Write(ss getty.Session, pkg any) ([]byte, error) { startTime = time.Now() if echoPkg, ok = pkg.(*EchoPackage); !ok { - log.Error("illegal pkg:%+v\n", pkg) + _ = log.Error("illegal pkg:%+v\n", pkg) return nil, errors.New("invalid echo package") }