Skip to content

Commit ab12b35

Browse files
authored
fix(pd): add timeout and null-safety to getLeaderGrpcAddress() (#2961)
* fix(pd): add timeout and null-safety to getLeaderGrpcAddress(). The bolt RPC call in getLeaderGrpcAddress() returns null in Docker bridge network mode, causing an NPE when a follower PD node attempts to discover the leader's gRPC address. This breaks store registration and partition distribution when any node other than pd0 wins the raft leader election. Add a bounded timeout using the configured rpc-timeout, null-check the RPC response, and fall back to deriving the address from the raft endpoint IP when the RPC fails. Closes #2959.
1 parent 5adf14c commit ab12b35

3 files changed

Lines changed: 237 additions & 24 deletions

File tree

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java

Lines changed: 52 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.ExecutionException;
3030
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
3132
import java.util.concurrent.atomic.AtomicReference;
3233
import java.util.stream.Collectors;
3334

@@ -49,7 +50,6 @@
4950
import com.alipay.sofa.jraft.entity.Task;
5051
import com.alipay.sofa.jraft.error.RaftError;
5152
import com.alipay.sofa.jraft.option.NodeOptions;
52-
import com.alipay.sofa.jraft.option.RaftOptions;
5353
import com.alipay.sofa.jraft.option.RpcOptions;
5454
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
5555
import com.alipay.sofa.jraft.rpc.RpcServer;
@@ -86,8 +86,12 @@ public synchronized boolean init(PDConfig.Raft config) {
8686
}
8787
this.config = config;
8888

89+
// Wire configured rpc timeout into RaftRpcClient so the Bolt transport
90+
// timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent.
8991
raftRpcClient = new RaftRpcClient();
90-
raftRpcClient.init(new RpcOptions());
92+
RpcOptions rpcOptions = new RpcOptions();
93+
rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout());
94+
raftRpcClient.init(rpcOptions);
9195

9296
String raftPath = config.getDataPath() + "/" + groupId;
9397
new File(raftPath).mkdirs();
@@ -119,10 +123,7 @@ public synchronized boolean init(PDConfig.Raft config) {
119123
nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout());
120124
nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout());
121125
nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout());
122-
// Set the raft configuration
123-
RaftOptions raftOptions = nodeOptions.getRaftOptions();
124-
125-
nodeOptions.setEnableMetrics(true);
126+
// TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference)
126127

127128
final PeerId serverId = JRaftUtils.getPeerId(config.getAddress());
128129

@@ -228,19 +229,57 @@ public PeerId getLeader() {
228229
}
229230

230231
/**
231-
* Send a message to the leader to get the grpc address;
232+
* Send a message to the leader to get the grpc address.
232233
*/
233234
public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException {
234235
if (isLeader()) {
235236
return config.getGrpcAddress();
236237
}
237238

238239
if (raftNode.getLeaderId() == null) {
239-
waitingForLeader(10000);
240+
waitingForLeader(config.getRpcTimeout());
241+
}
242+
243+
// Cache leader to avoid repeated getLeaderId() calls and guard against
244+
// waitingForLeader() returning without a leader being elected.
245+
PeerId leader = raftNode.getLeaderId();
246+
if (leader == null) {
247+
throw new ExecutionException(new IllegalStateException("Leader is not ready"));
248+
}
249+
250+
RaftRpcProcessor.GetMemberResponse response = null;
251+
try {
252+
// TODO: a more complete fix would need a source of truth for the leader's
253+
// actual grpcAddress rather than deriving it from the local node's port config.
254+
response = raftRpcClient
255+
.getGrpcAddress(leader.getEndpoint().toString())
256+
.get(config.getRpcTimeout(), TimeUnit.MILLISECONDS);
257+
if (response != null && response.getGrpcAddress() != null) {
258+
return response.getGrpcAddress();
259+
}
260+
if (response == null) {
261+
log.warn("Leader RPC response is null for {}, falling back to derived address",
262+
leader);
263+
} else {
264+
log.warn("Leader gRPC address field is null in RPC response for {}, "
265+
+ "falling back to derived address", leader);
266+
}
267+
} catch (TimeoutException e) {
268+
log.warn("Timed out resolving leader gRPC address for {}, falling back to derived "
269+
+ "address", leader);
270+
} catch (ExecutionException e) {
271+
Throwable cause = e.getCause() != null ? e.getCause() : e;
272+
log.warn("Failed to resolve leader gRPC address for {}, falling back to derived "
273+
+ "address", leader, cause);
240274
}
241275

242-
return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get()
243-
.getGrpcAddress();
276+
// Best-effort fallback: derive from leader raft endpoint IP + local gRPC port.
277+
// WARNING: this may be incorrect in clusters where PD nodes use different grpc.port
278+
// values, a proper fix requires a cluster-wide source of truth for gRPC addresses.
279+
String derived = leader.getEndpoint().getIp() + ":" + config.getGrpcPort();
280+
log.info("Using derived leader gRPC address {} - may be incorrect if nodes use different ports",
281+
derived);
282+
return derived;
244283
}
245284

