Skip to content

Commit 62e60b5

Browse files
committed
GEODE-10420: Finish distribute() work if interrupted (#7854)
An event that a gateway sender should be notified of can be lost if the thread is interrupted while the event is being processed. The cause is that AbstractGatewaySender.distribute(), on catching an InterruptedException, simply returns without either putting the event in the queue or dropping it. The fix is to finish handling the event (enqueue it or drop it) even when an InterruptedException is caught, and to set the thread's interrupt flag again before the method returns so that the caller is aware of the interruption.
1 parent 8b75180 commit 62e60b5

3 files changed

Lines changed: 186 additions & 9 deletions

File tree

geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,9 @@ public EntryEventImpl(
340340
op = other.op;
341341
distributedMember = other.distributedMember;
342342
filterInfo = other.filterInfo;
343-
keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo)
344-
: new KeyInfo(other.keyInfo);
343+
keyInfo =
344+
other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo())
345+
: new KeyInfo(other.getKeyInfo());
345346
if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) {
346347
keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument(
347348
(GatewaySenderEventCallbackArgument) other.getRawCallbackArgument())));

geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
10391039
List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) {
10401040

10411041
final boolean isDebugEnabled = logger.isDebugEnabled();
1042+
boolean wasInterrupted = false;
10421043

10431044
// released by this method or transfers ownership to TmpQueueEvent
10441045
@Released
@@ -1153,15 +1154,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
11531154
}
11541155
}
11551156
if (enqueuedAllTempQueueEvents) {
1156-
try {
1157-
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
1158-
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
1159-
return;
1157+
while (true) {
1158+
try {
1159+
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
1160+
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
1161+
return;
1162+
}
11601163
}
1164+
break;
1165+
} catch (InterruptedException e) {
1166+
wasInterrupted = true;
11611167
}
1162-
} catch (InterruptedException e) {
1163-
Thread.currentThread().interrupt();
1164-
return;
11651168
}
11661169
}
11671170
}
@@ -1210,6 +1213,9 @@ this, getId(), operation, clonedEvent),
12101213
if (freeClonedEvent) {
12111214
clonedEvent.release(); // fix for bug 48035
12121215
}
1216+
if (wasInterrupted) {
1217+
Thread.currentThread().interrupt();
1218+
}
12131219
}
12141220
}
12151221

geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,28 @@
1818
import static org.mockito.Mockito.mock;
1919
import static org.mockito.Mockito.when;
2020

21+
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.HashSet;
26+
import java.util.List;
2427
import java.util.Set;
28+
import java.util.concurrent.CountDownLatch;
2529

2630
import org.junit.Test;
2731

32+
import org.apache.geode.cache.CacheException;
33+
import org.apache.geode.cache.DataPolicy;
34+
import org.apache.geode.cache.EntryEvent;
35+
import org.apache.geode.cache.Operation;
2836
import org.apache.geode.cache.Region;
2937
import org.apache.geode.cache.wan.GatewayQueueEvent;
38+
import org.apache.geode.distributed.internal.DistributionAdvisor;
39+
import org.apache.geode.internal.cache.EntryEventImpl;
40+
import org.apache.geode.internal.cache.EnumListenerEvent;
41+
import org.apache.geode.internal.cache.InternalRegion;
42+
import org.apache.geode.internal.cache.KeyInfo;
3043
import org.apache.geode.internal.cache.RegionQueue;
3144

