From ab1357c1f2772f72f823d90a654e51a6fb99567f Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 25 May 2026 10:55:48 +0200 Subject: [PATCH 1/4] tests: remove some flakyness from test_kafka_consume Producer and consumer runs in two different threads so we cannot assume we have spans in a specific order. Assisted-by: Cursor --- tests/instrumentation/kafka_tests.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 5940d570b..aa16784e6 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -133,14 +133,20 @@ def delayed_send(): thread.join() transactions = elasticapm_client.events[TRANSACTION] spans = elasticapm_client.events[SPAN] + producer_transaction = next(transaction for transaction in transactions if transaction["name"] == "foo") + consumer_transactions = [transaction for transaction in transactions if transaction["type"] == "messaging"] + consumer_spans = [span for span in spans if span["name"] == "foo"] # the consumer transactions should have the same trace id as the transaction that triggered the messages - assert transactions[0]["trace_id"] == transactions[1]["trace_id"] == transactions[2]["trace_id"] - assert transactions[1]["name"] == f"Kafka RECEIVE from {test_topic}" - assert transactions[1]["type"] == "messaging" - assert transactions[1]["context"]["message"]["queue"]["name"] == test_topic - - assert spans[2]["transaction_id"] == transactions[1]["id"] - assert spans[3]["transaction_id"] == transactions[2]["id"] + assert len(consumer_transactions) == 2 + assert len(consumer_spans) == 2 + assert ( + producer_transaction["trace_id"] == consumer_transactions[0]["trace_id"] == consumer_transactions[1]["trace_id"] + ) + assert consumer_transactions[0]["name"] == f"Kafka RECEIVE from {test_topic}" + assert consumer_transactions[0]["context"]["message"]["queue"]["name"] == test_topic + + assert consumer_spans[0]["transaction_id"] == consumer_transactions[0]["id"] + assert consumer_spans[1]["transaction_id"] == consumer_transactions[1]["id"] def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics): From acda7c506f5e321c5e9bb93aff116046a6aae224 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 25 May 2026 17:07:22 +0200 Subject: [PATCH 2/4] tests: remove flakyness from test_kafka_consume_ongoing_transaction Assisted-by: Cursor --- tests/instrumentation/kafka_tests.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index aa16784e6..1328273f8 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -77,7 +77,12 @@ def producer(topics): @pytest.fixture() def consumer(topics): - consumer = KafkaConsumer(bootstrap_servers=f"{KAFKA_HOST}:9092", consumer_timeout_ms=500) + consumer = KafkaConsumer( + bootstrap_servers=f"{KAFKA_HOST}:9092", + consumer_timeout_ms=500, + # Unique topics can be produced to before the consumer's initial offset is resolved. + auto_offset_reset="earliest", + ) consumer.subscribe(topics=topics) deadline = time.time() + 5 while not consumer.assignment() and time.time() < deadline: @@ -163,6 +168,8 @@ def delayed_send(): thread = threading.Thread(target=delayed_send) thread.start() transaction = elasticapm_client.begin_transaction("foo") + # Give Kafka enough time to resolve offsets and deliver both delayed records before iteration stops. + consumer.config["consumer_timeout_ms"] = 2000 for item in consumer: pass thread.join() From 65cd0e2abde52ee82a1940c681a4ae1a894e30a0 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 25 May 2026 17:13:43 +0200 Subject: [PATCH 3/4] tests: remove more flakyness from test_kafka_consume Assisted-by: Cursor --- tests/instrumentation/kafka_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 1328273f8..9ec3e7e14 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -132,6 +132,8 @@ def delayed_send(): thread = threading.Thread(target=delayed_send) thread.start() + # Give Kafka enough time to resolve offsets and deliver both delayed records before iteration stops. + consumer.config["consumer_timeout_ms"] = 2000 for item in consumer: with elasticapm.capture_span("foo"): pass From 4f54d9c85aae61a4a2b749a50a845aa60263920d Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Mon, 25 May 2026 17:15:41 +0200 Subject: [PATCH 4/4] tests: remove flakyness from test_kafka_consumer_ignore_topic Assisted-by: Cursor --- tests/instrumentation/kafka_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 9ec3e7e14..6d1c51be1 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -199,6 +199,8 @@ def delayed_send(): thread = threading.Thread(target=delayed_send) thread.start() + # Give Kafka enough time to resolve offsets and deliver all delayed records before iteration stops. + consumer.config["consumer_timeout_ms"] = 2000 for item in consumer: with elasticapm.capture_span("test"): assert item