Skip to content

Commit c46c89c

Browse files
authored
fix: race-safe asyncSem, safe type assertion, atomic shouldNotify (#21)
* fix: race-safe asyncSem, safe type assertion in GetTraceId, atomic shouldNotify

  - Replace bare asyncSem channel variable with atomic.Pointer[chan struct{}] to eliminate data race between SetMaxAsyncNotifications and NotifyAsync. Retain sync.Once guard to prevent repeated channel replacement. Reduce default concurrency from 1000 to 20.
  - Use comma-ok type assertion in GetTraceId() to prevent panic when tracerID option holds a non-string value.
  - Convert customError.shouldNotify from bool to atomic.Bool to fix data race between concurrent ShouldNotify/Notified calls in NotifyAsync goroutines.

* fix: address PR review comments

  - Update SetMaxAsyncNotifications doc to reflect race-safe behavior
  - Rewrite TestNotifyAsync_BoundedConcurrency to actually assert drops when semaphore is full
  - Remove outdated comment about pre-existing shouldNotify race

* fix: rewrite bounded concurrency test to be deterministic

  Pre-fill the semaphore channel directly instead of relying on timing. Assert that len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken. No time.Sleep, no unused channels, no flaky cleanup.

* fix: escape $ in Makefile bench target for correct regex

  Use ^$$ so Make passes literal ^$ to the shell, ensuring go test -run matches no test names (benchmarks only).
1 parent 55be20d commit c46c89c

4 files changed

Lines changed: 113 additions & 23 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ lint:
1414
go tool govulncheck ./...
1515

1616
# bench runs the benchmarks across all packages with memory statistics.
# The -run pattern is meant to match no test names, so only benchmarks execute.
# Inside a recipe, Make turns "$$" into a single literal "$" for the shell;
# the removed line used a bare "$", the added line uses "$$".
bench:
17-
go test -run=^$ -bench=. -benchmem ./...
17+
go test -run=^$$ -bench=. -benchmem ./...

errors.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ type customError struct {
6060
basePath string // snapshot of basePath at capture time
6161
cause error
6262
wrapped error // immediate parent for Unwrap() chain; may differ from cause
63-
shouldNotify bool
63+
shouldNotify atomic.Bool
6464
status *grpcstatus.Status
6565
}
6666

6767
// ShouldNotify returns true if the error should be reported to notifiers.
6868
func (c *customError) ShouldNotify() bool {
69-
return c.shouldNotify
69+
// shouldNotify is an atomic.Bool after this commit, so the flag is read
// with Load instead of a plain field access.
return c.shouldNotify.Load()
7070
}
7171

7272
// Notified marks the error as having been notified (or not).
7373
func (c *customError) Notified(status bool) {
74-
c.shouldNotify = !status
74+
// The stored flag is the inverse of status: Notified(true) makes
// ShouldNotify report false, Notified(false) makes it report true again.
// The write goes through atomic.Bool.Store instead of a plain assignment.
c.shouldNotify.Store(!status)
7575
}
7676

7777
// Error returns the error message.
@@ -233,30 +233,30 @@ func WrapWithSkipAndStatus(err error, msg string, skip int, status *grpcstatus.S
233233
//if we have stack information reuse that
234234
if e, ok := err.(ErrorExt); ok {
235235
c := &customError{
236-
Msg: msg + e.Error(),
237-
cause: e.Cause(),
238-
wrapped: err, // preserve full chain for errors.Is/errors.As
239-
status: status,
240-
shouldNotify: true,
236+
Msg: msg + e.Error(),
237+
cause: e.Cause(),
238+
wrapped: err, // preserve full chain for errors.Is/errors.As
239+
status: status,
241240
}
241+
c.shouldNotify.Store(true)
242242

243243
c.stack = e.Callers()
244244
if ce, ok := e.(*customError); ok {
245245
c.basePath = ce.basePath
246246
}
247247
if n, ok := e.(NotifyExt); ok {
248-
c.shouldNotify = n.ShouldNotify()
248+
c.shouldNotify.Store(n.ShouldNotify())
249249
}
250250
return c
251251
}
252252

253253
c := &customError{
254-
Msg: msg + err.Error(),
255-
cause: err,
256-
wrapped: err,
257-
shouldNotify: true,
258-
status: status,
254+
Msg: msg + err.Error(),
255+
cause: err,
256+
wrapped: err,
257+
status: status,
259258
}
259+
c.shouldNotify.Store(true)
260260
c.captureStack(skip + 1)
261261
return c
262262

notifier/notifier.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strconv"
99
"strings"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
gobrake "github.com/airbrake/gobrake/v5"
@@ -35,12 +36,20 @@ var (
3536
hostname string
3637
traceHeader string = "x-trace-id"
3738

38-
// asyncSem is a semaphore that bounds the number of concurrent async
39-
// notification goroutines. When full, new notifications are dropped
40-
// to prevent goroutine explosion under sustained error bursts.
41-
asyncSem = make(chan struct{}, 1000)
4239
)
4340

41+
// asyncSem is a semaphore that bounds the number of concurrent async
42+
// notification goroutines. When full, new notifications are dropped
43+
// to prevent goroutine explosion under sustained error bursts.
44+
// Stored as atomic.Pointer to eliminate the race between SetMaxAsyncNotifications
45+
// and NotifyAsync goroutines reading the channel variable.
46+
var asyncSem atomic.Pointer[chan struct{}]
47+
48+
func init() {
49+
ch := make(chan struct{}, 20)
50+
asyncSem.Store(&ch)
51+
}
52+
4453
const (
4554
// tracerID is the options key that GetTraceId looks up to find the trace ID.
tracerID = "tracerId"
4655
)
@@ -50,11 +59,13 @@ var asyncSemOnce sync.Once
5059
// SetMaxAsyncNotifications sets the maximum number of concurrent async
5160
// notification goroutines. When the limit is reached, new async notifications
5261
// are dropped to prevent goroutine explosion under sustained error bursts.
53-
// Default is 1000. Can only be called once; subsequent calls are no-ops.
62+
// Default is 20. The first successful call wins; subsequent calls are no-ops.
63+
// It is safe to call concurrently with NotifyAsync.
5464
func SetMaxAsyncNotifications(n int) {
5565
// Non-positive limits are ignored; because the check sits outside the
// Once, such calls do not consume the one-time guard.
if n > 0 {
5666
asyncSemOnce.Do(func() {
57-
asyncSem = make(chan struct{}, n)
67+
// Build the replacement channel locally, then publish its address with
// a single atomic Store instead of assigning the package variable.
ch := make(chan struct{}, n)
68+
asyncSem.Store(&ch)
5869
})
5970
}
6071
}
@@ -67,7 +78,7 @@ func NotifyAsync(err error, rawData ...interface{}) error {
6778
if err == nil {
6879
return nil
6980
}
70-
sem := asyncSem
81+
sem := *asyncSem.Load()
7182
select {
7283
case sem <- struct{}{}:
7384
data := append([]interface{}(nil), rawData...)
@@ -553,7 +564,9 @@ func SetTraceId(ctx context.Context) context.Context {
553564
func GetTraceId(ctx context.Context) string {
554565
if o := options.FromContext(ctx); o != nil {
555566
if data, found := o.Get(tracerID); found {
556-
return data.(string)
567+
if traceID, ok := data.(string); ok {
568+
return traceID
569+
}
557570
}
558571
}
559572
if logCtx := loggers.FromContext(ctx); logCtx != nil {

notifier/notifier_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package notifier
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
8+
"github.com/go-coldbrew/errors"
9+
"github.com/go-coldbrew/options"
10+
)
11+
12+
func TestGetTraceId_NonStringValue(t *testing.T) {
13+
// Regression test: GetTraceId must not panic when the tracerID
14+
// option holds a non-string value.
15+
ctx := options.AddToOptions(context.Background(), tracerID, 12345)
16+
17+
// Before the fix this panicked with "interface conversion: interface {} is int, not string".
18+
got := GetTraceId(ctx)
19+
if got != "" {
20+
t.Errorf("expected empty string for non-string tracerID, got %q", got)
21+
}
22+
}
23+
24+
func TestGetTraceId_StringValue(t *testing.T) {
25+
ctx := options.AddToOptions(context.Background(), tracerID, "abc-123")
26+
27+
got := GetTraceId(ctx)
28+
if got != "abc-123" {
29+
t.Errorf("expected 'abc-123', got %q", got)
30+
}
31+
}
32+
33+
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
34+
// Use a 1-slot semaphore and pre-fill it to simulate a full pool.
35+
ch := make(chan struct{}, 1)
36+
ch <- struct{}{} // pre-fill: pool is now full
37+
asyncSem.Store(&ch)
38+
t.Cleanup(func() {
39+
// Restore default. Drain first so cleanup is safe.
40+
for len(ch) > 0 {
41+
<-ch
42+
}
43+
def := make(chan struct{}, 20)
44+
asyncSem.Store(&def)
45+
})
46+
47+
// With the semaphore full, NotifyAsync must drop (hit default branch).
48+
// It should not block and should not spawn a goroutine.
49+
NotifyAsync(errors.New("should-drop"))
50+
51+
// Verify the semaphore is still exactly full (1 token, capacity 1).
52+
// If NotifyAsync had somehow acquired a slot, len would be < cap.
53+
if len(ch) != cap(ch) {
54+
t.Errorf("expected semaphore to remain full (len=%d, cap=%d); NotifyAsync should have dropped", len(ch), cap(ch))
55+
}
56+
}
57+
58+
func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) {
59+
// Regression test: SetMaxAsyncNotifications and NotifyAsync must not
60+
// race on the asyncSem variable. Run with -race to verify.
61+
var wg sync.WaitGroup
62+
63+
wg.Add(2)
64+
go func() {
65+
defer wg.Done()
66+
for i := 0; i < 100; i++ {
67+
SetMaxAsyncNotifications(50)
68+
}
69+
}()
70+
go func() {
71+
defer wg.Done()
72+
for i := 0; i < 20; i++ {
73+
NotifyAsync(errors.New("race test"))
74+
}
75+
}()
76+
wg.Wait()
77+
}

0 commit comments

Comments
 (0)