Skip to content

Commit 1ed7379

Browse files
committed
fix: resolve NPE and timeout issues in RaftEngine.getLeaderGrpcAddress()
- Cache leader PeerId after waitingForLeader() and null-check to avoid NPE when leader election times out
- Remove incorrect fallback that derived leader gRPC address from local node's port, causing silent misroutes in multi-node clusters
- Wire config.getRpcTimeout() into RaftRpcClient's RpcOptions so Bolt transport timeout is consistent with future.get() caller timeout
- Replace hardcoded 10000ms in waitingForLeader() with config.getRpcTimeout()
- Remove unused RaftOptions variable and dead imports (ReplicatorGroup, ThreadId)

Fixes apache#2959
Related to apache#2952, apache#2962
1 parent 8925408 commit 1ed7379

1 file changed

Lines changed: 26 additions & 14 deletions

File tree

  • hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft

hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java

Lines changed: 26 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -42,21 +42,18 @@
4242
import com.alipay.sofa.jraft.JRaftUtils;
4343
import com.alipay.sofa.jraft.Node;
4444
import com.alipay.sofa.jraft.RaftGroupService;
45-
import com.alipay.sofa.jraft.ReplicatorGroup;
4645
import com.alipay.sofa.jraft.Status;
4746
import com.alipay.sofa.jraft.conf.Configuration;
4847
import com.alipay.sofa.jraft.core.Replicator;
4948
import com.alipay.sofa.jraft.entity.PeerId;
5049
import com.alipay.sofa.jraft.entity.Task;
5150
import com.alipay.sofa.jraft.error.RaftError;
5251
import com.alipay.sofa.jraft.option.NodeOptions;
53-
import com.alipay.sofa.jraft.option.RaftOptions;
5452
import com.alipay.sofa.jraft.option.RpcOptions;
5553
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
5654
import com.alipay.sofa.jraft.rpc.RpcServer;
5755
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
5856
import com.alipay.sofa.jraft.util.Endpoint;
59-
import com.alipay.sofa.jraft.util.ThreadId;
6057
import com.alipay.sofa.jraft.util.internal.ThrowUtil;
6158

6259
import io.netty.channel.ChannelHandler;
@@ -88,8 +85,12 @@ public synchronized boolean init(PDConfig.Raft config) {
8885
}
8986
this.config = config;
9087

88+
// Wire configured rpc timeout into RaftRpcClient so the Bolt transport
89+
// timeout and the future.get() caller timeout in getLeaderGrpcAddress() are consistent.
9190
raftRpcClient = new RaftRpcClient();
92-
raftRpcClient.init(new RpcOptions());
91+
RpcOptions rpcOptions = new RpcOptions();
92+
rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout());
93+
raftRpcClient.init(rpcOptions);
9394

9495
String raftPath = config.getDataPath() + "/" + groupId;
9596
new File(raftPath).mkdirs();
@@ -121,8 +122,7 @@ public synchronized boolean init(PDConfig.Raft config) {
121122
nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout());
122123
nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout());
123124
nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout());
124-
// Set the raft configuration
125-
RaftOptions raftOptions = nodeOptions.getRaftOptions();
125+
// TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine for reference)
126126

127127
nodeOptions.setEnableMetrics(true);
128128

@@ -230,31 +230,42 @@ public PeerId getLeader() {
230230
}
231231

232232
/**
233-
* Send a message to the leader to get the grpc address;
233+
* Send a message to the leader to get the grpc address.
234234
*/
235235
public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException {
236236
if (isLeader()) {
237237
return config.getGrpcAddress();
238238
}
239239

240240
if (raftNode.getLeaderId() == null) {
241-
waitingForLeader(10000);
241+
waitingForLeader(config.getRpcTimeout());
242+
}
243+
244+
// Cache leader to avoid repeated getLeaderId() calls and guard against
245+
// waitingForLeader() returning without a leader being elected.
246+
PeerId leader = raftNode.getLeaderId();
247+
if (leader == null) {
248+
throw new ExecutionException(new IllegalStateException("Leader is not ready"));
242249
}
243250

244251
try {
245252
RaftRpcProcessor.GetMemberResponse response = raftRpcClient
246-
.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString())
253+
.getGrpcAddress(leader.getEndpoint().toString())
247254
.get(config.getRpcTimeout(), TimeUnit.MILLISECONDS);
248255
if (response != null && response.getGrpcAddress() != null) {
249256
return response.getGrpcAddress();
250257
}
251258
} catch (TimeoutException | ExecutionException e) {
252-
log.warn("Failed to get leader gRPC address via RPC, falling back to endpoint derivation", e);
259+
// TODO: a more complete fix would need a source of truth for the leader's
260+
// actual grpcAddress rather than deriving it from the local node's port config.
261+
throw new ExecutionException(
262+
String.format("Failed to resolve leader gRPC address for %s", leader), e);
253263
}
254264

255-
// Fallback: derive from raft endpoint IP + local gRPC port (best effort)
256-
String leaderIp = raftNode.getLeaderId().getEndpoint().getIp();
257-
return leaderIp + ":" + config.getGrpcPort();
265+
log.warn("Leader gRPC address field is null in RPC response for {}", leader);
266+
throw new ExecutionException(
267+
new IllegalStateException(
268+
String.format("Leader gRPC address unavailable for %s", leader)));
258269
}
259270

260271
/**
@@ -380,7 +391,8 @@ private boolean peerEquals(PeerId p1, PeerId p2) {
380391
if (p1 == null || p2 == null) {
381392
return false;
382393
}
383-
return Objects.equals(p1.getIp(), p2.getIp()) && Objects.equals(p1.getPort(), p2.getPort());
394+
return Objects.equals(p1.getIp(), p2.getIp()) &&
395+
Objects.equals(p1.getPort(), p2.getPort());
384396
}
385397

386398
private Replicator.State getReplicatorState(PeerId peerId) {

0 commit comments

Comments
 (0)