diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000000..a2ab8c6093 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1 @@ +# Please follow the guide in [README.md](README.md) diff --git a/HOW_TO_UPDATE.md b/HOW_TO_UPDATE.md index ac0a480422..b860e564d4 100644 --- a/HOW_TO_UPDATE.md +++ b/HOW_TO_UPDATE.md @@ -1,175 +1 @@ -In our fork of librdkafka we have a few patches that are not yet merged to upstream and we might also have to do some thing differently than in upstream librdkafka. For this reasons here are the steps we did to upgrade to new librdkafka versions. - -# 2.8.0 -> 2.14.1 - -## Fixes to apply - -- pthread_set_name_np on freebsd - https://github.com/confluentinc/librdkafka/pull/4982 (no CH PR) -- Do not set _POSIX_C_SOURCE for FreeBSD (makes clang-15 happy) - https://github.com/confluentinc/librdkafka/pull/4157 (no CH PR) -- Race in rd_kafka_fetch_pos2str - https://github.com/confluentinc/librdkafka/pull/4788 (no CH PR) -- Fix data race in timers - https://github.com/confluentinc/librdkafka/pull/5089 (https://github.com/ClickHouse/librdkafka/pull/13) - Merged upstream, but not yet included in 2.14.1 -- Fix possible data-race for statistics - https://github.com/confluentinc/librdkafka/pull/4630 (https://github.com/ClickHouse/librdkafka/pull/11) -- Fix data race in rd_kafka_broker_fetch_toppars - https://github.com/confluentinc/librdkafka/pull/5266 (https://github.com/ClickHouse/librdkafka/pull/14) -- Fix lock-order-inversion in queue refcount operations - https://github.com/confluentinc/librdkafka/pull/5445 (https://github.com/ClickHouse/librdkafka/pull/15) -- Fix lock-order-inversion in rd_kafka_q_concat0 and rd_kafka_q_prepend0 - https://github.com/confluentinc/librdkafka/pull/5446 (https://github.com/ClickHouse/librdkafka/pull/16) - -## ClickHouse only fixes - -- Prefix librdkafka cJSON functions with `kafka_` prefix to separate them from AWS cJSON functions 
https://github.com/ClickHouse/librdkafka/commit/385e2bd22e90acc1fb14dccab03d55f397830e86 (originally https://github.com/ClickHouse/librdkafka/commit/2a25367b877e73380faab73dad5e5a0806d36b7a). Use regex to replace ` cJSON_([A-Za-z]+)\(` to `kafka_cJSON_$1(` - -``` -git remote add confluentinc https://github.com/confluentinc/librdkafka.git - -git fetch confluentinc v2.14.1 - -git checkout -b ClickHouse/release-2.14.1 confluentinc/v2.14.1 - -git fetch confluentinc refs/pull/4982/head -git cherry-pick bf3dd4fbb2f78e61723a51ff233dee429ee38f29 # pthread_set_name_np on freebsd - -git fetch confluentinc refs/pull/4157/head -git cherry-pick 1e79eba2fda27ed69ff510e774b713af76ccc4d0 # Do not set _POSIX_C_SOURCE for FreeBSD (makes clang-15 happy) - -git fetch confluentinc refs/pull/4788/head -git cherry-pick 0c449f610db0e0a255b0e110775f2a3c044c37e7 # Race in rd_kafka_fetch_pos2str - -git fetch confluentinc refs/pull/5089/head -git cherry-pick 5c185854404abf506d520042f61818d93d96cc91 # Fix data race in timers -git cherry-pick 46d17f637132c34c111880c9556abd13572c9f13 - -git fetch confluentinc refs/pull/4630/head -git cherry-pick ccc6962711709948759068852e0eb0b44a1c5eeb # Fix possible data-race for statistics - -git fetch confluentinc refs/pull/5266/head -git cherry-pick 801a520ec2abf8baee54f769096763398085a6d7 # Fix data race in rd_kafka_broker_fetch_toppars - -git cherry-pick 385e2bd22e90acc1fb14dccab03d55f397830e86 # Prefix librdkafka cJSON functions with `kafka_` prefix to separate them from AWS cJSON functions - -git fetch confluentinc refs/pull/5445/head -git cherry-pick 7ff1c01d6ec573355215ba56f314a9ef20ea55e8 # Fix lock-order-inversion in queue refcount operations -git cherry-pick 180b5f13e4fe6e4a85941a281c6d2bde4ff3a627 - -git fetch confluentinc refs/pull/5446/head -git cherry-pick 336830d10f2e9804e024d9cf04d3d824fdce9aa9 # Fix lock-order-inversion in rd_kafka_q_concat0 and rd_kafka_q_prepend0 -``` - -# 1.6.1 -> 2.8.0 - -Originally described 
[here](https://gist.github.com/filimonov/ad252aa601d4d99fb57d4d76f14aa2bf), but copied here future reference. - -Listing the commits exist in https://github.com/ClickHouse/librdkafka/tree/39d4ed49ccf3406e2bf825d5d7b0903b5a290782 on top of upstream - -``` -git log upstream/master..39d4ed49ccf3406e2bf825d5d7b0903b5a290782 > ../rdkafka_log.txt -``` - -https://github.com/confluentinc/librdkafka/compare/master...ClickHouse:librdkafka:39d4ed49ccf3406e2bf825d5d7b0903b5a290782 - - -### New fixes for 2.8 - -* https://github.com/confluentinc/librdkafka/pull/4982 -* https://github.com/confluentinc/librdkafka/pull/4788 -* https://github.com/confluentinc/librdkafka/pull/5089 -* https://github.com/confluentinc/librdkafka/pull/5266 -* https://github.com/ClickHouse/librdkafka/pull/15 - Fix lock-order-inversion in queue refcount operations (use atomic refcount instead of mutex-protected) - -### ClickHouse-specific fixes for 2.8 (cannot be upstreamed) - -* Calling 'kafka_' prefixed versions of cJSON to avoid clashes with aws-c-common's version of cJSON: - - https://github.com/ClickHouse/ClickHouse/pull/94343 - - When you upgrade librdkafka, please make sure to search for new or changed calls to cJSON and modify them accordingly. 
- -### Fixes done earlier - - -#### Redone differently - -* https://github.com/ClickHouse/librdkafka/commit/81b413cc1c2a33ad4e96df856b89184efbd6221c - - https://github.com/ClickHouse/ClickHouse/pull/76621/commits/4f663106eef1c2789b198c6cee10c704d50f34f5 - - -#### not yet merged by the upstream (reapplied) - - -* commit https://github.com/ClickHouse/librdkafka/commit/e685f8c6149171302bf18be36342dddd92e7b3ae + merge https://github.com/ClickHouse/librdkafka/commit/39d4ed49ccf3406e2bf825d5d7b0903b5a290782 - - https://github.com/confluentinc/librdkafka/pull/4630 - -* commit https://github.com/ClickHouse/librdkafka/commit/3d29dcf1fb2e51e15b31c0d0391891ebc355c0e1 + merge https://github.com/ClickHouse/librdkafka/commit/2d2aab6f5b79db1cfca15d7bf0dee75d00d82082 - - https://github.com/confluentinc/librdkafka/pull/4718/ (or alternative https://github.com/confluentinc/librdkafka/pull/4604/ ) - -* commit https://github.com/ClickHouse/librdkafka/commit/3d3bf79bf11d7ecbf96a4b5fd8a59bc63a490833 + merge https://github.com/ClickHouse/librdkafka/commit/6f3b483426a8c8ec950e27e446bec175cf8b553f - - https://github.com/confluentinc/librdkafka/pull/4157 - - -#### Patches already exists in the upstream 2.8 - -* commit https://github.com/ClickHouse/librdkafka/commit/ff32b4e9eeafd0b276f010ee969179e4e9e6d0b2 + merge commit https://github.com/ClickHouse/librdkafka/commit/8fa998984512927850490567bf048ac291ce24a8 - - https://github.com/confluentinc/librdkafka/pull/4232/ - -* commit https://github.com/ClickHouse/librdkafka/commit/f5f098da282bae8a1ba924ca5ee737bd18ab2c37 + merge commit https://github.com/ClickHouse/librdkafka/commit/6062e711a919fb3b669b243b7dceabd045d0e4a2 - - https://github.com/confluentinc/librdkafka/commit/bead2e4acc8f0723fa44d21451f85859d0da76e0 - https://github.com/confluentinc/librdkafka/pull/3786 - - -* commit https://github.com/ClickHouse/librdkafka/commit/b8554f1682062c85ba519eb54ef2f90e02b812cb - - 
https://github.com/confluentinc/librdkafka/commit/51c49f6975dc8322815d8a95d20dc952e4b1c542 - https://github.com/confluentinc/librdkafka/pull/3285 - -* commit https://github.com/ClickHouse/librdkafka/commit/cb3a4a3fcb35ca88f6f8474e557df938eedf8254 - - https://github.com/confluentinc/librdkafka/commit/c4d56949006cfdab0bb35b1135498d832a3439f1 - https://github.com/confluentinc/librdkafka/pull/3376 - - -* commit https://github.com/ClickHouse/librdkafka/commit/ae0dd50ab6a0bfd0e8bf794e9c5e71bbe783985a + commit https://github.com/ClickHouse/librdkafka/commit/5bf9c8607820a73b9625c9d29d9cdaec22470b07 - - https://github.com/confluentinc/librdkafka/commit/a11e48ef9c1f0b0d7a9188990443de2db00913b9 - https://github.com/confluentinc/librdkafka/pull/3179 - - -* commit https://github.com/ClickHouse/librdkafka/commit/f68f261ceafb887824deb7848e1ff8d2eb33f956 (Port to BoringSSL) - - https://github.com/confluentinc/librdkafka/commit/4c51ce5bca44af3898e79a6f683bffc40170437a - https://github.com/confluentinc/librdkafka/pull/4065 - -```shell -git add remote confluentinc https://github.com/confluentinc/librdkafka.git - -git fetch confluentinc master - -git checkout -b clickhouse_librdkafka_2.8.0 confluentinc/master - -git fetch confluentinc refs/pull/4982/head -git cherry-pick bf3dd4fbb2f78e61723a51ff233dee429ee38f29 # pthread_set_name_np on freebsd - -git fetch confluentinc refs/pull/4157/head -git cherry-pick 1e79eba2fda27ed69ff510e774b713af76ccc4d0 # Do not set _POSIX_C_SOURCE for FreeBSD (makes clang-15 happy) - -git fetch confluentinc refs/pull/4788/head -git cherry-pick 0c449f610db0e0a255b0e110775f2a3c044c37e7 # Race in rd_kafka_fetch_pos2str - -git fetch confluentinc refs/pull/4630/head -git cherry-pick ccc6962711709948759068852e0eb0b44a1c5eeb # Fix possible data-race for statistics - -git fetch confluentinc refs/pull/4718/head -git cherry-pick f979784bd38ff8023bbac87aefdb9ea421ad7744 # Fix data race when a buffer queue is being reset instead of being initialized - -git fetch 
confluentinc refs/pull/5089/head -git cherry-pick 5c185854404abf506d520042f61818d93d96cc91 # Fix data race in timers - -git fetch confluentinc refs/pull/5266/head -git cherry-pick 801a520ec2abf8baee54f769096763398085a6d7 # Fix data race in rd_kafka_broker_fetch_toppars - -# Now pick the commit with librdkafka modifications for cJSON referenced here: https://github.com/ClickHouse/ClickHouse/pull/94343 -# Please check for new / changed calls to cJSON and adjust them accordingly -``` +# Please check https://github.com/ClickHouse/ClickHouse/blob/master/contrib/librdkafka-cmake/HOW_TO_UPDATE.md diff --git a/README.md b/README.md index 846f333382..dab5aa1f00 100644 --- a/README.md +++ b/README.md @@ -1,199 +1,8 @@ -librdkafka - the Apache Kafka® C/C++ client library -=================================================== - -Copyright (c) 2012-2022, [Magnus Edenhill](http://www.edenhill.se/). - 2023 [Confluent Inc.](https://www.confluent.io/). - -[https://github.com/confluentinc/librdkafka](https://github.com/confluentinc/librdkafka) - -**librdkafka** is a C library implementation of the -[Apache Kafka](https://kafka.apache.org/) protocol, providing Producer, Consumer -and Admin clients. It was designed with message delivery reliability -and high performance in mind, current figures exceed 1 million msgs/second for -the producer and 3 million msgs/second for the consumer. - -**librdkafka** is licensed under the 2-clause BSD license. - -KAFKA is a registered trademark of The Apache Software Foundation and -has been licensed for use by librdkafka. librdkafka has no -affiliation with and is not endorsed by The Apache Software Foundation. 
- - -# Features - * Full Exactly-Once-Semantics (EOS) support - * High-level producer, including Idempotent and Transactional producers - * High-level balanced KafkaConsumer (requires broker >= 0.9) - * Simple (legacy) consumer - * Admin client - * Compression: snappy, gzip, lz4, zstd - * [SSL](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka) support - * [SASL](https://github.com/confluentinc/librdkafka/wiki/Using-SASL-with-librdkafka) (GSSAPI/Kerberos/SSPI, PLAIN, SCRAM, OAUTHBEARER) support - * Full list of [supported KIPs](INTRODUCTION.md#supported-kips) - * Broker version support: >=0.8 (see [Broker version compatibility](INTRODUCTION.md#broker-version-compatibility)) - * Guaranteed API stability for C & C++ APIs (ABI safety guaranteed for C) - * [Statistics](STATISTICS.md) metrics - * Debian package: librdkafka1 and librdkafka-dev in Debian and Ubuntu - * RPM package: librdkafka and librdkafka-devel - * Gentoo package: dev-libs/librdkafka - * Portable: runs on Linux, MacOS X, Windows, Solaris, FreeBSD, AIX, ... - -# Documentation - - * Public API in [C header](src/rdkafka.h) and [C++ header](src-cpp/rdkafkacpp.h). - * Introduction and manual in [INTRODUCTION.md](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md). - * Configuration properties in -[CONFIGURATION.md](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). - * Statistics metrics in [STATISTICS.md](https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md). - * [Frequently asked questions](https://github.com/confluentinc/librdkafka/wiki). - * Step-by-step tutorial [Getting Started with Apache Kafka and C/C++](https://developer.confluent.io/get-started/c/). - -**NOTE**: The `master` branch is actively developed, use latest [release](https://github.com/confluentinc/librdkafka/releases) for production use. 
- - -# Installation - -## Installing prebuilt packages - -On Mac OSX, install librdkafka with homebrew: - -```bash -$ brew install librdkafka -``` - -On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, -see instructions [here](https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.html#get-the-software) and then install librdkafka: - - ```bash - $ apt install librdkafka-dev - ``` - -On RedHat, CentOS, Fedora, install librdkafka from the Confluent YUM repositories, -instructions [here](https://docs.confluent.io/current/installation/installing_cp/rhel-centos.html#get-the-software) and then install librdkafka: - -```bash -$ yum install librdkafka-devel -``` - -On Windows, reference [librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) NuGet package in your Visual Studio project. - - -For other platforms, follow the source building instructions below. - - -## Installing librdkafka using vcpkg - -You can download and install librdkafka using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager: - -```bash -# Install vcpkg if not already installed -$ git clone https://github.com/Microsoft/vcpkg.git -$ cd vcpkg -$ ./bootstrap-vcpkg.sh -$ ./vcpkg integrate install - -# Install librdkafka -$ vcpkg install librdkafka -``` - -The librdkafka package in vcpkg is kept up to date by Microsoft team members and community contributors. -If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository. 
- - -## Build from source - -### Requirements - The GNU toolchain - GNU make - pthreads - zlib-dev (optional, for gzip compression support) - libssl-dev (optional, for SSL and SASL SCRAM support) - libsasl2-dev (optional, for SASL GSSAPI support) - libzstd-dev (optional, for ZStd compression support) - libcurl-dev (optional, for SASL OAUTHBEARER OIDC support) - -**NOTE**: Static linking of ZStd (requires zstd >= 1.2.1) in the producer - enables encoding the original size in the compression frame header, - which will speed up the consumer. - Use `STATIC_LIB_libzstd=/path/to/libzstd.a ./configure --enable-static` - to enable static ZStd linking. - MacOSX example: - `STATIC_LIB_libzstd=$(brew ls -v zstd | grep libzstd.a$) ./configure --enable-static` - - -### Building - - ./configure - # Or, to automatically install dependencies using the system's package manager: - # ./configure --install-deps - # Or, build dependencies from source: - # ./configure --install-deps --source-deps-only - - make - sudo make install - - -**NOTE**: See [README.win32](README.win32) for instructions how to build - on Windows with Microsoft Visual Studio. - -**NOTE**: See [CMake instructions](packaging/cmake/README.md) for experimental - CMake build (unsupported). - - -## Usage in code - -See [getting Started with Apache Kafka and C/C++](https://developer.confluent.io/get-started/c/) for a basic tutorial. - -1. Refer to the [examples directory](examples/) for code using: - - * Producers: basic producers, idempotent producers, transactional producers. - * Consumers: basic consumers, reading batches of messages. - * Performance and latency testing tools. - -2. Refer to the [examples GitHub repo](https://github.com/confluentinc/examples/tree/master/clients/cloud/c) for code connecting to a cloud streaming data service based on Apache Kafka - -3. Link your program with `-lrdkafka` (C) or `-lrdkafka++` (C++). 
- - -## Commercial support - -Commercial support is available from [Confluent Inc](https://www.confluent.io/) - - -## Community support - -**Only the [latest official release](https://github.com/confluentinc/librdkafka/releases) is supported for community members.** - -File bug reports and feature requests using [GitHub Issues](https://github.com/confluentinc/librdkafka/issues). - -Questions and discussions are welcome on the [Discussions](https://github.com/confluentinc/librdkafka/discussions) forum, and on the [Confluent Community slack](https://launchpass.com/confluentcommunity) #clients channel. - - -# Language bindings - - * C#/.NET: [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) (based on [rdkafka-dotnet](https://github.com/ah-/rdkafka-dotnet)) - * C++: [cppkafka](https://github.com/mfontanini/cppkafka) - * C++: [modern-cpp-kafka](https://github.com/Morgan-Stanley/modern-cpp-kafka) - * Common Lisp: [cl-rdkafka](https://github.com/SahilKang/cl-rdkafka) - * D (C-like): [librdkafka](https://github.com/DlangApache/librdkafka/) - * D (C++-like): [librdkafkad](https://github.com/tamediadigital/librdkafka-d) - * Erlang: [erlkaf](https://github.com/silviucpp/erlkaf) - * Go: [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go) - * Haskell (kafka, conduit, avro, schema registry): [hw-kafka](https://github.com/haskell-works/hw-kafka) - * Kotlin Native: [Kafka-Kotlin-Native](https://github.com/icemachined/kafka-kotlin-native) - * Lua: [luardkafka](https://github.com/mistsv/luardkafka) - * Node.js: [node-rdkafka](https://github.com/Blizzard/node-rdkafka) - * OCaml: [ocaml-kafka](https://github.com/didier-wenzek/ocaml-kafka) - * Perl: [Net::Kafka](https://github.com/bookingcom/perl-Net-Kafka) - * PHP: [php-rdkafka](https://github.com/arnaud-lb/php-rdkafka) - * PHP: [php-simple-kafka-client](https://github.com/php-kafka/php-simple-kafka-client) - * Python: 
[confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) - * Python: [PyKafka](https://github.com/Parsely/pykafka) - * Ruby: [Hermann](https://github.com/reiseburo/hermann) - * Ruby: [rdkafka-ruby](https://github.com/appsignal/rdkafka-ruby) - * Rust: [rust-rdkafka](https://github.com/fede1024/rust-rdkafka) - * Tcl: [KafkaTcl](https://github.com/flightaware/kafkatcl) - * Shell: [kafkacat](https://github.com/edenhill/kafkacat) - Apache Kafka command line tool - * Swift: [Perfect-Kafka](https://github.com/PerfectlySoft/Perfect-Kafka) - - -See [Powered by librdkafka](https://github.com/confluentinc/librdkafka/wiki/Powered-by-librdkafka) for an incomplete list of librdkafka users. +# How to patch our fork + + 1. Create the necessary changes in this repo + 2. Validate the changes either locally or by opening a PR in the main ClickHouse repo referencing the commit temporarily + 3. Open a PR in this repo to merge your changes to the latest release branch + 4. Open the upstream PR + 5. Merge the PR in this repo + 6. In the PR in the ClickHouse repo reference the merged commit and update [contrib/librdkafka-cmake/HOW_TO_UPDATE.md](https://github.com/ClickHouse/ClickHouse/blob/master/contrib/librdkafka-cmake/HOW_TO_UPDATE.md) diff --git a/tests/0154-assign_timeout_mock.c b/tests/0154-assign_timeout_mock.c new file mode 100644 index 0000000000..9e9336a325 --- /dev/null +++ b/tests/0154-assign_timeout_mock.c @@ -0,0 +1,203 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2026, ClickHouse Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +/** + * @name Verify that rd_kafka_assign() returns within a bounded time + * even when the group coordinator / cgrp state machine is + * unable to make progress. + * + * Background: + * rd_kafka_assign() is called from the rebalance callback on the + * application thread. Internally it sends an RD_KAFKA_OP_ASSIGN to + * the cgrp ops queue and waits for a reply. Previously the wait + * used RD_POLL_INFINITE, meaning the application thread could hang + * indefinitely inside rd_kafka_consumer_poll() if the cgrp could + * not service the op in a timely manner (e.g. coordinator + * unreachable, queue forwarding stalled during destroy, etc.). + * + * The fix introduces a 30 s timeout in rd_kafka_assign0(). On + * expiry the call returns RD_KAFKA_RESP_ERR__TIMED_OUT instead of + * hanging forever, letting the caller honour its own poll timeout + * and react to shutdown signals. 
+ *
+ *   This test exercises rd_kafka_assign() from a rebalance callback
+ *   while the cluster is repeatedly being torn down and brought back
+ *   up. It does not attempt to deterministically reproduce the
+ *   original hang (which depends on a race in the main rdkafka
+ *   thread) — instead it asserts the *invariant* that no individual
+ *   rd_kafka_assign() call ever exceeds the timeout cap by more than
+ *   a small slack. Without the patch, if the hang is hit, the test
+ *   blocks past TEST_TIMEOUT and fails; with the patch the assert on
+ *   `max_assign_us` enforces the upper bound on every call.
+ */
+
+#define ASSIGN_TIMEOUT_MS 30000 /* cap expected inside rd_kafka_assign0() */
+#define ASSIGN_SLACK_MS   5000  /* tolerated overshoot on top of that cap */
+#define ASSIGN_MAX_US     ((ASSIGN_TIMEOUT_MS + ASSIGN_SLACK_MS) * 1000)
+
+static int64_t max_assign_us = 0; /* longest rd_kafka_assign() call, in us */
+static int rebalance_cnt     = 0; /* number of rebalance_cb() invocations */
+/** @brief Rebalance cb: applies the (un)assignment and times assign(). */
+static void rebalance_cb(rd_kafka_t *rk,
+                         rd_kafka_resp_err_t err,
+                         rd_kafka_topic_partition_list_t *parts,
+                         void *opaque) {
+        int64_t t0, dt;
+        rd_kafka_resp_err_t aerr;
+
+        rebalance_cnt++;
+        TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
+                 rd_kafka_err2name(err), parts->cnt);
+
+        t0 = test_clock();
+
+        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+                aerr = rd_kafka_assign(rk, parts);
+        else
+                aerr = rd_kafka_assign(rk, NULL);
+
+        dt = test_clock() - t0;
+
+        if (dt > max_assign_us)
+                max_assign_us = dt;
+
+        TEST_SAY("rd_kafka_assign() returned %s in %" PRId64 "ms\n",
+                 rd_kafka_err2name(aerr), dt / 1000);
+
+        TEST_ASSERT(dt < ASSIGN_MAX_US,
+                    "rd_kafka_assign() blocked for %" PRId64
+                    "ms, exceeds bounded timeout of %dms (+%dms slack)",
+                    dt / 1000, ASSIGN_TIMEOUT_MS, ASSIGN_SLACK_MS);
+
+        /* TIMED_OUT is the expected error from the bounded wait. Any
+         * other error (or success) is also acceptable — we only care
+         * that the call returned. */
+        if (aerr && aerr != RD_KAFKA_RESP_ERR__TIMED_OUT)
+                TEST_SAY("Non-timeout assign error (acceptable): %s\n",
+                         rd_kafka_err2name(aerr));
+}
+
+
+/**
+ * @brief Drive a consumer through rebalances while the brokers
+ *        flicker up/down, and verify rd_kafka_assign() always
+ *        returns within the bounded timeout.
+ */
+static void do_test_assign_bounded_timeout(void) {
+        const char *bootstraps;
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_conf_t *conf;
+        rd_kafka_t *c;
+        const char *groupid = "bounded-assign-grp";
+        const char *topic   = "bounded-assign-topic";
+        int i;
+
+        SUB_TEST_QUICK();
+
+        rebalance_cnt = 0;
+        max_assign_us = 0;
+
+        mcluster = test_mock_cluster_new(3, &bootstraps);
+        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+        rd_kafka_mock_topic_create(mcluster, topic, 4, 1);
+
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        test_conf_set(conf, "security.protocol", "PLAINTEXT");
+        test_conf_set(conf, "group.id", groupid);
+        test_conf_set(conf, "session.timeout.ms", "6000");
+        test_conf_set(conf, "heartbeat.interval.ms", "1000");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "partition.assignment.strategy", "range");
+
+        c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+        test_consumer_subscribe(c, topic);
+
+        /* Poll until the first rebalance callback has been served. */
+        TEST_SAY("Waiting for initial assignment\n");
+        while (rebalance_cnt == 0)
+                test_consumer_poll_once(c, NULL, 500);
+
+        /* Now flicker the brokers up/down a few times while polling.
+         * This stresses the cgrp state machine: coordinator lookup,
+         * group re-join, partition revocation/assignment all happen
+         * in rapid succession. Each rebalance invokes the callback
+         * which times rd_kafka_assign(). */
+        for (i = 0; i < 6; i++) {
+                if (i % 2 == 0) {
+                        TEST_SAY("Iteration %d: bringing all brokers down\n",
+                                 i);
+                        rd_kafka_mock_broker_set_down(mcluster, -1);
+                } else {
+                        TEST_SAY("Iteration %d: bringing all brokers up\n", i);
+                        rd_kafka_mock_broker_set_up(mcluster, -1);
+                }
+
+                /* Poll for ~3 s — this is the application-visible poll
+                 * timeout we are protecting. Without the fix, a stuck
+                 * cgrp would make this iteration take far longer. */
+                test_consumer_poll_no_msgs("flicker", c, 0, 3000);
+        }
+
+        /* Final state: brokers up, drain any pending rebalances. */
+        rd_kafka_mock_broker_set_up(mcluster, -1);
+        test_consumer_poll_no_msgs("settle", c, 0, 3000);
+
+        TEST_SAY("Total rebalance callbacks: %d, max assign() duration: "
+                 "%" PRId64 "ms\n",
+                 rebalance_cnt, max_assign_us / 1000);
+
+        TEST_ASSERT(rebalance_cnt >= 1,
+                    "Expected at least one rebalance callback, got %d",
+                    rebalance_cnt);
+        TEST_ASSERT(max_assign_us < ASSIGN_MAX_US,
+                    "Maximum rd_kafka_assign() duration %" PRId64
+                    "ms exceeds bounded timeout %dms (+%dms slack)",
+                    max_assign_us / 1000, ASSIGN_TIMEOUT_MS, ASSIGN_SLACK_MS);
+
+        rd_kafka_consumer_close(c);
+        rd_kafka_destroy(c);
+        test_mock_cluster_destroy(mcluster);
+
+        SUB_TEST_PASS();
+}
+
+/** @brief Test entry point; reports a skip when auth is configured. */
+int main_0154_assign_timeout_mock(int argc, char **argv) {
+        if (test_needs_auth()) {
+                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+                return 0;
+        }
+
+        do_test_assign_bounded_timeout();
+
+        return 0;
+}