From 289ae1528024e894f68e1b1144e360f1e382c9ca Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 29 May 2026 16:03:53 +0800 Subject: [PATCH 1/3] [improve](streaming-job) support cdc_client JVM opts and adopt externally-managed cdc_client --- be/src/common/config.cpp | 2 + be/src/common/config.h | 3 ++ be/src/runtime/cdc_client_mgr.cpp | 60 ++++++++++++++++++++++--- be/src/runtime/cdc_client_mgr.h | 4 ++ be/test/runtime/cdc_client_mgr_test.cpp | 24 ++++++++++ conf/be.conf | 3 ++ 6 files changed, 91 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7b6db1636234aa..7717db58ef3f9f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -73,6 +73,8 @@ DEFINE_Int32(arrow_flight_sql_port, "8050"); DEFINE_Int32(cdc_client_port, "9096"); +DEFINE_mString(cdc_client_java_opts, ""); + // If the external client cannot directly access priority_networks, set public_host to be accessible // to external client. // There are usually two usage scenarios: diff --git a/be/src/common/config.h b/be/src/common/config.h index 427282a4452bc4..5ad2a68bac8f55 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -122,6 +122,9 @@ DECLARE_Int32(arrow_flight_sql_port); // port for cdc client scan oltp cdc data DECLARE_Int32(cdc_client_port); +// JVM options passed to cdc_client (whitespace-separated). Inserted before -jar. +DECLARE_mString(cdc_client_java_opts); + // If the external client cannot directly access priority_networks, set public_host to be accessible // to external client. // There are usually two usage scenarios: diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp index f992aae41af10d..a973a6173eecab 100644 --- a/be/src/runtime/cdc_client_mgr.cpp +++ b/be/src/runtime/cdc_client_mgr.cpp @@ -34,9 +34,12 @@ #include #include +#include #include +#include #include #include +#include #include "common/config.h" #include "common/logging.h" @@ -149,10 +152,26 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { _child_pid.store(0); } #endif - } else { + } else if (!_adopted_external.load()) { LOG(INFO) << "CDC client has never been started"; } +#ifndef BE_TEST + // Adopt an externally-managed cdc_client if the port already answers + // healthy (e.g. one started manually for debug / hotfix). + { + std::string adopt_response; + if (check_cdc_client_health(1, 0, adopt_response).ok()) { + if (!_adopted_external.exchange(true)) { + LOG(INFO) << "Adopting external cdc client on port " + << doris::config::cdc_client_port; + } + return Status::OK(); + } + } + _adopted_external.store(false); +#endif + const char* doris_home = getenv("DORIS_HOME"); const char* log_dir = getenv("LOG_DIR"); const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar"; @@ -215,10 +234,31 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { dup2(out_fd, STDERR_FILENO); close(out_fd); - // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040 - execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(), - cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL); - // If execlp returns, it means it failed + // java -Dlog.path=xx -jar cdc-client.jar + // --server.port=9096 --backend.http.port=8040 --cluster.token=... + std::vector argv_storage; + argv_storage.emplace_back("java"); + const std::string user_java_opts = doris::config::cdc_client_java_opts; + if (!user_java_opts.empty()) { + std::istringstream iss(user_java_opts); + argv_storage.insert(argv_storage.end(), std::istream_iterator(iss), + std::istream_iterator()); + } + argv_storage.emplace_back(java_opts); + argv_storage.emplace_back("-jar"); + argv_storage.emplace_back(cdc_jar_path); + argv_storage.emplace_back(cdc_jar_port); + argv_storage.emplace_back(backend_http_port); + argv_storage.emplace_back(cluster_token); + + std::vector argv; + argv.reserve(argv_storage.size() + 1); + for (auto& s : argv_storage) { + argv.push_back(const_cast(s.c_str())); + } + argv.push_back(nullptr); + execv(java_bin.c_str(), argv.data()); + // If execv returns, it means it failed perror("Cdc client child process error"); exit(1); } else { @@ -233,7 +273,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { _child_pid.store(0); st = Status::InternalError("Start cdc client failed."); st.to_protobuf(result->mutable_status()); + } else if (kill(pid, 0) != 0) { + // Port healthy but our child has exited: an external process is + // answering. Treat as adoption instead of masking dead PID as success. + _child_pid.store(0); + if (!_adopted_external.exchange(true)) { + LOG(INFO) << "Forked cdc client " << pid << " exited but port " + << doris::config::cdc_client_port + << " is healthy, adopting external instance"; + } } else { + _adopted_external.store(false); LOG(INFO) << "Start cdc client success, pid=" << pid << ", status=" << status.to_string() << ", response=" << health_response; } diff --git a/be/src/runtime/cdc_client_mgr.h b/be/src/runtime/cdc_client_mgr.h index 077097086daba2..b3b350154b62b2 100644 --- a/be/src/runtime/cdc_client_mgr.h +++ b/be/src/runtime/cdc_client_mgr.h @@ -53,11 +53,15 @@ class CdcClientMgr { pid_t get_child_pid() const { return _child_pid.load(); } // For testing only: set child PID directly void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); } + // For testing only: inspect / drive the adopt-external flag + bool get_adopted_external_for_test() const { return _adopted_external.load(); } + void set_adopted_external_for_test(bool v) { _adopted_external.store(v); } #endif private: std::mutex _start_mutex; std::atomic _child_pid {0}; + std::atomic _adopted_external {false}; }; } // namespace doris diff --git a/be/test/runtime/cdc_client_mgr_test.cpp b/be/test/runtime/cdc_client_mgr_test.cpp index 3ab05c394d71fb..c68e71956ad230 100644 --- a/be/test/runtime/cdc_client_mgr_test.cpp +++ b/be/test/runtime/cdc_client_mgr_test.cpp @@ -638,6 +638,30 @@ TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) { EXPECT_EQ(result.status().status_code(), 999); } +// Verify _adopted_external defaults to false and start_cdc_client (BE_TEST +// short-circuit) does not alter it. +TEST_F(CdcClientMgrTest, AdoptedExternalDefaultFalse) { + CdcClientMgr mgr; + EXPECT_FALSE(mgr.get_adopted_external_for_test()); + + PRequestCdcClientResult result; + Status status = mgr.start_cdc_client(&result); + EXPECT_TRUE(status.ok()); + EXPECT_FALSE(mgr.get_adopted_external_for_test()); +} + +// Verify the _adopted_external flag round-trips through the setter/getter. +TEST_F(CdcClientMgrTest, AdoptedExternalSetterRoundTrip) { + CdcClientMgr mgr; + EXPECT_FALSE(mgr.get_adopted_external_for_test()); + + mgr.set_adopted_external_for_test(true); + EXPECT_TRUE(mgr.get_adopted_external_for_test()); + + mgr.set_adopted_external_for_test(false); + EXPECT_FALSE(mgr.get_adopted_external_for_test()); +} + // Test send_request_to_cdc_client with empty API TEST_F(CdcClientMgrTest, SendRequestEmptyApi) { CdcClientMgr mgr; diff --git a/conf/be.conf b/conf/be.conf index e9024580a65930..24d868e0f48f49 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -37,6 +37,9 @@ heartbeat_service_port = 9050 brpc_port = 8060 arrow_flight_sql_port = 8050 +# JVM options for the BE-forked cdc_client process (separate from JAVA_OPTS_FOR_JDK_17 above). +cdc_client_java_opts = -XX:+ExitOnOutOfMemoryError + # HTTPS configures enable_https = false # path of certificate in PEM format. From 95a415ea37740c5009c7af5ce4d8ca641c81d442 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 29 May 2026 16:55:45 +0800 Subject: [PATCH 2/3] [improve](streaming-job) fix post-fork heap allocation safety; hardcode -XX:+ExitOnOutOfMemoryError --- be/src/runtime/cdc_client_mgr.cpp | 65 +++++++++++++++---------------- conf/be.conf | 3 -- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp index a973a6173eecab..b37cadc980c920 100644 --- a/be/src/runtime/cdc_client_mgr.cpp +++ b/be/src/runtime/cdc_client_mgr.cpp @@ -200,7 +200,35 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { } std::string path(java_home); std::string java_bin = path + "/bin/java"; - // Capture signal to prevent child process from becoming a zombie process + + // Pre-build everything the child needs before fork(): heap allocation after + // fork() in a multi-threaded process can deadlock on inherited libc locks. + std::vector argv_storage; + argv_storage.emplace_back("java"); + const std::string user_java_opts = doris::config::cdc_client_java_opts; + if (!user_java_opts.empty()) { + std::istringstream iss(user_java_opts); + argv_storage.insert(argv_storage.end(), std::istream_iterator(iss), + std::istream_iterator()); + } + argv_storage.emplace_back(java_opts); + // OOM safety net (last-wins, user opts cannot disable). + argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError"); + argv_storage.emplace_back("-jar"); + argv_storage.emplace_back(cdc_jar_path); + argv_storage.emplace_back(cdc_jar_port); + argv_storage.emplace_back(backend_http_port); + argv_storage.emplace_back(cluster_token); + + std::vector argv; + argv.reserve(argv_storage.size() + 1); + for (auto& s : argv_storage) { + argv.push_back(const_cast(s.c_str())); + } + argv.push_back(nullptr); + + const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out"; + struct sigaction act; act.sa_flags = 0; act.sa_handler = handle_sigchld; @@ -213,54 +241,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) { #else pid_t pid = fork(); if (pid < 0) { - // Fork failed st = Status::InternalError("Fork cdc client failed."); st.to_protobuf(result->mutable_status()); return st; } else if (pid == 0) { - // Child process - // When the parent process is killed, the child process also needs to exit + // Child: async-signal-safe operations only until execv(). #ifndef __APPLE__ prctl(PR_SET_PDEATHSIG, SIGKILL); #endif - // Redirect stdout and stderr to log out file - std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out"; int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644); if (out_fd < 0) { perror("open cdc-client.out file failed"); - exit(1); + _exit(1); } dup2(out_fd, STDOUT_FILENO); dup2(out_fd, STDERR_FILENO); close(out_fd); - - // java -Dlog.path=xx -jar cdc-client.jar - // --server.port=9096 --backend.http.port=8040 --cluster.token=... - std::vector argv_storage; - argv_storage.emplace_back("java"); - const std::string user_java_opts = doris::config::cdc_client_java_opts; - if (!user_java_opts.empty()) { - std::istringstream iss(user_java_opts); - argv_storage.insert(argv_storage.end(), std::istream_iterator(iss), - std::istream_iterator()); - } - argv_storage.emplace_back(java_opts); - argv_storage.emplace_back("-jar"); - argv_storage.emplace_back(cdc_jar_path); - argv_storage.emplace_back(cdc_jar_port); - argv_storage.emplace_back(backend_http_port); - argv_storage.emplace_back(cluster_token); - - std::vector argv; - argv.reserve(argv_storage.size() + 1); - for (auto& s : argv_storage) { - argv.push_back(const_cast(s.c_str())); - } - argv.push_back(nullptr); execv(java_bin.c_str(), argv.data()); - // If execv returns, it means it failed perror("Cdc client child process error"); - exit(1); + _exit(1); } else { // Parent process: save PID and wait for startup _child_pid.store(pid); diff --git a/conf/be.conf b/conf/be.conf index 24d868e0f48f49..e9024580a65930 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -37,9 +37,6 @@ heartbeat_service_port = 9050 brpc_port = 8060 arrow_flight_sql_port = 8050 -# JVM options for the BE-forked cdc_client process (separate from JAVA_OPTS_FOR_JDK_17 above). -cdc_client_java_opts = -XX:+ExitOnOutOfMemoryError - # HTTPS configures enable_https = false # path of certificate in PEM format. From 36c9f915b748a1e14514f49572be255f87e1b111 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 29 May 2026 18:11:51 +0800 Subject: [PATCH 3/3] [improve](streaming-job) make cdc_client_java_opts immutable to avoid concurrent-read UB --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7717db58ef3f9f..ed28a50685b7d4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -73,7 +73,7 @@ DEFINE_Int32(arrow_flight_sql_port, "8050"); DEFINE_Int32(cdc_client_port, "9096"); -DEFINE_mString(cdc_client_java_opts, ""); +DEFINE_String(cdc_client_java_opts, ""); // If the external client cannot directly access priority_networks, set public_host to be accessible // to external client. diff --git a/be/src/common/config.h b/be/src/common/config.h index 5ad2a68bac8f55..927bd2700d2b09 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -123,7 +123,7 @@ DECLARE_Int32(arrow_flight_sql_port); DECLARE_Int32(cdc_client_port); // JVM options passed to cdc_client (whitespace-separated). Inserted before -jar. -DECLARE_mString(cdc_client_java_opts); +DECLARE_String(cdc_client_java_opts); // If the external client cannot directly access priority_networks, set public_host to be accessible // to external client.