@@ -78,7 +78,7 @@ class MessageDispatcher {
7878 private final MessageWaiter messagesWaiter ;
7979
8080 // Maps ID to "total expiration time". If it takes longer than this, stop extending.
81- private final ConcurrentMap <AckHandler , Instant > pendingMessages = new ConcurrentHashMap <>();
81+ private final ConcurrentMap <String , AckHandler > pendingMessages = new ConcurrentHashMap <>();
8282
8383 private final LinkedBlockingQueue <String > pendingAcks = new LinkedBlockingQueue <>();
8484 private final LinkedBlockingQueue <String > pendingNacks = new LinkedBlockingQueue <>();
@@ -133,16 +133,25 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
133133 private final String ackId ;
134134 private final int outstandingBytes ;
135135 private final long receivedTimeMillis ;
136+ private final Instant totalExpiration ;
136137
137- AckHandler (String ackId , int outstandingBytes ) {
138+ AckHandler (String ackId , int outstandingBytes , Instant totalExpiration ) {
138139 this .ackId = ackId ;
139140 this .outstandingBytes = outstandingBytes ;
140- receivedTimeMillis = clock .millisTime ();
141+ this .receivedTimeMillis = clock .millisTime ();
142+ this .totalExpiration = totalExpiration ;
141143 }
142144
143- private void onBoth (LinkedBlockingQueue <String > destination ) {
144- pendingMessages .remove (this );
145- destination .add (ackId );
145+ /** Stop extending deadlines for this message and free flow control. */
146+ private void forget () {
147+ if (pendingMessages .remove (ackId ) == null ) {
148+ /*
149+ * We're forgetting the message for the second time. Probably because we ran out of total
150+ * expiration, forget the message, then the user finishes working on the message, and forget
151+ * again. Turn the second forget into a no-op so we don't free twice.
152+ */
153+ return ;
154+ }
146155 flowController .release (1 , outstandingBytes );
147156 messagesWaiter .incrementPendingMessages (-1 );
148157 processOutstandingBatches ();
@@ -154,7 +163,8 @@ public void onFailure(Throwable t) {
154163 Level .WARNING ,
155164 "MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked." ,
156165 t );
157- onBoth (pendingNacks );
166+ pendingNacks .add (ackId );
167+ forget ();
158168 }
159169
160170 @ Override
@@ -174,7 +184,8 @@ public void onSuccess(AckReply reply) {
174184 default :
175185 throw new IllegalArgumentException (String .format ("AckReply: %s not supported" , reply ));
176186 }
177- onBoth (destination );
187+ destination .add (ackId );
188+ forget ();
178189 }
179190 }
180191
@@ -327,18 +338,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
327338 doneCallback .run ();
328339 return ;
329340 }
330- messagesWaiter .incrementPendingMessages (messages .size ());
331-
332341
333342 Instant totalExpiration = now ().plus (maxAckExtensionPeriod );
334343 OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch (doneCallback );
335344 for (ReceivedMessage message : messages ) {
336345 AckHandler ackHandler =
337- new AckHandler (message .getAckId (), message .getMessage ().getSerializedSize ());
346+ new AckHandler (
347+ message .getAckId (), message .getMessage ().getSerializedSize (), totalExpiration );
348+ if (pendingMessages .putIfAbsent (message .getAckId (), ackHandler ) != null ){
349+ // putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the previously-mapped element.
350+ // If the previous element is not null, we already have the message and the new one is definitely a duplicate.
351+ // Don't nack this, because that'd also nack the one we already have in queue.
352+
353+ // TODO(pongad): We could update the total expiration time, but I'm not 100% sure how that plays with
354+ // various resources. Think about this more.
355+ continue ;
356+ }
338357 outstandingBatch .addMessage (message , ackHandler );
339358 pendingReceipts .add (message .getAckId ());
340- pendingMessages .put (ackHandler , totalExpiration );
341359 }
360+
361+ if (outstandingBatch .messages .isEmpty ()) {
362+ doneCallback .run ();
363+ return ;
364+ }
365+
366+ messagesWaiter .incrementPendingMessages (outstandingBatch .messages .size ());
342367 synchronized (outstandingMessageBatches ) {
343368 outstandingMessageBatches .add (outstandingBatch );
344369 }
@@ -398,6 +423,14 @@ public void nack() {
398423 @ Override
399424 public void run () {
400425 try {
426+ if (ackHandler .totalExpiration .plusSeconds (messageDeadlineSeconds .get ()).isBefore (now ())) {
427+ // Message expired while waiting. We don't extend these messages anymore,
428+ // so it was probably sent to someone else. Don't work on it.
429+ // Don't nack it either, because we'd be nacking someone else's message.
430+ ackHandler .forget ();
431+ return ;
432+ }
433+
401434 receiver .receiveMessage (message , consumer );
402435 } catch (Exception e ) {
403436 response .setException (e );
@@ -433,35 +466,26 @@ void extendDeadlines() {
433466 Instant now = now ();
434467 Instant extendTo = now .plusSeconds (extendSeconds );
435468
436- int count = 0 ;
437- Iterator <Map .Entry <AckHandler , Instant >> it = pendingMessages .entrySet ().iterator ();
438- while (it .hasNext ()) {
439- Map .Entry <AckHandler , Instant > entry = it .next ();
440- String ackId = entry .getKey ().ackId ;
441- Instant totalExpiration = entry .getValue ();
442- // TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
443- // since one modack RPC only takes one expiration.
444- // Whenever we delete polling pull, we should also delete PendingModifyAckDeadline,
445- // and just construct StreamingPullRequest directly.
469+ for (Map .Entry <String , AckHandler > entry : pendingMessages .entrySet ()) {
470+ String ackId = entry .getKey ();
471+ Instant totalExpiration = entry .getValue ().totalExpiration ;
446472 if (totalExpiration .isAfter (extendTo )) {
447473 modack .ackIds .add (ackId );
448- count ++;
449474 continue ;
450475 }
451- it .remove ();
476+
477+ // forget removes from pendingMessages; this is OK, concurrent maps can
478+ // handle concurrent iterations and modifications.
479+ entry .getValue ().forget ();
452480 if (totalExpiration .isAfter (now )) {
453481 int sec = Math .max (1 , (int ) now .until (totalExpiration , ChronoUnit .SECONDS ));
454482 modacks .add (new PendingModifyAckDeadline (sec , ackId ));
455- count ++;
456- } else {
457- flowController .release (1 , entry .getKey ().outstandingBytes );
458- messagesWaiter .incrementPendingMessages (-1 );
459483 }
460484 }
485+ logger .log (Level .FINER , "Sending {0} modacks" , modack .ackIds .size () + modacks .size ());
461486 modacks .add (modack );
462- logger .log (Level .FINER , "Sending {0} modacks" , count );
463487
464- List <String > acksToSend = Collections .< String > emptyList ();
488+ List <String > acksToSend = Collections .emptyList ();
465489 ackProcessor .sendAckOperations (acksToSend , modacks );
466490 }
467491
0 commit comments