Skip to content

Commit e3a8f08

Browse files
tian456-dotdcy10000
authored andcommitted
shutdown the old channel
1 parent 3ab607e commit e3a8f08

1 file changed

Lines changed: 90 additions & 69 deletions

File tree

src/main/java/com/actiontech/dble/cluster/general/impl/UcoreSender.java

Lines changed: 90 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.actiontech.dble.util.DebugUtil;
2020
import com.actiontech.dble.util.PropertiesUtil;
2121
import com.actiontech.dble.util.StringUtil;
22-
import io.grpc.Channel;
2322
import io.grpc.ManagedChannel;
2423
import io.grpc.ManagedChannelBuilder;
2524
import org.apache.commons.lang.StringUtils;
@@ -43,6 +42,7 @@ public final class UcoreSender extends AbstractConsulSender {
4342

4443

4544
private volatile UcoreGrpc.UcoreBlockingStub stub = null;
45+
private volatile ManagedChannel channel = null;
4646
private ConcurrentHashMap<String, Thread> lockMap = new ConcurrentHashMap<>();
4747
private List<String> ipList = new ArrayList<>();
4848
private static final String SOURCE_COMPONENT_TYPE = "dble";
@@ -56,7 +56,7 @@ public void initConInfo() {
5656
} catch (Exception e) {
5757
LOGGER.error("error:", e);
5858
}
59-
Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
59+
channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
6060
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
6161
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
6262
}
@@ -70,7 +70,7 @@ public void initCluster() {
7070
} catch (Exception e) {
7171
LOGGER.error("error:", e);
7272
}
73-
Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
73+
channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
7474
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
7575
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
7676
if (!skipSyncUcores()) {
@@ -129,17 +129,18 @@ private void log(String message, Exception e) {
129129
return output.getSessionId();
130130
} catch (Exception e1) {
131131
for (String ip : getIpList()) {
132-
ManagedChannel channel = null;
132+
ManagedChannel newChannel = null;
133133
try {
134-
channel = ManagedChannelBuilder.forAddress(ip,
134+
newChannel = ManagedChannelBuilder.forAddress(ip,
135135
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
136-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
137-
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
136+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
137+
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
138+
replaceChannelAndStub(newChannel, newStub);
138139
return output.getSessionId();
139140
} catch (Exception e2) {
140141
LOGGER.info("connect to ucore error ", e2);
141-
if (channel != null) {
142-
channel.shutdownNow();
142+
if (newChannel != null) {
143+
newChannel.shutdownNow();
143144
}
144145
}
145146
}
@@ -168,17 +169,18 @@ public void setKV(String path, String value) throws Exception {
168169
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
169170
} catch (Exception e1) {
170171
for (String ip : getIpList()) {
171-
ManagedChannel channel = null;
172+
ManagedChannel newChannel = null;
172173
try {
173-
channel = ManagedChannelBuilder.forAddress(ip,
174+
newChannel = ManagedChannelBuilder.forAddress(ip,
174175
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
175-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
176-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
176+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
177+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
178+
replaceChannelAndStub(newChannel, newStub);
177179
return;
178180
} catch (Exception e2) {
179181
LOGGER.info("connect to ucore error ", e2);
180-
if (channel != null) {
181-
channel.shutdownNow();
182+
if (newChannel != null) {
183+
newChannel.shutdownNow();
182184
}
183185
}
184186
}
@@ -195,16 +197,17 @@ public KvBean getKV(String path) {
195197
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
196198
} catch (Exception e1) {
197199
for (String ip : getIpList()) {
198-
ManagedChannel channel = null;
200+
ManagedChannel newChannel = null;
199201
try {
200-
channel = ManagedChannelBuilder.forAddress(ip,
202+
newChannel = ManagedChannelBuilder.forAddress(ip,
201203
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
202-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
203-
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
204+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
205+
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
206+
replaceChannelAndStub(newChannel, newStub);
204207
} catch (Exception e2) {
205208
LOGGER.info("connect to ucore error ", e2);
206-
if (channel != null) {
207-
channel.shutdownNow();
209+
if (newChannel != null) {
210+
newChannel.shutdownNow();
208211
}
209212
}
210213
}
@@ -230,16 +233,17 @@ public List<KvBean> getKVPath(String path) {
230233
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
231234
} catch (Exception e1) {
232235
for (String ip : getIpList()) {
233-
ManagedChannel channel = null;
236+
ManagedChannel newChannel = null;
234237
try {
235-
channel = ManagedChannelBuilder.forAddress(ip,
238+
newChannel = ManagedChannelBuilder.forAddress(ip,
236239
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
237-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
238-
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
240+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
241+
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
242+
replaceChannelAndStub(newChannel, newStub);
239243
} catch (Exception e2) {
240244
LOGGER.info("connect to ucore error ", e2);
241-
if (channel != null) {
242-
channel.shutdownNow();
245+
if (newChannel != null) {
246+
newChannel.shutdownNow();
243247
}
244248
}
245249
}
@@ -265,17 +269,18 @@ public void cleanPath(String path) {
265269
} catch (Exception e1) {
266270
boolean flag = false;
267271
for (String ip : getIpList()) {
268-
ManagedChannel channel = null;
272+
ManagedChannel newChannel = null;
269273
try {
270-
channel = ManagedChannelBuilder.forAddress(ip,
274+
newChannel = ManagedChannelBuilder.forAddress(ip,
271275
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
272-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
273-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input);
276+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
277+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input);
274278
flag = true;
279+
replaceChannelAndStub(newChannel, newStub);
275280
} catch (Exception e2) {
276281
LOGGER.info("connect to ucore error ", e2);
277-
if (channel != null) {
278-
channel.shutdownNow();
282+
if (newChannel != null) {
283+
newChannel.shutdownNow();
279284
}
280285
}
281286
}
@@ -292,17 +297,18 @@ public void cleanKV(String path) {
292297
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
293298
} catch (Exception e1) {
294299
for (String ip : getIpList()) {
295-
ManagedChannel channel = null;
300+
ManagedChannel newChannel = null;
296301
try {
297-
channel = ManagedChannelBuilder.forAddress(ip,
302+
newChannel = ManagedChannelBuilder.forAddress(ip,
298303
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
299-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
300-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
304+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
305+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
306+
replaceChannelAndStub(newChannel, newStub);
301307
return;
302308
} catch (Exception e2) {
303309
LOGGER.info("connect to ucore error ", e2);
304-
if (channel != null) {
305-
channel.shutdownNow();
310+
if (newChannel != null) {
311+
newChannel.shutdownNow();
306312
}
307313
}
308314
}
@@ -320,18 +326,19 @@ public SubscribeReturnBean subscribeKvPrefix(SubscribeRequest request) throws Ex
320326
return groupSubscribeResult(output);
321327
} catch (Exception e1) {
322328
for (String ip : getIpList()) {
323-
ManagedChannel channel = null;
329+
ManagedChannel newChannel = null;
324330
try {
325-
channel = ManagedChannelBuilder.forAddress(ip,
331+
newChannel = ManagedChannelBuilder.forAddress(ip,
326332
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
327-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
328-
UcoreInterface.SubscribeKvPrefixOutput output = stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input);
333+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
334+
UcoreInterface.SubscribeKvPrefixOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input);
335+
replaceChannelAndStub(newChannel, newStub);
329336
return groupSubscribeResult(output);
330337

331338
} catch (Exception e2) {
332339
LOGGER.info("connect to ucore at " + ip + " failure", e2);
333-
if (channel != null) {
334-
channel.shutdownNow();
340+
if (newChannel != null) {
341+
newChannel.shutdownNow();
335342
}
336343
}
337344
}
@@ -346,16 +353,17 @@ public void alert(ClusterAlertBean alert) {
346353
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
347354
} catch (Exception e) {
348355
for (String ip : getIpList()) {
349-
ManagedChannel channel = null;
356+
ManagedChannel newChannel = null;
350357
try {
351-
channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
352-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
353-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
358+
newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
359+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
360+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
361+
replaceChannelAndStub(newChannel, newStub);
354362
return;
355363
} catch (Exception e2) {
356364
LOGGER.info("alert to ucore error ", e2);
357-
if (channel != null) {
358-
channel.shutdownNow();
365+
if (newChannel != null) {
366+
newChannel.shutdownNow();
359367
}
360368
}
361369
}
@@ -370,16 +378,17 @@ public boolean alertResolve(ClusterAlertBean alert) {
370378
return true;
371379
} catch (Exception e) {
372380
for (String ip : getIpList()) {
373-
ManagedChannel channel = null;
381+
ManagedChannel newChannel = null;
374382
try {
375-
channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
376-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
377-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input);
383+
newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
384+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
385+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input);
386+
replaceChannelAndStub(newChannel, newStub);
378387
return true;
379388
} catch (Exception e2) {
380389
LOGGER.info("alertResolve to ucore error ", e2);
381-
if (channel != null) {
382-
channel.shutdownNow();
390+
if (newChannel != null) {
391+
newChannel.shutdownNow();
383392
}
384393
return false;
385394
}
@@ -415,17 +424,18 @@ public boolean renewLock(String sessionId) throws Exception {
415424
} catch (Exception e1) {
416425
LOGGER.info("connect to ucore renew error and will retry");
417426
for (String ip : getIpList()) {
418-
ManagedChannel channel = null;
427+
ManagedChannel newChannel = null;
419428
try {
420-
channel = ManagedChannelBuilder.forAddress(ip,
429+
newChannel = ManagedChannelBuilder.forAddress(ip,
421430
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
422-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
423-
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input);
431+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
432+
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input);
433+
replaceChannelAndStub(newChannel, newStub);
424434
return true;
425435
} catch (Exception e2) {
426436
LOGGER.info("connect to ucore renew error " + stub, e2);
427-
if (channel != null) {
428-
channel.shutdownNow();
437+
if (newChannel != null) {
438+
newChannel.shutdownNow();
429439
}
430440
}
431441
}
@@ -452,16 +462,18 @@ public UcoreInterface.SubscribeNodesOutput subscribeNodes(UcoreInterface.Subscri
452462
} catch (Exception e) {
453463
//the first try failure ,try for all the other ucore ip
454464
for (String ip : getIpList()) {
455-
ManagedChannel channel = null;
465+
ManagedChannel newChannel = null;
456466
try {
457-
channel = ManagedChannelBuilder.forAddress(ip,
467+
newChannel = ManagedChannelBuilder.forAddress(ip,
458468
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
459-
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
460-
return stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput);
469+
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
470+
UcoreInterface.SubscribeNodesOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput);
471+
replaceChannelAndStub(newChannel, newStub);
472+
return output;
461473
} catch (Exception e2) {
462474
LOGGER.info("try connection IP " + ip + " failure ", e2);
463-
if (channel != null) {
464-
channel.shutdownNow();
475+
if (newChannel != null) {
476+
newChannel.shutdownNow();
465477
}
466478
}
467479
}
@@ -526,4 +538,13 @@ private boolean skipSyncUcores() {
526538
return false;
527539
}
528540

541+
private synchronized void replaceChannelAndStub(ManagedChannel newChannel, UcoreGrpc.UcoreBlockingStub newStub) {
542+
if (channel != null) {
543+
channel.shutdownNow();
544+
}
545+
channel = newChannel;
546+
stub = newStub;
547+
}
548+
549+
529550
}

0 commit comments

Comments
 (0)