Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ DEFINE_Int32(arrow_flight_sql_port, "8050");

DEFINE_Int32(cdc_client_port, "9096");

// JVM options passed to cdc_client (whitespace-separated); inserted before -jar.
// Defaults to empty, in which case no extra options are added.
DEFINE_String(cdc_client_java_opts, "");

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ DECLARE_Int32(arrow_flight_sql_port);
// port for cdc client scan oltp cdc data
DECLARE_Int32(cdc_client_port);

// JVM options passed to cdc_client (whitespace-separated). Inserted before -jar.
DECLARE_String(cdc_client_java_opts);

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down
77 changes: 63 additions & 14 deletions be/src/runtime/cdc_client_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@

#include <atomic>
#include <chrono>
#include <iterator>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -149,10 +152,26 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
}
#endif
} else {
} else if (!_adopted_external.load()) {
LOG(INFO) << "CDC client has never been started";
}

#ifndef BE_TEST
// Adopt an externally-managed cdc_client if the port already answers
// healthy (e.g. one started manually for debug / hotfix).
{
std::string adopt_response;
if (check_cdc_client_health(1, 0, adopt_response).ok()) {
if (!_adopted_external.exchange(true)) {
LOG(INFO) << "Adopting external cdc client on port "
<< doris::config::cdc_client_port;
}
return Status::OK();
Comment thread
JNSimba marked this conversation as resolved.
Comment on lines +164 to +169
Comment on lines +164 to +169
}
}
_adopted_external.store(false);
#endif

const char* doris_home = getenv("DORIS_HOME");
const char* log_dir = getenv("LOG_DIR");
const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
Expand Down Expand Up @@ -181,7 +200,35 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
}
std::string path(java_home);
std::string java_bin = path + "/bin/java";
// Capture signal to prevent child process from becoming a zombie process

// Pre-build everything the child needs before fork(): heap allocation after
// fork() in a multi-threaded process can deadlock on inherited libc locks.
std::vector<std::string> argv_storage;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cdc_client_java_opts is registered with DEFINE_mString, so it can be changed at runtime through set_config(). String config updates are protected by config::get_mutable_string_config_lock() when assigning the underlying std::string, but this read copies the same string without that lock. A concurrent CDC request that reaches start_cdc_client() while an operator updates cdc_client_java_opts can race on the std::string, which is undefined behavior. Please either make this startup-only option immutable (DEFINE_String) or copy it while holding *config::get_mutable_string_config_lock() before parsing it.

argv_storage.emplace_back("java");
const std::string user_java_opts = doris::config::cdc_client_java_opts;
if (!user_java_opts.empty()) {
std::istringstream iss(user_java_opts);
argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
std::istream_iterator<std::string>());
}
argv_storage.emplace_back(java_opts);
// OOM safety net (last-wins, user opts cannot disable).
argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
argv_storage.emplace_back("-jar");
argv_storage.emplace_back(cdc_jar_path);
argv_storage.emplace_back(cdc_jar_port);
argv_storage.emplace_back(backend_http_port);
argv_storage.emplace_back(cluster_token);

std::vector<char*> argv;
argv.reserve(argv_storage.size() + 1);
for (auto& s : argv_storage) {
argv.push_back(const_cast<char*>(s.c_str()));
}
argv.push_back(nullptr);

const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";

struct sigaction act;
act.sa_flags = 0;
act.sa_handler = handle_sigchld;
Expand All @@ -194,33 +241,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
#else
pid_t pid = fork();
if (pid < 0) {
// Fork failed
st = Status::InternalError("Fork cdc client failed.");
st.to_protobuf(result->mutable_status());
return st;
} else if (pid == 0) {
// Child process
// When the parent process is killed, the child process also needs to exit
// Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
// Redirect stdout and stderr to log out file
std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
if (out_fd < 0) {
perror("open cdc-client.out file failed");
exit(1);
_exit(1);
}
dup2(out_fd, STDOUT_FILENO);
dup2(out_fd, STDERR_FILENO);
close(out_fd);

// java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL);
// If execlp returns, it means it failed
execv(java_bin.c_str(), argv.data());
perror("Cdc client child process error");
exit(1);
_exit(1);
} else {
// Parent process: save PID and wait for startup
_child_pid.store(pid);
Expand All @@ -233,7 +272,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
st = Status::InternalError("Start cdc client failed.");
st.to_protobuf(result->mutable_status());
} else if (kill(pid, 0) != 0) {
// Port healthy but our child has exited: an external process is
// answering. Treat as adoption instead of masking dead PID as success.
_child_pid.store(0);
if (!_adopted_external.exchange(true)) {
LOG(INFO) << "Forked cdc client " << pid << " exited but port "
<< doris::config::cdc_client_port
<< " is healthy, adopting external instance";
}
} else {
_adopted_external.store(false);
LOG(INFO) << "Start cdc client success, pid=" << pid
<< ", status=" << status.to_string() << ", response=" << health_response;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/cdc_client_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ class CdcClientMgr {
pid_t get_child_pid() const { return _child_pid.load(); }
// For testing only: set child PID directly
void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); }
// For testing only: inspect / drive the adopt-external flag
// Returns the current value of the atomic _adopted_external flag.
bool get_adopted_external_for_test() const { return _adopted_external.load(); }
// For testing only: overwrite the atomic _adopted_external flag with v.
void set_adopted_external_for_test(bool v) { _adopted_external.store(v); }
#endif

private:
std::mutex _start_mutex;
std::atomic<pid_t> _child_pid {0};
std::atomic<bool> _adopted_external {false};
};

} // namespace doris
24 changes: 24 additions & 0 deletions be/test/runtime/cdc_client_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,30 @@ TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) {
EXPECT_EQ(result.status().status_code(), 999);
}

// A freshly constructed CdcClientMgr must report _adopted_external == false,
// and a start_cdc_client call (BE_TEST short-circuit) must leave the flag
// unchanged.
TEST_F(CdcClientMgrTest, AdoptedExternalDefaultFalse) {
    CdcClientMgr manager;

    // Flag state straight after construction.
    const bool adopted_before = manager.get_adopted_external_for_test();
    EXPECT_FALSE(adopted_before);

    PRequestCdcClientResult start_result;
    const Status start_status = manager.start_cdc_client(&start_result);
    EXPECT_TRUE(start_status.ok());

    // Flag state once start_cdc_client has returned.
    const bool adopted_after = manager.get_adopted_external_for_test();
    EXPECT_FALSE(adopted_after);
}

// The test-only setter and getter for _adopted_external must agree: each value
// written through the setter is the value the getter reads back.
TEST_F(CdcClientMgrTest, AdoptedExternalSetterRoundTrip) {
    CdcClientMgr manager;
    EXPECT_FALSE(manager.get_adopted_external_for_test());

    // Write true, then false, checking the read-back after every store.
    for (const bool flag : {true, false}) {
        manager.set_adopted_external_for_test(flag);
        EXPECT_EQ(flag, manager.get_adopted_external_for_test());
    }
}

// Test send_request_to_cdc_client with empty API
TEST_F(CdcClientMgrTest, SendRequestEmptyApi) {
CdcClientMgr mgr;
Expand Down
Loading