Skip to content

Commit 4dc9fb5

Browse files
committed
[GPUHEALTH-1361] Add Jitter to Initial OTLP Push to avoid thundering herd
Signed-off-by: Mukil <mukils@nvidia.com> Modify jitter Signed-off-by: Mukil <mukils@nvidia.com> modify jitter handling Signed-off-by: Mukil <mukils@nvidia.com>
1 parent acbe0a8 commit 4dc9fb5

3 files changed

Lines changed: 44 additions & 14 deletions

File tree

internal/exporter/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (e *healthExporter) exportToHTTP(ctx context.Context, data *collector.Healt
231231
return nil
232232
}
233233

234-
newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken)
234+
newToken, err := e.httpWriter.Send(ctx, data, e.options.config.MetricsEndpoint, e.options.config.LogsEndpoint, e.options.config.RetryMaxAttempts, e.options.config.AuthToken, e.options.config.Interval.Duration)
235235
if err != nil {
236236
return fmt.Errorf("failed to send data: %w", err)
237237
}

internal/exporter/writer/http.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ import (
3535
const (
3636
// defaultRetryDelay is the default delay between retry attempts
3737
defaultRetryDelay = 5 * time.Second
38+
39+
// maxInitialJitter caps the initial jitter so that even with very long
40+
// collection intervals the pre-push delay stays reasonable.
41+
maxInitialJitter = 1 * time.Minute
3842
)
3943

4044
// HTTPError represents an HTTP error with status code
@@ -56,7 +60,7 @@ type JWTRefreshFunc func(ctx context.Context) (string, error)
5660

5761
// HTTPWriter defines the interface for sending health data via HTTP
5862
type HTTPWriter interface {
59-
Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (newToken string, err error)
63+
Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (newToken string, err error)
6064
SetJWTRefreshFunc(refreshFunc JWTRefreshFunc)
6165
}
6266

@@ -81,7 +85,33 @@ func (w *httpWriter) SetJWTRefreshFunc(refreshFunc JWTRefreshFunc) {
8185
}
8286

8387
// Send sends health data to the specified endpoint
84-
func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string) (string, error) {
88+
func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (string, error) {
89+
// Add jitter before the initial push to spread agent requests and prevent
90+
// thundering herd when many agents share the same collection tick.
91+
// The jitter upper bound is 5% of the collection interval, capped at maxInitialJitter.
92+
jitterCap := collectionInterval / 20 // 5%
93+
if jitterCap > maxInitialJitter {
94+
jitterCap = maxInitialJitter
95+
}
96+
if jitterCap <= 0 {
97+
jitterCap = defaultRetryDelay / 2 // fallback for zero/negative intervals
98+
}
99+
100+
// Further limit the initial jitter to at most half of the time remaining before the context deadline.
101+
// This is to prevent the jitter from consuming the entire timeout budget.
102+
if deadline, ok := ctx.Deadline(); ok {
103+
if remaining := time.Until(deadline) / 2; remaining > 0 && jitterCap > remaining {
104+
jitterCap = remaining
105+
}
106+
}
107+
if jitter := calculateJitter(jitterCap); jitter > 0 {
108+
select {
109+
case <-ctx.Done():
110+
return "", ctx.Err()
111+
case <-time.After(jitter):
112+
}
113+
}
114+
85115
// Convert to OTLP format
86116
otlpData := w.otlpConverter.Convert(data)
87117

internal/exporter/writer/http_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestHTTPWriter_Send_Success(t *testing.T) {
8686
writer := NewHTTPWriter(httpClient, otlpConverter)
8787

8888
ctx := context.Background()
89-
newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token")
89+
newToken, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute)
9090

9191
require.NoError(t, err)
9292
assert.Empty(t, newToken)
@@ -126,7 +126,7 @@ func TestHTTPWriter_Send_EmptyMetrics(t *testing.T) {
126126
writer := NewHTTPWriter(httpClient, otlpConverter)
127127

128128
ctx := context.Background()
129-
_, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token")
129+
_, err := writer.Send(ctx, testData, "", logsServer.URL, 1, "test-token", 1*time.Minute)
130130

131131
require.NoError(t, err)
132132
assert.Equal(t, 1, logsRequests)
@@ -164,7 +164,7 @@ func TestHTTPWriter_Send_EmptyLogs(t *testing.T) {
164164
writer := NewHTTPWriter(httpClient, otlpConverter)
165165

166166
ctx := context.Background()
167-
_, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token")
167+
_, err := writer.Send(ctx, testData, metricsServer.URL, "", 1, "test-token", 1*time.Minute)
168168

169169
require.NoError(t, err)
170170
assert.Equal(t, 1, metricsRequests)
@@ -204,7 +204,7 @@ func TestHTTPWriter_Send_RetryOnFailure(t *testing.T) {
204204
writer := NewHTTPWriter(httpClient, otlpConverter)
205205

206206
ctx := context.Background()
207-
_, err := writer.Send(ctx, testData, server.URL, "", 3, "test-token")
207+
_, err := writer.Send(ctx, testData, server.URL, "", 3, "test-token", 1*time.Minute)
208208

209209
require.NoError(t, err)
210210
assert.Equal(t, 3, attempts)
@@ -241,7 +241,7 @@ func TestHTTPWriter_Send_RetryExhausted(t *testing.T) {
241241
writer := NewHTTPWriter(httpClient, otlpConverter)
242242

243243
ctx := context.Background()
244-
_, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token")
244+
_, err := writer.Send(ctx, testData, "", server.URL, maxAttempts, "test-token", 1*time.Minute)
245245

246246
require.Error(t, err)
247247
assert.Equal(t, maxAttempts, attempts)
@@ -292,7 +292,7 @@ func TestHTTPWriter_Send_UnauthorizedWithJWTRefresh(t *testing.T) {
292292
})
293293

294294
ctx := context.Background()
295-
_, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token")
295+
_, err := writer.Send(ctx, testData, server.URL, "", 3, "old-token", 1*time.Minute)
296296

297297
require.NoError(t, err)
298298
assert.True(t, refreshCalled)
@@ -332,7 +332,7 @@ func TestHTTPWriter_Send_UnauthorizedJWTRefreshFails(t *testing.T) {
332332
})
333333

334334
ctx := context.Background()
335-
_, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token") // Reduce retries
335+
_, err := writer.Send(ctx, testData, "", server.URL, 2, "old-token", 1*time.Minute) // Reduce retries
336336

337337
require.Error(t, err)
338338
assert.GreaterOrEqual(t, attempts, 1)
@@ -368,7 +368,7 @@ func TestHTTPWriter_Send_JWTTokenRefreshFromHeader(t *testing.T) {
368368
writer := NewHTTPWriter(httpClient, otlpConverter)
369369

370370
ctx := context.Background()
371-
returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token")
371+
returnedToken, err := writer.Send(ctx, testData, server.URL, "", 1, "test-token", 1*time.Minute)
372372

373373
require.NoError(t, err)
374374
assert.Equal(t, newToken, returnedToken)
@@ -404,7 +404,7 @@ func TestHTTPWriter_Send_ContextCancellation(t *testing.T) {
404404
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
405405
defer cancel()
406406

407-
_, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token")
407+
_, err := writer.Send(ctx, testData, "", server.URL, 1, "test-token", 1*time.Minute)
408408

409409
require.Error(t, err)
410410
}
@@ -443,7 +443,7 @@ func TestHTTPWriter_Send_LogsFailure(t *testing.T) {
443443
writer := NewHTTPWriter(httpClient, otlpConverter)
444444

445445
ctx := context.Background()
446-
_, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token")
446+
_, err := writer.Send(ctx, testData, metricsServer.URL, logsServer.URL, 1, "test-token", 1*time.Minute)
447447

448448
require.Error(t, err)
449449
assert.Contains(t, err.Error(), "failed to send critical logs data")
@@ -571,7 +571,7 @@ func TestHTTPWriter_MarshalError(t *testing.T) {
571571

572572
ctx := context.Background()
573573
// Pass empty endpoints - should not send anything
574-
newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token")
574+
newToken, err := writer.Send(ctx, testData, "", "", 1, "test-token", 1*time.Minute)
575575

576576
require.NoError(t, err)
577577
assert.Empty(t, newToken)

0 commit comments

Comments
 (0)