Skip to content

Commit 1869f2c

Browse files
committed
GEODE-10380: use waitingThreadPool to notify dispatcher at re_auth (#7801)
(cherry picked from commit b3fef2a)
1 parent c30c3ab commit 1869f2c

2 files changed

Lines changed: 49 additions & 1 deletion

File tree

geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132
import java.util.concurrent.atomic.AtomicReference;
@@ -727,7 +728,11 @@ public void notifyReAuthentication() {
727728
if (_messageDispatcher == null) {
728729
return;
729730
}
730-
_messageDispatcher.notifyReAuthentication();
731+
732+
// use another thread to do the notification so that the server operation won't be blocked
733+
ExecutorService threadPool =
734+
_cache.getDistributionManager().getExecutors().getWaitingThreadPool();
735+
threadPool.submit(() -> _messageDispatcher.notifyReAuthentication());
731736
}
732737

733738
/**

geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import static org.assertj.core.api.Assertions.assertThat;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyBoolean;
23+
import static org.mockito.Mockito.doAnswer;
2324
import static org.mockito.Mockito.doNothing;
25+
import static org.mockito.Mockito.doReturn;
2426
import static org.mockito.Mockito.mock;
2527
import static org.mockito.Mockito.never;
2628
import static org.mockito.Mockito.spy;
@@ -30,19 +32,26 @@
3032

3133
import java.net.InetAddress;
3234
import java.net.Socket;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.atomic.AtomicBoolean;
3337

3438
import org.apache.shiro.subject.Subject;
3539
import org.junit.Before;
40+
import org.junit.Rule;
3641
import org.junit.Test;
42+
import org.mockito.stubbing.Answer;
3743

3844
import org.apache.geode.StatisticsFactory;
45+
import org.apache.geode.distributed.internal.DistributionManager;
46+
import org.apache.geode.distributed.internal.OperationExecutors;
3947
import org.apache.geode.internal.cache.Conflatable;
4048
import org.apache.geode.internal.cache.InternalCache;
4149
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.CacheClientProxyStatsFactory;
4250
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.MessageDispatcherFactory;
4351
import org.apache.geode.internal.security.SecurityService;
4452
import org.apache.geode.internal.serialization.KnownVersion;
4553
import org.apache.geode.internal.statistics.StatisticsClock;
54+
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
4655

4756
public class CacheClientProxyTest {
4857
private CacheClientProxy proxyWithSingleUser;
@@ -71,6 +80,7 @@ public void before() throws Exception {
7180
when(socket.getInetAddress()).thenReturn(inetAddress);
7281
when(notifier.getAcceptorStats()).thenReturn(stats);
7382
id = mock(ClientProxyMembershipID.class);
83+
when(id.getDurableId()).thenReturn("proxy_id");
7484
version = KnownVersion.TEST_VERSION;
7585
securityService = mock(SecurityService.class);
7686
subject = mock(Subject.class);
@@ -175,4 +185,37 @@ public void close_multiUser_calls_ClientUserAuthsCleanUp() {
175185
verify(subject, never()).logout();
176186
verify(clientUserAuths, times(1)).cleanup(anyBoolean());
177187
}
188+
189+
@Rule
190+
public ExecutorServiceRule executorService = new ExecutorServiceRule();
191+
192+
@Test
193+
public void notifyReAuthenticationIsNotBlocked() {
194+
CacheClientProxy spy = spy(proxyWithSingleUser);
195+
MessageDispatcher dispatcher = mock(MessageDispatcher.class);
196+
doReturn(dispatcher).when(spy).createMessageDispatcher(any());
197+
spy.initializeMessageDispatcher();
198+
DistributionManager manager = mock(DistributionManager.class);
199+
OperationExecutors executors = mock(OperationExecutors.class);
200+
ExecutorService executor = executorService.getExecutorService();
201+
when(cache.getDistributionManager()).thenReturn(manager);
202+
when(manager.getExecutors()).thenReturn(executors);
203+
when(executors.getWaitingThreadPool()).thenReturn(executor);
204+
205+
AtomicBoolean updated = new AtomicBoolean(false);
206+
207+
// simulating a blocked message dispatcher when notify reauth
208+
doAnswer((Answer<Void>) invocation -> {
209+
while (!updated.get()) {
210+
Thread.sleep(200);
211+
}
212+
return null;
213+
}).when(dispatcher).notifyReAuthentication();
214+
215+
// proxy.notifyReauthentication won't be blocked
216+
spy.notifyReAuthentication();
217+
assertThat(updated.get()).isFalse();
218+
}
219+
220+
178221
}

0 commit comments

Comments
 (0)