Skip to content

Commit 73b7179

Browse files
committed
modify jitter handling
Signed-off-by: Mukil <mukils@nvidia.com>
1 parent 3b0e700 commit 73b7179

1 file changed

Lines changed: 29 additions & 22 deletions

File tree

internal/exporter/writer/http.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,32 @@ func (w *httpWriter) SetJWTRefreshFunc(refreshFunc JWTRefreshFunc) {
8686

8787
// Send sends health data to the specified endpoint
8888
func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metricsEndpoint string, logsEndpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (string, error) {
89+
// Add jitter before the initial push to spread agent requests and prevent
90+
// thundering herd when many agents share the same collection tick.
91+
// Use 5% of the collection interval, capped at maxInitialJitter.
92+
jitterCap := collectionInterval / 20 // 5%
93+
if jitterCap > maxInitialJitter {
94+
jitterCap = maxInitialJitter
95+
}
96+
if jitterCap <= 0 {
97+
jitterCap = defaultRetryDelay / 2 // fallback for zero/negative intervals
98+
}
99+
100+
// Further limit the initial jitter to at most half of the time remaining before the context deadline.
101+
// This prevents the jitter from consuming the entire timeout budget.
102+
if deadline, ok := ctx.Deadline(); ok {
103+
if remaining := time.Until(deadline) / 2; remaining > 0 && jitterCap > remaining {
104+
jitterCap = remaining
105+
}
106+
}
107+
if jitter := calculateJitter(jitterCap); jitter > 0 {
108+
select {
109+
case <-ctx.Done():
110+
return "", ctx.Err()
111+
case <-time.After(jitter):
112+
}
113+
}
114+
89115
// Convert to OTLP format
90116
otlpData := w.otlpConverter.Convert(data)
91117

@@ -98,7 +124,7 @@ func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metri
98124
return "", fmt.Errorf("failed to marshal OTLP metrics data: %w", err)
99125
}
100126

101-
token, err := w.sendOTLPRequestWithRetry(ctx, metricsBytes, "metrics", data.CollectionID, data.MachineID, metricsEndpoint, maxRetries, authToken, collectionInterval)
127+
token, err := w.sendOTLPRequestWithRetry(ctx, metricsBytes, "metrics", data.CollectionID, data.MachineID, metricsEndpoint, maxRetries, authToken)
102128
if err != nil {
103129
log.Logger.Errorw("Failed to send metrics data after all retries",
104130
"collection_id", data.CollectionID,
@@ -117,7 +143,7 @@ func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metri
117143
return newToken, fmt.Errorf("failed to marshal OTLP logs data: %w", err)
118144
}
119145

120-
token, err := w.sendOTLPRequestWithRetry(ctx, logsBytes, "logs", data.CollectionID, data.MachineID, logsEndpoint, maxRetries, authToken, collectionInterval)
146+
token, err := w.sendOTLPRequestWithRetry(ctx, logsBytes, "logs", data.CollectionID, data.MachineID, logsEndpoint, maxRetries, authToken)
121147
if err != nil {
122148
return newToken, fmt.Errorf("failed to send critical logs data (includes events): %w", err)
123149
} else if token != "" {
@@ -130,7 +156,7 @@ func (w *httpWriter) Send(ctx context.Context, data *collector.HealthData, metri
130156
}
131157

132158
// sendOTLPRequestWithRetry sends the OTLP data with retry logic
133-
func (w *httpWriter) sendOTLPRequestWithRetry(ctx context.Context, reqData []byte, dataType, collectionID, machineID, endpoint string, maxRetries int, authToken string, collectionInterval time.Duration) (string, error) {
159+
func (w *httpWriter) sendOTLPRequestWithRetry(ctx context.Context, reqData []byte, dataType, collectionID, machineID, endpoint string, maxRetries int, authToken string) (string, error) {
134160
if maxRetries <= 0 {
135161
maxRetries = 1 // At least one attempt
136162
}
@@ -139,25 +165,6 @@ func (w *httpWriter) sendOTLPRequestWithRetry(ctx context.Context, reqData []byt
139165
var lastErr error
140166
jwtRefreshAttempted := false
141167

142-
// Add jitter before the initial push to spread agent requests and prevent
143-
// thundering herd when many agents share the same collection tick.
144-
// Use 5% of the collection interval, capped at maxInitialJitter.
145-
initialMaxJitter := collectionInterval / 20 // 5%
146-
if initialMaxJitter > maxInitialJitter {
147-
initialMaxJitter = maxInitialJitter
148-
}
149-
if initialMaxJitter <= 0 {
150-
initialMaxJitter = defaultRetryDelay / 2 // fallback for zero/negative intervals
151-
}
152-
initialJitter := calculateJitter(initialMaxJitter)
153-
if initialJitter > 0 {
154-
select {
155-
case <-ctx.Done():
156-
return "", ctx.Err()
157-
case <-time.After(initialJitter):
158-
}
159-
}
160-
161168
for attempt := 1; attempt <= maxRetries; attempt++ {
162169
token, err := w.sendOTLPRequest(ctx, reqData, dataType, collectionID, machineID, endpoint, currentAuthToken)
163170
if err == nil {

0 commit comments

Comments
 (0)