Skip to content

Commit a01c226

Browse files
MichaelGHSegclaude
andcommitted
Fix rate-limited Looper deferral to preserve overflow messages
When the Looper defers batch submission due to rate limiting and the trigger was a batch-size overflow, the overflow message was being lost because it had already been consumed from the queue but not added to the batch. Now re-offers the overflow message back to the queue so it can be retried on the next flush cycle. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b3fe60e commit a01c226

2 files changed

Lines changed: 67 additions & 0 deletions

File tree

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,16 @@ public void run() {
407407
if (isRateLimited() && message != StopMessage.STOP) {
408408
log.print(DEBUG, "Rate-limited. Deferring batch submission.");
409409
// Don't clear messages — they'll be picked up on the next flush trigger
410+
if (batchSizeLimitReached) {
411+
// Preserve overflow message while deferring submission due to rate limiting.
412+
// This message was consumed from the queue but not added to the current batch.
413+
if (!messageQueue.offer(message)) {
414+
log.print(
415+
ERROR,
416+
"Failed to preserve overflow message while rate-limited; message may be dropped.");
417+
}
418+
batchSizeLimitReached = false;
419+
}
410420
} else {
411421
Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey);
412422
log.print(

analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.mockito.Mockito.never;
1212
import static org.mockito.Mockito.spy;
1313
import static org.mockito.Mockito.timeout;
14+
import static org.mockito.Mockito.doReturn;
1415
import static org.mockito.Mockito.times;
1516
import static org.mockito.Mockito.verify;
1617
import static org.mockito.Mockito.verifyNoInteractions;
@@ -151,6 +152,62 @@ public void flushInsertsPoison() throws InterruptedException {
151152
verify(messageQueue).put(FlushMessage.POISON);
152153
}
153154

155+
@Test
156+
public void rateLimitedDeferralPreservesOverflowMessage() throws InterruptedException {
157+
// Use a real queue with real messages. Two large messages trigger batchSizeLimitReached
158+
// on the second one. StopMessage ends the Looper (bypasses rate-limit on shutdown).
159+
LinkedBlockingQueue<Message> localQueue = new LinkedBlockingQueue<>();
160+
161+
// Create messages large enough that the second exceeds BATCH_MAX_SIZE (512000 bytes).
162+
// ~300KB each: first fits, second triggers batchSizeLimitReached.
163+
String largePayload = new String(new char[300000]).replace('\0', 'x');
164+
Map<String, Object> largeProps = new java.util.HashMap<>();
165+
largeProps.put("data", largePayload);
166+
167+
TrackMessage firstMessage =
168+
TrackMessage.builder("first").userId("user").properties(largeProps).build();
169+
TrackMessage overflowMessage =
170+
TrackMessage.builder("overflow").userId("user").properties(largeProps).build();
171+
172+
localQueue.put(firstMessage);
173+
localQueue.put(overflowMessage);
174+
localQueue.put(StopMessage.STOP);
175+
176+
// Pass isShutDown=true to prevent the constructor from auto-starting a Looper
177+
// (which would race with our manually-created Looper and consume queue messages).
178+
AnalyticsClient client =
179+
new AnalyticsClient(
180+
localQueue,
181+
null,
182+
segmentService,
183+
50,
184+
TimeUnit.HOURS.toMillis(1),
185+
0,
186+
MAX_BATCH_SIZE,
187+
log,
188+
threadFactory,
189+
networkExecutor,
190+
Collections.singletonList(callback),
191+
new AtomicBoolean(true),
192+
writeKey,
193+
new Gson(),
194+
DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS,
195+
DEFAULT_MAX_RATE_LIMIT_DURATION_MS);
196+
197+
// Set rate-limited state so the Looper defers batch submission
198+
client.setRateLimitState(60);
199+
200+
AnalyticsClient.Looper looper = client.new Looper();
201+
looper.run();
202+
203+
// After: msg1 added to messages, msg2 triggers batchSizeLimitReached,
204+
// rate-limited deferral offers msg2 back to queue, StopMessage bypasses
205+
// rate-limit and submits batch with msg1. msg2 remains in queue.
206+
assertThat(localQueue).contains(overflowMessage);
207+
// Batch with msg1 was submitted on StopMessage (shutdown always flushes)
208+
verify(networkExecutor).submit(any(Runnable.class));
209+
}
210+
154211
/** Wait until the queue is drained. */
155212
static void wait(Queue<?> queue) {
156213
// noinspection StatementWithEmptyBody

0 commit comments

Comments
 (0)