Skip to content

Commit 57c0126

Browse files
authored
[to dev/1.3] fix AbstractEnv.ensureNodeStatus & Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17447)
1 parent 63ea91a commit 57c0126

5 files changed

Lines changed: 68 additions & 9 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@
5656
import org.apache.iotdb.session.Session;
5757
import org.apache.iotdb.session.pool.SessionPool;
5858

59+
import org.apache.thrift.TConfiguration;
5960
import org.apache.thrift.TException;
61+
import org.apache.thrift.transport.TSocket;
62+
import org.apache.thrift.transport.TTransportException;
6063
import org.slf4j.Logger;
6164

6265
import java.io.File;
@@ -1119,7 +1122,7 @@ public void shutdownForciblyAllDataNodes() {
11191122

11201123
@Override
11211124
public void ensureNodeStatus(
1122-
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
1125+
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatusList)
11231126
throws IllegalStateException {
11241127
Throwable lastException = null;
11251128
for (int i = 0; i < retryCount; i++) {
@@ -1147,20 +1150,37 @@ public void ensureNodeStatus(
11471150
+ node.getClientRpcEndPoint().getPort(),
11481151
node.getDataNodeId()));
11491152
for (int j = 0; j < nodes.size(); j++) {
1150-
final String endpoint = nodes.get(j).getIpAndPortString();
1153+
BaseNodeWrapper nodeWrapper = nodes.get(j);
1154+
String ipAndPortString = nodeWrapper.getIpAndPortString();
1155+
final String endpoint = ipAndPortString;
11511156
if (!nodeIds.containsKey(endpoint)) {
11521157
// Node not exist
11531158
// Notice: Never modify this line, since the NodeLocation might be modified in IT
11541159
errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!");
11551160
continue;
11561161
}
11571162
final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
1158-
if (!targetStatus.get(j).getStatus().equals(status)) {
1163+
final NodeStatus targetStatus = targetStatusList.get(j);
1164+
if (!targetStatus.getStatus().equals(status)) {
11591165
// Error status
11601166
errorMessages.add(
11611167
String.format(
11621168
"Node %s is in status %s, but expected %s",
1163-
endpoint, status, targetStatus.get(j)));
1169+
endpoint, status, targetStatusList.get(j)));
1170+
continue;
1171+
}
1172+
if (nodeWrapper instanceof DataNodeWrapper && targetStatus.equals(NodeStatus.Running)) {
1173+
final String[] ipPort = nodeWrapper.getIpAndPortString().split(":");
1174+
final String ip = ipPort[0];
1175+
final int port = Integer.parseInt(ipPort[1]);
1176+
try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 1000)) {
1177+
socket.open();
1178+
} catch (final TTransportException e) {
1179+
errorMessages.add(
1180+
String.format(
1181+
"DataNode %s is not reachable: %s",
1182+
nodeWrapper.getIpAndPortString(), e.getMessage()));
1183+
}
11641184
}
11651185
}
11661186
if (errorMessages.isEmpty()) {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ public List<TSStatus> setConfiguration(TSetConfigurationReq req) {
930930
if (!targetDataNodes.isEmpty()) {
931931
DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
932932
new DataNodeAsyncRequestContext<>(
933-
CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
933+
CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
934934
CnToDnInternalServiceAsyncRequestManager.getInstance()
935935
.sendAsyncRequestWithRetry(clientHandler);
936936
responseList.addAll(clientHandler.getResponseList());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
6565
ConcurrentHashMap.newKeySet();
6666
private ReentrantLock lock = new ReentrantLock();
6767
private volatile boolean init = false;
68+
private volatile boolean isStoppingAllScheduleTask = false;
6869

6970
@Override
7071
public void start() throws StartupException {
@@ -76,8 +77,13 @@ public void start() throws StartupException {
7677
logger.info("Compaction schedule task manager started.");
7778
}
7879

80+
public boolean isStoppingAllScheduleTask() {
81+
return isStoppingAllScheduleTask;
82+
}
83+
7984
public void stopCompactionScheduleTasks() throws InterruptedException {
8085
lock.lock();
86+
isStoppingAllScheduleTask = true;
8187
try {
8288
for (Future<Void> task : submitCompactionScheduleTaskFutures) {
8389
task.cancel(true);
@@ -121,6 +127,7 @@ public void checkAndMayApplyConfigurationChange() throws InterruptedException {
121127

122128
public void startScheduleTasks() {
123129
lock.lock();
130+
isStoppingAllScheduleTask = false;
124131
try {
125132
// compaction selector
126133
for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public void startScheduleTasks() {
144151
@Override
145152
public void stop() {
146153
lock.lock();
154+
isStoppingAllScheduleTask = true;
147155
try {
148156
if (!init) {
149157
return;
@@ -160,6 +168,7 @@ public void stop() {
160168
@Override
161169
public void waitAndStop(long milliseconds) {
162170
lock.lock();
171+
isStoppingAllScheduleTask = true;
163172
try {
164173
if (!init) {
165174
return;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,26 @@ public Void call() {
7272
dataRegion.executeCompaction();
7373
}
7474
} catch (InterruptedException ignored) {
75+
boolean isStoppedByUser =
76+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
7577
logger.info(
76-
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId);
77-
return null;
78+
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}",
79+
workerId,
80+
isStoppedByUser);
81+
if (isStoppedByUser) {
82+
return null;
83+
}
84+
} catch (Exception e) {
85+
logger.error(
86+
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task",
87+
workerId,
88+
e);
89+
} catch (Throwable t) {
90+
logger.error(
91+
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task and cannot recover",
92+
workerId,
93+
t);
94+
throw t;
7895
}
7996
}
8097
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,21 @@ public Void call() throws Exception {
6666
}
6767
}
6868
} catch (InterruptedException ignored) {
69-
logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
70-
return null;
69+
boolean isStoppedByUser =
70+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
71+
logger.info(
72+
"[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}",
73+
workerId,
74+
isStoppedByUser);
75+
if (isStoppedByUser) {
76+
return null;
77+
}
78+
} catch (Exception e) {
79+
logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e);
80+
} catch (Throwable t) {
81+
logger.error(
82+
"[TTLCheckTask-{}] Failed to execute ttl check and cannot recover", workerId, t);
83+
throw t;
7184
}
7285
}
7386
}

0 commit comments

Comments
 (0)