Skip to content

Commit b3fe60e

Browse files
MichaelGHSeg and claude committed
Implement unified HTTP response handling per SDD
- Remove 408/503 from Retry-After eligibility (only 429 uses Retry-After)
- Add 429 pipeline blocking: shared rate-limit state on AnalyticsClient (rateLimited, rateLimitWaitUntil, rateLimitStartTime), Looper checks before submitting BatchUploadTasks
- Add maxTotalBackoffDuration / maxRateLimitDuration config with 12h defaults, exposed via Analytics.Builder
- Restructure BatchUploadTask.run() retry loop: RATE_LIMITED strategy sets shared state and sleeps, BACKOFF strategy tracks firstFailureTime for duration guard, success clears rate-limit state
- Add tests: 408/503 use BACKOFF, 429 sets/clears rate-limit state, maxTotalBackoffDuration drops batch, maxRateLimitDuration drops batch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 87855af commit b3fe60e

3 files changed

Lines changed: 285 additions & 53 deletions

File tree

analytics/src/main/java/com/segment/analytics/Analytics.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ public static class Builder {
152152
private int queueCapacity;
153153
private boolean forceTlsV1 = false;
154154
private GsonBuilder gsonBuilder;
155+
private long maxTotalBackoffDurationMs;
156+
private long maxRateLimitDurationMs;
155157

156158
Builder(String writeKey) {
157159
if (writeKey == null || writeKey.trim().length() == 0) {
@@ -356,6 +358,32 @@ public Builder forceTlsVersion1() {
356358
return this;
357359
}
358360

361+
/**
362+
* Set the maximum total duration for backoff-based retries before giving up on a batch. Default
363+
* is 12 hours.
364+
*/
365+
public Builder maxTotalBackoffDuration(long duration, TimeUnit unit) {
366+
long seconds = unit.toSeconds(duration);
367+
if (seconds < 1) {
368+
throw new IllegalArgumentException("maxTotalBackoffDuration must be at least 1 second.");
369+
}
370+
this.maxTotalBackoffDurationMs = unit.toMillis(duration);
371+
return this;
372+
}
373+
374+
/**
375+
* Set the maximum total duration for rate-limit (429) retries before giving up on a batch.
376+
* Default is 12 hours.
377+
*/
378+
public Builder maxRateLimitDuration(long duration, TimeUnit unit) {
379+
long seconds = unit.toSeconds(duration);
380+
if (seconds < 1) {
381+
throw new IllegalArgumentException("maxRateLimitDuration must be at least 1 second.");
382+
}
383+
this.maxRateLimitDurationMs = unit.toMillis(duration);
384+
return this;
385+
}
386+
359387
/** Create a {@link Analytics} client. */
360388
public Analytics build() {
361389
if (gsonBuilder == null) {
@@ -421,6 +449,12 @@ public Analytics build() {
421449
} else {
422450
callbacks = Collections.unmodifiableList(callbacks);
423451
}
452+
if (maxTotalBackoffDurationMs == 0) {
453+
maxTotalBackoffDurationMs = 43200 * 1000L; // 12 hours
454+
}
455+
if (maxRateLimitDurationMs == 0) {
456+
maxRateLimitDurationMs = 43200 * 1000L; // 12 hours
457+
}
424458

425459
HttpLoggingInterceptor interceptor =
426460
new HttpLoggingInterceptor(
@@ -474,7 +508,9 @@ public void log(String message) {
474508
networkExecutor,
475509
callbacks,
476510
writeKey,
477-
gson);
511+
gson,
512+
maxTotalBackoffDurationMs,
513+
maxRateLimitDurationMs);
478514

479515
return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log);
480516
}

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 127 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class AnalyticsClient {
4646
private static final String instanceId = UUID.randomUUID().toString();
4747
private static final int WAIT_FOR_THREAD_COMPLETE_S = 5;
4848
private static final int TERMINATION_TIMEOUT_S = 1;
49-
private static final long MAX_RETRY_AFTER_SECONDS = 300L;
49+
private static final long MAX_RATE_LIMITED_SECONDS = 300L;
5050

5151
static {
5252
Map<String, String> library = new LinkedHashMap<>();
@@ -72,7 +72,12 @@ public class AnalyticsClient {
7272
private final ScheduledExecutorService flushScheduler;
7373
private final AtomicBoolean isShutDown;
7474
private final String writeKey;
75+
final long maxTotalBackoffDurationMs;
76+
final long maxRateLimitDurationMs;
7577
private volatile Future<?> looperFuture;
78+
private volatile boolean rateLimited;
79+
private volatile long rateLimitWaitUntil;
80+
private volatile long rateLimitStartTime;
7681

7782
public static AnalyticsClient create(
7883
HttpUrl uploadUrl,
@@ -87,7 +92,9 @@ public static AnalyticsClient create(
8792
ExecutorService networkExecutor,
8893
List<Callback> callbacks,
8994
String writeKey,
90-
Gson gsonInstance) {
95+
Gson gsonInstance,
96+
long maxTotalBackoffDurationMs,
97+
long maxRateLimitDurationMs) {
9198
return new AnalyticsClient(
9299
new LinkedBlockingQueue<Message>(queueCapacity),
93100
uploadUrl,
@@ -102,7 +109,9 @@ public static AnalyticsClient create(
102109
callbacks,
103110
new AtomicBoolean(false),
104111
writeKey,
105-
gsonInstance);
112+
gsonInstance,
113+
maxTotalBackoffDurationMs,
114+
maxRateLimitDurationMs);
106115
}
107116

108117
public AnalyticsClient(
@@ -119,7 +128,9 @@ public AnalyticsClient(
119128
List<Callback> callbacks,
120129
AtomicBoolean isShutDown,
121130
String writeKey,
122-
Gson gsonInstance) {
131+
Gson gsonInstance,
132+
long maxTotalBackoffDurationMs,
133+
long maxRateLimitDurationMs) {
123134
this.messageQueue = messageQueue;
124135
this.uploadUrl = uploadUrl;
125136
this.service = service;
@@ -133,6 +144,8 @@ public AnalyticsClient(
133144
this.isShutDown = isShutDown;
134145
this.writeKey = writeKey;
135146
this.gsonInstance = gsonInstance;
147+
this.maxTotalBackoffDurationMs = maxTotalBackoffDurationMs;
148+
this.maxRateLimitDurationMs = maxRateLimitDurationMs;
136149

137150
this.currentQueueSizeInBytes = 0;
138151

@@ -216,6 +229,30 @@ public void flush() {
216229
}
217230
}
218231

232+
void setRateLimitState(long retryAfterSeconds) {
233+
long now = System.currentTimeMillis();
234+
if (rateLimitStartTime == 0) {
235+
rateLimitStartTime = now;
236+
}
237+
rateLimitWaitUntil = now + (retryAfterSeconds * 1000);
238+
rateLimited = true;
239+
}
240+
241+
void clearRateLimitState() {
242+
rateLimited = false;
243+
rateLimitWaitUntil = 0;
244+
rateLimitStartTime = 0;
245+
}
246+
247+
boolean isRateLimited() {
248+
if (!rateLimited) return false;
249+
if (System.currentTimeMillis() >= rateLimitWaitUntil) {
250+
rateLimited = false;
251+
return false;
252+
}
253+
return true;
254+
}
255+
219256
public void shutdown() {
220257
if (isShutDown.compareAndSet(false, true)) {
221258
final long start = System.currentTimeMillis();
@@ -365,38 +402,45 @@ public void run() {
365402
Boolean isOverflow = messages.size() >= size;
366403

367404
if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) {
368-
Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey);
369-
log.print(
370-
VERBOSE,
371-
"Batching %s message(s) into batch %s.",
372-
batch.batch().size(),
373-
batch.sequence());
374-
try {
375-
networkExecutor.submit(
376-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
377-
} catch (RejectedExecutionException e) {
405+
// Skip submission if rate-limited (unless this is a StopMessage — always flush on
406+
// shutdown)
407+
if (isRateLimited() && message != StopMessage.STOP) {
408+
log.print(DEBUG, "Rate-limited. Deferring batch submission.");
409+
// Don't clear messages — they'll be picked up on the next flush trigger
410+
} else {
411+
Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey);
378412
log.print(
379-
ERROR,
380-
e,
381-
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
413+
VERBOSE,
414+
"Batching %s message(s) into batch %s.",
415+
batch.batch().size(),
382416
batch.sequence());
383-
// Notify callbacks about the failure
384-
for (Message msg : batch.batch()) {
385-
for (Callback callback : callbacks) {
386-
callback.failure(msg, e);
417+
try {
418+
networkExecutor.submit(
419+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
420+
} catch (RejectedExecutionException e) {
421+
log.print(
422+
ERROR,
423+
e,
424+
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
425+
batch.sequence());
426+
// Notify callbacks about the failure
427+
for (Message msg : batch.batch()) {
428+
for (Callback callback : callbacks) {
429+
callback.failure(msg, e);
430+
}
387431
}
388432
}
389-
}
390433

391-
currentBatchSize.set(0);
392-
messages.clear();
393-
if (batchSizeLimitReached) {
394-
// If this is true that means the last message that would make us go over the limit
395-
// was not added,
396-
// add it to the now cleared messages list so its not lost
397-
messages.add(message);
434+
currentBatchSize.set(0);
435+
messages.clear();
436+
if (batchSizeLimitReached) {
437+
// If this is true that means the last message that would make us go over the limit
438+
// was not added,
439+
// add it to the now cleared messages list so its not lost
440+
messages.add(message);
441+
}
442+
batchSizeLimitReached = false;
398443
}
399-
batchSizeLimitReached = false;
400444
}
401445
}
402446
} catch (InterruptedException e) {
@@ -442,7 +486,7 @@ private void notifyCallbacksWithException(Batch batch, Exception exception) {
442486
private enum RetryStrategy {
443487
NONE,
444488
BACKOFF,
445-
RETRY_AFTER
489+
RATE_LIMITED
446490
}
447491

448492
private static final class UploadResult {
@@ -496,13 +540,14 @@ UploadResult upload(int attempt) {
496540
batch.sequence(),
497541
status,
498542
retryAfterSeconds);
499-
return new UploadResult(RetryStrategy.RETRY_AFTER, retryAfterSeconds);
543+
return new UploadResult(RetryStrategy.RATE_LIMITED, retryAfterSeconds);
500544
}
501545
client.log.print(
502546
DEBUG,
503-
"Status %s did not have a valid Retry-After header for batch %s.",
547+
"Status %s did not have a valid Retry-After header for batch %s. Using backoff.",
504548
status,
505549
batch.sequence());
550+
return new UploadResult(RetryStrategy.RATE_LIMITED, 0);
506551
}
507552

508553
if (isStatusRetryWithBackoff(status)) {
@@ -536,7 +581,7 @@ UploadResult upload(int attempt) {
536581
}
537582

538583
private static boolean isStatusRetryAfterEligible(int status) {
539-
return status == 429 || status == 408 || status == 503;
584+
return status == 429;
540585
}
541586

542587
private static Long parseRetryAfterSeconds(String headerValue) {
@@ -552,8 +597,8 @@ private static Long parseRetryAfterSeconds(String headerValue) {
552597
if (seconds <= 0L) {
553598
return null;
554599
}
555-
if (seconds > MAX_RETRY_AFTER_SECONDS) {
556-
return MAX_RETRY_AFTER_SECONDS;
600+
if (seconds > MAX_RATE_LIMITED_SECONDS) {
601+
return MAX_RATE_LIMITED_SECONDS;
557602
}
558603
return seconds;
559604
} catch (NumberFormatException ignored) {
@@ -566,31 +611,66 @@ public void run() {
566611
int totalAttempts = 0; // counts every HTTP attempt (for header and error message)
567612
int backoffAttempts = 0; // counts attempts that consume backoff-based retries
568613
int maxBackoffAttempts = maxRetries + 1; // preserve existing semantics
614+
long firstFailureTime = 0;
569615

570616
while (true) {
571617
totalAttempts++;
572618
UploadResult result = upload(totalAttempts);
573619

574620
if (result.strategy == RetryStrategy.NONE) {
621+
client.clearRateLimitState();
575622
return;
576623
}
577624

578-
if (result.strategy == RetryStrategy.RETRY_AFTER) {
579-
try {
580-
TimeUnit.SECONDS.sleep(result.retryAfterSeconds);
581-
} catch (InterruptedException e) {
582-
client.log.print(
583-
DEBUG,
584-
"Thread interrupted while waiting for Retry-After for batch %s.",
585-
batch.sequence());
586-
Thread.currentThread().interrupt();
587-
return;
625+
if (result.strategy == RetryStrategy.RATE_LIMITED) {
626+
// Set global rate-limit state (blocks Looper from submitting more)
627+
client.setRateLimitState(result.retryAfterSeconds);
628+
629+
// Check maxRateLimitDuration
630+
if (client.rateLimitStartTime > 0
631+
&& System.currentTimeMillis() - client.rateLimitStartTime
632+
> client.maxRateLimitDurationMs) {
633+
client.clearRateLimitState();
634+
break;
635+
}
636+
637+
// Sleep for Retry-After then retry this batch
638+
if (result.retryAfterSeconds > 0) {
639+
try {
640+
TimeUnit.SECONDS.sleep(result.retryAfterSeconds);
641+
} catch (InterruptedException e) {
642+
client.log.print(
643+
DEBUG,
644+
"Thread interrupted while waiting for Retry-After for batch %s.",
645+
batch.sequence());
646+
Thread.currentThread().interrupt();
647+
return;
648+
}
649+
} else {
650+
// No valid Retry-After header — use backoff-style wait
651+
backoffAttempts++;
652+
if (backoffAttempts >= maxBackoffAttempts) {
653+
break;
654+
}
655+
try {
656+
backo.sleep(backoffAttempts - 1);
657+
} catch (InterruptedException e) {
658+
client.log.print(
659+
DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence());
660+
Thread.currentThread().interrupt();
661+
return;
662+
}
588663
}
589-
// Do not count Retry-After based retries against maxRetries.
664+
// Retry-After with valid header does not count against maxRetries.
590665
continue;
591666
}
592667

593668
// BACKOFF strategy
669+
if (firstFailureTime == 0) firstFailureTime = System.currentTimeMillis();
670+
if (System.currentTimeMillis() - firstFailureTime > client.maxTotalBackoffDurationMs) {
671+
break;
672+
}
673+
594674
backoffAttempts++;
595675
if (backoffAttempts >= maxBackoffAttempts) {
596676
break;

0 commit comments

Comments
 (0)