[improve](streaming-job) support cdc_client JVM opts and adopt externally-managed cdc_client#63898
[improve](streaming-job) support cdc_client JVM opts and adopt externally-managed cdc_client#63898JNSimba wants to merge 3 commits into
Conversation
…ally-managed cdc_client
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
|
run buildall |
There was a problem hiding this comment.
Pull request overview
This PR improves BE-managed cdc_client startup by adding configurable JVM options and allowing BE to adopt a healthy externally managed cdc_client already bound to the configured port.
Changes:
- Adds
cdc_client_java_optsconfig and passes tokenized JVM options before-jar. - Adds external health-probe adoption logic with
_adopted_externalstate tracking. - Adds limited tests for the new adoption flag accessor behavior.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
conf/be.conf |
Adds shipped default JVM option for BE-forked cdc_client. |
be/src/common/config.h |
Declares the new mutable JVM options config. |
be/src/common/config.cpp |
Defines the new JVM options config. |
be/src/runtime/cdc_client_mgr.h |
Adds adoption state and test-only accessors. |
be/src/runtime/cdc_client_mgr.cpp |
Implements external adoption probing and variable-length Java argv construction. |
be/test/runtime/cdc_client_mgr_test.cpp |
Adds tests for _adopted_external default and setter/getter behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Requesting changes for one BE runtime lifecycle issue.
Critical checkpoint conclusions:
- Goal/test: The PR aims to add CDC client JVM options and adopt externally managed CDC clients. The high-level goal is clear, but the production-only option parsing/adoption paths are not covered by meaningful tests because the new logic is mostly behind
#ifndef BE_TEST. - Scope: The change is focused, but the argv construction moved non-trivial work into the post-fork child path.
- Concurrency:
start_cdc_client()is serialized by_start_mutex, but BE is a multi-threaded process; afterfork()only async-signal-safe operations are safe beforeexec. The new C++ allocations/parsing in the child violate that constraint. - Lifecycle/static init: No new cross-TU static initialization dependency found. The child process lifecycle is affected by the issue commented inline.
- Configuration:
cdc_client_java_optsis added and read at startup; runtime changes only affect subsequent forks, which is reasonable for JVM startup options. - Compatibility/protocol: No incompatible FE/BE protocol or storage-format change found.
- Parallel paths: CDC client startup is centralized here; no separate startup path found needing the same option handling.
- Conditional checks: The external adoption check is intentional and logged; no duplicate prior review thread exists.
- Test coverage/results: The added tests only exercise the test accessor/default flag behavior, not the real external adoption or argv-building behavior.
- Observability: Adoption and managed-start transitions have INFO logs; no additional metrics appear necessary for this small lifecycle change.
- Transaction/persistence/data writes: No direct transaction or persistence changes. CDC requests can lead to downstream writes, but this PR only changes process startup.
- FE/BE variables: No new thrift/protobuf variable passing.
- Performance: No hot-path CPU issue found; the blocking issue is post-fork safety.
- User focus: No additional user-provided review focus was supplied.
…de -XX:+ExitOnOutOfMemoryError
|
run buildall |
|
/review |
TPC-H: Total hot run time: 31600 ms |
| if (check_cdc_client_health(1, 0, adopt_response).ok()) { | ||
| if (!_adopted_external.exchange(true)) { | ||
| LOG(INFO) << "Adopting external cdc client on port " | ||
| << doris::config::cdc_client_port; | ||
| } | ||
| return Status::OK(); |
| if (check_cdc_client_health(1, 0, adopt_response).ok()) { | ||
| if (!_adopted_external.exchange(true)) { | ||
| LOG(INFO) << "Adopting external cdc client on port " | ||
| << doris::config::cdc_client_port; | ||
| } | ||
| return Status::OK(); |
|
|
||
| DEFINE_Int32(cdc_client_port, "9096"); | ||
|
|
||
| DEFINE_mString(cdc_client_java_opts, ""); |
There was a problem hiding this comment.
I found one additional concurrency issue beyond the existing inline threads. The existing comments already cover the external-adoption validation risk, missing production-path adoption tests, and prior config/default discussion, so I did not duplicate those.
Critical checkpoint conclusions:
- Goal and tests: the PR aims to add CDC client JVM opts and external process adoption. The JVM arg construction is now moved before fork and the OOM flag is hardcoded, but the external adoption behavior remains insufficiently tested as already noted in existing threads.
- Scope: the change is mostly localized to CDC client startup/configuration.
- Concurrency:
start_cdc_client()is serialized by_start_mutex, but the new mutable string config can be changed concurrently through BE config APIs and is read without the required mutable-string config lock. - Lifecycle: CDC child fork/exec lifecycle is improved by prebuilding argv before fork. Existing review threads still cover adoption lifecycle risks around accepting an arbitrary healthy localhost service.
- Configuration: a new mutable config is added; because it is mutable, runtime update semantics and thread-safe reads must be handled.
- Compatibility and parallel paths: no storage/protocol compatibility issue found.
- Tests: added tests only cover the test accessor state, not the production adoption probe/fallback path, as already raised.
- Observability: adoption/start logs are present; no additional observability issue found beyond the validation/test concerns.
- Transactions/persistence/data writes: not applicable.
- Performance: no meaningful additional performance issue found.
User focus points: no additional user-provided review focus was supplied.
|
|
||
| // Pre-build everything the child needs before fork(): heap allocation after | ||
| // fork() in a multi-threaded process can deadlock on inherited libc locks. | ||
| std::vector<std::string> argv_storage; |
There was a problem hiding this comment.
cdc_client_java_opts is registered with DEFINE_mString, so it can be changed at runtime through set_config(). String config updates are protected by config::get_mutable_string_config_lock() when assigning the underlying std::string, but this read copies the same string without that lock. A concurrent CDC request that reaches start_cdc_client() while an operator updates cdc_client_java_opts can race on the std::string, which is undefined behavior. Please either make this startup-only option immutable (DEFINE_String) or copy it while holding *config::get_mutable_string_config_lock() before parsing it.
TPC-DS: Total hot run time: 172567 ms |
… concurrent-read UB
TPC-H: Total hot run time: 31124 ms |
TPC-DS: Total hot run time: 172750 ms |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
What
Two improvements to the BE-managed cdc_client lifecycle:
1.
cdc_client_java_optsBE config + hardcoded OOM safety netNew
be.confentry to pass extra JVM options to the BE-forked cdc_client process. The value is whitespace-tokenized and inserted into the child argv before-jar.-XX:+ExitOnOutOfMemoryErroris hardcoded in BE (placed after the user opts, so JVM's last-wins rule prevents accidental disabling viacdc_client_java_opts). This guarantees every BE-forked cdc_client exits on OOM so BE can detect the dead child and re-fork — previously the JVM survived OOM in an unresponsive state and BE kept reporting "CDC client X unresponsive" without restarting.Existing clusters get the OOM flag automatically by picking up the new BE binary; no
be.confedit required.The startup uses
execvinstead ofexeclpto support variable-length argv. All heap-backed argv / path construction is done beforefork(), and the child only performs async-signal-safe operations (open,dup2,close,execv,_exit) untilexecv()— this avoids deadlocking on libc/libstdc++ locks inherited from BE worker threads.cdc_client_java_optsis registered asDEFINE_String(immutable), consistent withcdc_client_port— admins change it viabe.conf+ BE restart. This prevents a data race between adminset_config()writes andstart_cdc_client()reads.2. Adopt externally-managed cdc_client
start_cdc_client()now probes127.0.0.1:cdc_client_port/actuator/healthbefore forking. If a healthy cdc_client is already listening (e.g. one started manually for debug / hotfix), BE adopts it and skips fork instead of fork-looping against a port it cannot bind. Edge cases:fork()returns success and health passes but the new child has already exited (port held by an external process answering health): treated as adoption rather than masking the dead PID as "Start success".A
_adopted_externalatomic edge-triggered flag throttles the "Adopting external cdc client" log so each mode transition prints exactly once.Tests
cdc_client_mgr_test.cppcases unchanged (all new lifecycle logic lives behind#ifndef BE_TEST)._adopted_externalflag default value and setter/getter round-trip.check_cdc_client_healthis compiled out underBE_TEST. A follow-up PR will add a test seam (function-pointer indirection or local HTTP fixture) to exercise these paths.Test plan
cdc_client_mgr_testnohup java -jar cdc-client.jar ...on the same host; verify BE adopts it without fork-looping (be.INFOshows one-timeAdopting external cdc client on port 9096).