246285
/**
@@ -322,14 +361,7 @@ public Status changePeerList(String peerList) {
322361
newPeers.parse(peerList);
323362
CountDownLatch latch = new CountDownLatch(1);
324363
this.raftNode.changePeers(newPeers, status -> {
325-
// Use compareAndSet so a late callback does not overwrite a timeout status
326364
result.compareAndSet(null, status);
327-
// Refresh inside callback so it fires even if caller already timed out
328-
// Note: changePeerList() uses Configuration.parse() which only supports
329-
// plain comma-separated peer addresses with no learner syntax.
330-
// getLearners() will always be empty here. Learner support is handled
331-
// in PDService.updatePdRaft() which uses PeerUtil.parseConfig()
332-
// and supports the /learner suffix.
333365
if (status != null && status.isOk()) {
334366
IpAuthHandler handler = IpAuthHandler.getInstance();
335367
if (handler != null) {
@@ -347,16 +379,12 @@ public Status changePeerList(String peerList) {
347379
}
348380
latch.countDown();
349381
});
350-
// Use 3x configured RPC timeout — bare await() would block forever if
351-
// the callback is never invoked (e.g. node not started / RPC failure)
352-
boolean completed = latch.await(3L * config.getRpcTimeout(),
353-
TimeUnit.MILLISECONDS);
382+
boolean completed = latch.await(3L * config.getRpcTimeout(), TimeUnit.MILLISECONDS);
354383
if (!completed && result.get() == null) {
355384
Status timeoutStatus = new Status(RaftError.EINTERNAL,
356385
"changePeerList timed out after %d ms",
357386
3L * config.getRpcTimeout());
358387
if (!result.compareAndSet(null, timeoutStatus)) {
359-
// Callback arrived just before us — keep its result
360388
timeoutStatus = null;
361389
}
362390
if (timeoutStatus != null) {
@@ -387,15 +415,15 @@ public PeerId waitingForLeader(long timeOut) {
387415
long start = System.currentTimeMillis();
388416
while ((System.currentTimeMillis() - start < timeOut) && (leader == null)) {
389417
try {
390-
this.wait(1000);
418+
long remaining = timeOut - (System.currentTimeMillis() - start);
419+
this.wait(Math.min(1000, Math.max(0, remaining)));
391420
} catch (InterruptedException e) {
392421
log.error("Raft wait for leader exception", e);
393422
}
394423
leader = getLeader();
395424
}
396425
return leader;
397426
}
398-
399427
}
400428

401429
public Node getRaftNode() {

hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hugegraph.pd.core.store.HgKVStoreImplTest;
2222
import org.apache.hugegraph.pd.raft.IpAuthHandlerTest;
2323
import org.apache.hugegraph.pd.raft.RaftEngineIpAuthIntegrationTest;
24+
import org.apache.hugegraph.pd.raft.RaftEngineLeaderAddressTest;
2425
import org.junit.runner.RunWith;
2526
import org.junit.runners.Suite;
2627

@@ -40,6 +41,7 @@
4041
TaskScheduleServiceTest.class,
4142
IpAuthHandlerTest.class,
4243
RaftEngineIpAuthIntegrationTest.class,
44+
RaftEngineLeaderAddressTest.class,
4345
// StoreNodeServiceTest.class,
4446
})
4547
@Slf4j
Lines changed: 183 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.pd.raft;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.TimeoutException;
24+
25+
import org.apache.hugegraph.pd.config.PDConfig;
26+
import org.apache.hugegraph.testutil.Whitebox;
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import com.alipay.sofa.jraft.Node;
33+
import com.alipay.sofa.jraft.entity.PeerId;
34+
import com.alipay.sofa.jraft.util.Endpoint;
35+
36+
import static org.mockito.ArgumentMatchers.anyLong;
37+
import static org.mockito.ArgumentMatchers.anyString;
38+
import static org.mockito.ArgumentMatchers.eq;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.when;
41+
42+
public class RaftEngineLeaderAddressTest {
43+
44+
private static final String LEADER_IP = "10.0.0.1";
45+
private static final int GRPC_PORT = 8686;
46+
private static final String LEADER_GRPC_ADDRESS = "10.0.0.1:8686";
47+
48+
private Node originalRaftNode;
49+
private RaftRpcClient originalRaftRpcClient;
50+
private PDConfig.Raft originalConfig;
51+
52+
private Node mockNode;
53+
private RaftRpcClient mockRpcClient;
54+
private PDConfig.Raft mockConfig;
55+
private PeerId mockLeader;
56+
57+
@Before
58+
public void setUp() {
59+
RaftEngine engine = RaftEngine.getInstance();
60+
61+
// Save originals
62+
originalRaftNode = engine.getRaftNode();
63+
originalRaftRpcClient = Whitebox.getInternalState(engine, "raftRpcClient");
64+
originalConfig = Whitebox.getInternalState(engine, "config");
65+
66+
// Build mock leader PeerId with real Endpoint
67+
mockLeader = mock(PeerId.class);
68+
Endpoint endpoint = new Endpoint(LEADER_IP, 8610);
69+
when(mockLeader.getEndpoint()).thenReturn(endpoint);
70+
71+
// Build mock Node that reports itself as follower with a known leader
72+
mockNode = mock(Node.class);
73+
when(mockNode.isLeader(true)).thenReturn(false);
74+
when(mockNode.getLeaderId()).thenReturn(mockLeader);
75+
76+
// Build mock config
77+
// Use a short default timeout (100ms); specific tests may override getRpcTimeout()
78+
mockConfig = mock(PDConfig.Raft.class);
79+
when(mockConfig.getGrpcAddress()).thenReturn("127.0.0.1:" + GRPC_PORT);
80+
when(mockConfig.getGrpcPort()).thenReturn(GRPC_PORT);
81+
when(mockConfig.getRpcTimeout()).thenReturn(100);
82+
83+
// Build mock RpcClient
84+
mockRpcClient = mock(RaftRpcClient.class);
85+
86+
// Inject mocks
87+
Whitebox.setInternalState(engine, "raftNode", mockNode);
88+
Whitebox.setInternalState(engine, "raftRpcClient", mockRpcClient);
89+
Whitebox.setInternalState(engine, "config", mockConfig);
90+
}
91+
92+
@After
93+
public void tearDown() {
94+
RaftEngine engine = RaftEngine.getInstance();
95+
Whitebox.setInternalState(engine, "raftNode", originalRaftNode);
96+
Whitebox.setInternalState(engine, "raftRpcClient", originalRaftRpcClient);
97+
Whitebox.setInternalState(engine, "config", originalConfig);
98+
}
99+
100+
@Test
101+
public void testSuccessReturnsGrpcAddress() throws Exception {
102+
// RPC succeeds and returns a valid gRPC address
103+
RaftRpcProcessor.GetMemberResponse response =
104+
mock(RaftRpcProcessor.GetMemberResponse.class);
105+
when(response.getGrpcAddress()).thenReturn(LEADER_GRPC_ADDRESS);
106+
107+
CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
108+
CompletableFuture.completedFuture(response);
109+
when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
110+
111+
String result = RaftEngine.getInstance().getLeaderGrpcAddress();
112+
Assert.assertEquals(LEADER_GRPC_ADDRESS, result);
113+
}
114+
115+
@Test
116+
public void testTimeoutFallsBackToDerivedAddress() throws Exception {
117+
// RPC times out — should fall back to leaderIp:grpcPort
118+
CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
119+
mock(CompletableFuture.class);
120+
when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
121+
.thenThrow(new TimeoutException("simulated timeout"));
122+
when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
123+
124+
String result = RaftEngine.getInstance().getLeaderGrpcAddress();
125+
Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
126+
}
127+
128+
@Test
129+
public void testRpcExceptionFallsBackToDerivedAddress() throws Exception {
130+
// RPC throws ExecutionException — should fall back to leaderIp:grpcPort
131+
CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
132+
mock(CompletableFuture.class);
133+
when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
134+
.thenThrow(new ExecutionException("simulated rpc failure",
135+
new RuntimeException("bolt error")));
136+
when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
137+
138+
String result = RaftEngine.getInstance().getLeaderGrpcAddress();
139+
Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
140+
}
141+
142+
@Test
143+
public void testNullResponseFallsBackToDerivedAddress() throws Exception {
144+
// RPC returns null response — should fall back to leaderIp:grpcPort
145+
CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
146+
CompletableFuture.completedFuture(null);
147+
when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
148+
149+
String result = RaftEngine.getInstance().getLeaderGrpcAddress();
150+
Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
151+
}
152+
153+
@Test
154+
public void testNullGrpcAddressInResponseFallsBackToDerivedAddress() throws Exception {
155+
// RPC returns a response but grpcAddress field is null — should fall back
156+
RaftRpcProcessor.GetMemberResponse response =
157+
mock(RaftRpcProcessor.GetMemberResponse.class);
158+
when(response.getGrpcAddress()).thenReturn(null);
159+
160+
CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
161+
CompletableFuture.completedFuture(response);
162+
when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
163+
164+
String result = RaftEngine.getInstance().getLeaderGrpcAddress();
165+
Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
166+
}
167+
168+
@Test
169+
public void testNullLeaderAfterWaitThrowsExecutionException() throws Exception {
170+
// Use 0ms timeout so waitingForLeader(0) skips the wait loop and returns immediately
171+
when(mockConfig.getRpcTimeout()).thenReturn(0);
172+
// Leader is still null after waitingForLeader() — should throw ExecutionException
173+
when(mockNode.getLeaderId()).thenReturn(null);
174+
175+
try {
176+
RaftEngine.getInstance().getLeaderGrpcAddress();
177+
Assert.fail("Expected ExecutionException");
178+
} catch (ExecutionException e) {
179+
Assert.assertTrue(e.getCause() instanceof IllegalStateException);
180+
Assert.assertEquals("Leader is not ready", e.getCause().getMessage());
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)