From aba5ba519ce896dba41bc2e5b8974405032abc14 Mon Sep 17 00:00:00 2001 From: Shinnosuke Okada Date: Sat, 14 Jun 2025 04:55:09 -0400 Subject: [PATCH 1/4] Implement reconnect delay with decorrelated jitter in netSink --- net_sink.go | 55 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/net_sink.go b/net_sink.go index d4a42c0..e7c888d 100644 --- a/net_sink.go +++ b/net_sink.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "math" + "math/rand/v2" "net" "os" "strconv" @@ -23,9 +24,10 @@ type Logger interface { } const ( - defaultRetryInterval = time.Second * 3 - defaultDialTimeout = defaultRetryInterval / 2 - defaultWriteTimeout = time.Second + baseReconnectDelay = 3 * time.Second + maxReconnectDelay = time.Minute + defaultDialTimeout = baseReconnectDelay / 2 + defaultWriteTimeout = time.Second flushInterval = time.Second logOnEveryNDroppedBytes = 1 << 15 // Log once per 32kb of dropped stats @@ -102,7 +104,8 @@ func NewNetSink(opts ...SinkOption) FlushableSink { log: &loggingSink{writer: os.Stderr, now: time.Now}, // TODO (CEV): auto loading from the env is bad and should be removed. - conf: GetSettings(), + conf: GetSettings(), + reconnectDelay: baseReconnectDelay, } for _, opt := range opts { opt.apply(s) @@ -129,15 +132,16 @@ func NewNetSink(opts ...SinkOption) FlushableSink { } type netSink struct { - conn net.Conn - outc chan *bytes.Buffer - retryc chan *bytes.Buffer - mu sync.Mutex - bufWriter *bufio.Writer - doFlush chan chan struct{} - droppedBytes uint64 - log Logger - conf Settings + conn net.Conn + outc chan *bytes.Buffer + retryc chan *bytes.Buffer + mu sync.Mutex + bufWriter *bufio.Writer + doFlush chan chan struct{} + droppedBytes uint64 + log Logger + conf Settings + reconnectDelay time.Duration } type sinkWriter struct { @@ -282,8 +286,6 @@ func (s *netSink) run() { for { if s.conn == nil { if err := s.connect(addr); err != nil { - s.log.Warnf("connection error: %s", err) - // If the previous reconnect attempt failed, drain the flush // queue to prevent Flush() from blocking indefinitely. if reconnectFailed { @@ -291,11 +293,30 @@ func (s *netSink) run() { } reconnectFailed = true - // TODO (CEV): don't sleep on the first retry - time.Sleep(defaultRetryInterval) + // Decorrelated Jitter: sleep = min(cap, random_between(base, prev_sleep * 3)) + prevSleep := s.reconnectDelay + upperBound := prevSleep * 3 + + randomRange := upperBound - baseReconnectDelay + randomPart := time.Duration(rand.Int64N(int64(randomRange))) + nextSleep := baseReconnectDelay + randomPart + + if nextSleep > maxReconnectDelay { + nextSleep = maxReconnectDelay + } + + s.log.Warnf("connection error: %s, reconnecting in %s", err, nextSleep) + time.Sleep(nextSleep) + s.reconnectDelay = nextSleep + continue } + + if reconnectFailed { + s.log.Warnf("reconnected to %s", addr) + } reconnectFailed = false + s.reconnectDelay = baseReconnectDelay } // Handle buffers that need to be retried first, if they exist. From aee2300c5ad29c7faf7f0cc3286742b90ca844eb Mon Sep 17 00:00:00 2001 From: Shinnosuke Okada Date: Sun, 15 Jun 2025 01:32:15 -0400 Subject: [PATCH 2/4] Refactor to make the backoff logic unit testable --- net_sink.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/net_sink.go b/net_sink.go index e7c888d..232f944 100644 --- a/net_sink.go +++ b/net_sink.go @@ -293,17 +293,7 @@ func (s *netSink) run() { } reconnectFailed = true - // Decorrelated Jitter: sleep = min(cap, random_between(base, prev_sleep * 3)) - prevSleep := s.reconnectDelay - upperBound := prevSleep * 3 - - randomRange := upperBound - baseReconnectDelay - randomPart := time.Duration(rand.Int64N(int64(randomRange))) - nextSleep := baseReconnectDelay + randomPart - - if nextSleep > maxReconnectDelay { - nextSleep = maxReconnectDelay - } + nextSleep := calculateNextSleep(s.reconnectDelay) s.log.Warnf("connection error: %s, reconnecting in %s", err, nextSleep) time.Sleep(nextSleep) @@ -440,3 +430,22 @@ func (b *buffer) WriteUnit64(val uint64) { func (b *buffer) WriteFloat64(val float64) { *b = strconv.AppendFloat(*b, val, 'f', 6, 64) } + +func calculateNextSleep(prevSleep time.Duration) time.Duration { + // Decorrelated Jitter: sleep = min(cap, random_between(base, prev_sleep * 3)) + upperBound := prevSleep * 3 + + var nextSleep time.Duration + if upperBound > baseReconnectDelay { + randomRange := upperBound - baseReconnectDelay + jitter := time.Duration(rand.Int64N(int64(randomRange))) + nextSleep = baseReconnectDelay + jitter + } else { + nextSleep = baseReconnectDelay + } + + if nextSleep > maxReconnectDelay { + nextSleep = maxReconnectDelay + } + return nextSleep +} From f0caf20eca787bbe80df82f56895a5a4cdfcdb40 Mon Sep 17 00:00:00 2001 From: Shinnosuke Okada Date: Sun, 15 Jun 2025 01:33:12 -0400 Subject: [PATCH 3/4] Add unit tests --- go.mod | 7 +++++++ go.sum | 10 ++++++++++ net_sink_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/go.mod b/go.mod index 75bdc60..37953a7 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,10 @@ module github.com/lyft/gostats go 1.18 require github.com/kelseyhightower/envconfig v1.4.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.10.0 + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 8642a1a..35b8098 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,12 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/net_sink_test.go b/net_sink_test.go index de3203c..66826b5 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -16,6 +16,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/stretchr/testify/assert" ) func foreverNow() time.Time { @@ -878,3 +880,35 @@ func BenchmarkFlushTimer(b *testing.B) { sink.FlushTimer("TestTImer.___f=i.__tag1=v1", float64(i)/3) } } + +func TestCalculateNextSleep(t *testing.T) { + t.Parallel() + + // 1. Test that the sleep duration is within the expected bounds. + // Run it a few times to get some randomness. + for i := 0; i < 100; i++ { + // Start with the base delay + d := calculateNextSleep(baseReconnectDelay) + assert.GreaterOrEqual(t, d, baseReconnectDelay) + assert.Less(t, d, baseReconnectDelay*3) + + // Try with a larger previous delay + prev := 10 * time.Second + d = calculateNextSleep(prev) + assert.GreaterOrEqual(t, d, baseReconnectDelay) + assert.Less(t, d, prev*3) + } + + // 2. Test the cap + d := calculateNextSleep(maxReconnectDelay) + assert.GreaterOrEqual(t, d, baseReconnectDelay) + assert.LessOrEqual(t, d, maxReconnectDelay) + + // 3. Test edge case where prevSleep * 3 <= base. + // It should default back to the base. + d = calculateNextSleep(baseReconnectDelay / 3) + assert.Equal(t, baseReconnectDelay, d) + + d = calculateNextSleep(0) + assert.Equal(t, baseReconnectDelay, d) +} From a3b9f3003f3f13afc00e308295d6dec01a088ed2 Mon Sep 17 00:00:00 2001 From: Shinnosuke Okada Date: Sun, 15 Jun 2025 01:34:18 -0400 Subject: [PATCH 4/4] Address race conditions with the new max timeout --- net_sink_test.go | 8 ++++---- net_util_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/net_sink_test.go b/net_sink_test.go index 66826b5..edf372c 100644 --- a/net_sink_test.go +++ b/net_sink_test.go @@ -667,12 +667,12 @@ func testNetSinkReconnect(t *testing.T, protocol string) { // This test is flaky with UDP and the race detector, but good // to have so we log instead of fail the test. if protocol == "udp" { - stat := ts.WaitForStat(replaceFatalWithLog{t}, defaultRetryInterval*3) + stat := ts.WaitForStat(replaceFatalWithLog{t}, baseReconnectDelay*5) if stat != "" && stat != expected { t.Fatalf("stats got: %q want: %q", stat, expected) } } else { - stat := ts.WaitForStat(t, defaultRetryInterval*3) + stat := ts.WaitForStat(t, baseReconnectDelay*5) if stat != expected { t.Fatalf("stats got: %q want: %q", stat, expected) } @@ -721,7 +721,7 @@ func testNetSinkReconnectFailure(t *testing.T, protocol string) { select { case <-flushed: // Ok - case <-time.After(defaultRetryInterval * 2): + case <-time.After(baseReconnectDelay * 5): t.Fatalf("Only %d of %d Flush() calls succeeded", atomic.LoadInt64(flushCount), N) } @@ -843,7 +843,7 @@ func testNetSinkIntegration(t *testing.T, protocol string) { if err != nil { t.Fatal(err) } - case <-time.After(defaultRetryInterval * 2): + case <-time.After(baseReconnectDelay * 5): t.Fatal("Timed out waiting for command to exit") } }) diff --git a/net_util_test.go b/net_util_test.go index 13ac2b0..683b4b5 100644 --- a/net_util_test.go +++ b/net_util_test.go @@ -231,7 +231,7 @@ func (s *netTestSink) Restart(t testing.TB, resetBuffer bool) { func (s *netTestSink) WaitForStat(t testing.TB, timeout time.Duration) string { t.Helper() if timeout <= 0 { - timeout = defaultRetryInterval * 2 + timeout = baseReconnectDelay * 5 } to := time.NewTimer(timeout) defer to.Stop()