Skip to content

Commit 7e2108c

Browse files
committed
fix[command]: std thread returns without waiting for the execution to complete.
1 parent fbb9e48 commit 7e2108c

2 files changed

Lines changed: 25 additions & 29 deletions

File tree

monitor/src/main/java/com/zfoo/monitor/util/OSUtils.java

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@
3333
import java.util.ArrayList;
3434
import java.util.HashMap;
3535
import java.util.List;
36-
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.Executors;
38-
import java.util.concurrent.ThreadFactory;
39-
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.*;
4037
import java.util.concurrent.atomic.AtomicInteger;
4138

4239
/**
@@ -275,7 +272,7 @@ public static String execCommand(String command) {
275272
logger.info("execCommand [{}]", command);
276273
try {
277274
return doExecCommand(command, null, 5 * TimeUtils.MILLIS_PER_MINUTE);
278-
} catch (IOException | InterruptedException e) {
275+
} catch (IOException | InterruptedException | ExecutionException e) {
279276
throw new RuntimeException(e);
280277
}
281278
}
@@ -290,37 +287,21 @@ public static String execCommand(String command, String workingDirectory, long t
290287
var wd = new File(workingDirectory);
291288
try {
292289
return doExecCommand(command, wd, timeoutMillis);
293-
} catch (IOException | InterruptedException e) {
290+
} catch (IOException | InterruptedException | ExecutionException e) {
294291
throw new RuntimeException(e);
295292
}
296293
}
297294

298-
private static String doExecCommand(String command, File wd, long timeoutMillis) throws IOException, InterruptedException {
295+
private static String doExecCommand(String command, File wd, long timeoutMillis) throws IOException, InterruptedException, ExecutionException {
299296
var commandSplits = command.split(StringUtils.SPACE_REGEX);
300297
var process = new ProcessBuilder(commandSplits)
301298
.redirectErrorStream(true)
302299
.directory(wd)
303300
.start();
304301

305-
var stdout = new StringBuilder();
306-
var stderr = new StringBuilder();
307-
308302
// 异步读取输出,避免缓冲区阻塞
309-
executors.submit(ThreadUtils.safeRunnable(() -> {
310-
try {
311-
stdout.append(StringUtils.bytesToString(IOUtils.toByteArray(process.getInputStream())));
312-
} catch (IOException e) {
313-
throw new RuntimeException(e);
314-
}
315-
}));
316-
executors.submit(ThreadUtils.safeRunnable(() -> {
317-
try {
318-
stderr.append(StringUtils.bytesToString(IOUtils.toByteArray(process.getErrorStream())));
319-
} catch (IOException e) {
320-
throw new RuntimeException(e);
321-
}
322-
323-
}));
303+
var stdoutFuture = executors.submit(ThreadUtils.safeCallable(() -> StringUtils.bytesToString(IOUtils.toByteArray(process.getInputStream()))));
304+
var stderrFuture = executors.submit(ThreadUtils.safeCallable(() -> StringUtils.bytesToString(IOUtils.toByteArray(process.getErrorStream()))));
324305

325306
var finished = process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS);
326307
if (!finished) {
@@ -329,6 +310,8 @@ private static String doExecCommand(String command, File wd, long timeoutMillis)
329310
}
330311

331312
process.destroy();
313+
var stdout = stdoutFuture.get();
314+
var stderr = stderrFuture.get();
332315

333316
// 获取线程的退出值,0代表正常退出,非0代表异常中止
334317
int exitValue = process.exitValue();

protocol/src/main/java/com/zfoo/protocol/util/ThreadUtils.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
2020

21-
import java.util.concurrent.Executor;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.ForkJoinPool;
24-
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.*;
2522

2623
/**
2724
* @author godotg
@@ -108,6 +105,22 @@ public void run() {
108105
};
109106
}
110107

108+
public static <T> Callable<T> safeCallable(Callable<T> callable) {
109+
return new Callable<T>() {
110+
@Override
111+
public T call() {
112+
try {
113+
return callable.call();
114+
} catch (Exception e) {
115+
logger.error("unknown exception", e);
116+
} catch (Throwable t) {
117+
logger.error("unknown error", t);
118+
}
119+
return null;
120+
}
121+
};
122+
}
123+
111124
// -----------------------------------------------------------------------------------------------------------------
112125
// threadId -> (Thread, Executor)
113126
private static final CopyOnWriteHashMapLongObject<Pair<Thread, Executor>> threadExecutorMap = new CopyOnWriteHashMapLongObject<>(Runtime.getRuntime().availableProcessors() * 8);

0 commit comments

Comments
 (0)