3245
public class AbstractGatewaySenderTest {
@@ -58,4 +71,161 @@ public void getSynchronizationEventCanHandleRegionIsNullCase() {
5871

5972
assertThat(event).isSameAs(gatewaySenderEvent);
6073
}
74+
75+
@Test
76+
public void distributeFinishesWorkWhenInterrupted() throws InterruptedException {
77+
DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor();
78+
TestableGatewaySender gatewaySender = new TestableGatewaySender(processor);
79+
EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE;
80+
EntryEventImpl event = mock(EntryEventImpl.class);
81+
when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class));
82+
Operation operation = mock(Operation.class);
83+
when(operation.isLocal()).thenReturn(false);
84+
when(operation.isExpiration()).thenReturn(false);
85+
when(event.getOperation()).thenReturn(operation);
86+
InternalRegion region = mock(InternalRegion.class);
87+
when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
88+
when(event.getRegion()).thenReturn(region);
89+
List<Integer> allRemoteDSIds = Collections.singletonList(1);
90+
91+
CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
92+
CountDownLatch unlockLatch = new CountDownLatch(1);
93+
94+
// Get lifeCycleLock in write mode in new thread so that
95+
// the thread calling distribute will not be able
96+
// to acquire it
97+
Thread thread = new Thread(() -> {
98+
gatewaySender.getLifeCycleLock().writeLock().lock();
99+
lockAcquiredLatch.countDown();
100+
try {
101+
unlockLatch.await();
102+
} catch (InterruptedException ignore) {
103+
}
104+
gatewaySender.getLifeCycleLock().writeLock().unlock();
105+
});
106+
thread.start();
107+
lockAcquiredLatch.await();
108+
109+
// Send interrupted and then call distribute
110+
Thread.currentThread().interrupt();
111+
gatewaySender.distribute(operationType, event, allRemoteDSIds, true);
112+
113+
unlockLatch.countDown();
114+
115+
// Check that the interrupted exception has been reset
116+
assertThat(Thread.currentThread().isInterrupted()).isTrue();
117+
// Check that the work was finished even if the interrupt signal was set
118+
assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1);
119+
}
120+
121+
public static class TestableGatewaySender extends AbstractGatewaySender {
122+
private int isRunningTimesCalled = 0;
123+
124+
public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) {
125+
this.eventProcessor = eventProcessor;
126+
enqueuedAllTempQueueEvents = true;
127+
}
128+
129+
@Override
130+
public void fillInProfile(DistributionAdvisor.Profile profile) {}
131+
132+
@Override
133+
public void start() {}
134+
135+
@Override
136+
public boolean isPrimary() {
137+
return true;
138+
}
139+
140+
@Override
141+
public void startWithCleanQueue() {}
142+
143+
@Override
144+
public void stop() {}
145+
146+
@Override
147+
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
148+
149+
@Override
150+
public GatewaySenderStats getStatistics() {
151+
return mock(GatewaySenderStats.class);
152+
}
153+
154+
@Override
155+
public GatewaySenderAdvisor getSenderAdvisor() {
156+
return mock(GatewaySenderAdvisor.class);
157+
}
158+
159+
@Override
160+
public boolean isRunning() {
161+
if (isRunningTimesCalled++ == 0) {
162+
return true;
163+
}
164+
return false;
165+
}
166+
167+
@Override
168+
public String getId() {
169+
return "test";
170+
}
171+
}
172+
173+
public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
174+
175+
private int timesEnqueueEventCalled = 0;
176+
private int timesRegisterEventDroppedInPrimaryQueueCalled = 0;
177+
178+
public DummyGatewaySenderEventProcessor() {
179+
super("", new DummyGatewaySender(), null);
180+
}
181+
182+
@Override
183+
public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue,
184+
boolean isLastEventInTransaction) throws IOException, CacheException {
185+
timesEnqueueEventCalled++;
186+
}
187+
188+
public int getTimesEnqueueEventCalled() {
189+
return timesEnqueueEventCalled;
190+
}
191+
192+
@Override
193+
protected void initializeMessageQueue(String id, boolean cleanQueues) {}
194+
195+
@Override
196+
protected void rebalance() {}
197+
198+
public int getTimesRegisterEventDroppedInPrimaryQueueCalled() {
199+
return timesRegisterEventDroppedInPrimaryQueueCalled;
200+
}
201+
202+
@Override
203+
protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) {
204+
timesRegisterEventDroppedInPrimaryQueueCalled++;
205+
}
206+
207+
@Override
208+
public void initializeEventDispatcher() {}
209+
210+
@Override
211+
protected void enqueueEvent(GatewayQueueEvent event) {}
212+
}
213+
214+
public static class DummyGatewaySender extends AbstractGatewaySender {
215+
@Override
216+
public void fillInProfile(DistributionAdvisor.Profile profile) {}
217+
218+
@Override
219+
public void start() {}
220+
221+
@Override
222+
public void startWithCleanQueue() {}
223+
224+
@Override
225+
public void stop() {}
226+
227+
@Override
228+
public void setModifiedEventId(EntryEventImpl clonedEvent) {}
229+
230+
}
61231
}

0 commit comments

Comments
 (0)