From 240cf59b695c2b4c0a4e6f408c7d188a7ca60e91 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Fri, 12 Jun 2026 23:10:33 -0600 Subject: [PATCH 1/2] nri-kafka: fix sendSync hanging forever on delivery failure sendSync blocked on a TMVar that the delivery callback only signalled on the success branch, so a broker-side delivery failure (delivery.timeout.ms exceeded, retries exhausted, non-retriable error, no partition leader) left the caller parked forever and hid the error from observability. Add a pure Internal.deliveryReportToResult that maps every DeliveryReport to a Result Error (), and signal the terminator on all branches. A failed delivery now surfaces as a descriptive Task.fail: DeliveryFailed for a broker-side failure, NoMessageDelivered for the message-less NoMessageError report. sendAsync keeps its prior success-only callback contract. The dispatch is unit-tested in test/Spec/Kafka.hs; the full worker integration suite passes against a live broker. Co-Authored-By: Claude Opus 4.8 (1M context) --- nri-kafka/CHANGELOG.md | 5 ++++ nri-kafka/docs/known-issues.md | 13 +++++++++- nri-kafka/nri-kafka.cabal | 1 + nri-kafka/src/Kafka.hs | 36 ++++++++++++++++++-------- nri-kafka/src/Kafka/Internal.hs | 33 +++++++++++++++++++++++- nri-kafka/test/Main.hs | 4 ++- nri-kafka/test/Spec/Kafka.hs | 45 +++++++++++++++++++++++++++++++++ 7 files changed, 123 insertions(+), 14 deletions(-) create mode 100644 nri-kafka/test/Spec/Kafka.hs diff --git a/nri-kafka/CHANGELOG.md b/nri-kafka/CHANGELOG.md index f17d53fa..6e5c347a 100644 --- a/nri-kafka/CHANGELOG.md +++ b/nri-kafka/CHANGELOG.md @@ -1,3 +1,8 @@ +# Unreleased + +- Fix `sendSync` hanging forever when a message fails to deliver. A failed + delivery now returns a descriptive `Task.fail` instead of parking the caller. 
+ # 0.4.0.1 - Require `nri-prelude >= 0.7.0.0` diff --git a/nri-kafka/docs/known-issues.md b/nri-kafka/docs/known-issues.md index 32045663..abd0afef 100644 --- a/nri-kafka/docs/known-issues.md +++ b/nri-kafka/docs/known-issues.md @@ -1,6 +1,17 @@ # Known issues -## `sendSync` hangs forever on delivery failure +## ~~`sendSync` hangs forever on delivery failure~~ (RESOLVED) + +**Resolved.** `Internal.deliveryReportToResult` now maps every delivery report +to a `Result Internal.Error ()`, the `sendSync` terminator carries that result +instead of a bare `Terminate`, and the callback signals it on both the success +and failure branches. A failed delivery now surfaces as a descriptive +`Task.fail` (`DeliveryFailed` for a broker-side failure, `NoMessageDelivered` +for the message-less `NoMessageError` report) instead of parking the caller +forever. `sendAsync` keeps its previous success-only callback contract. The +dispatch is unit-tested in `test/Spec/Kafka.hs`. + +The original write-up is kept below for context. ### Where diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal index ab7c98ad..72ee3276 100644 --- a/nri-kafka/nri-kafka.cabal +++ b/nri-kafka/nri-kafka.cabal @@ -251,6 +251,7 @@ test-suite tests Kafka.Worker.Settings Kafka.Worker.Stopping Helpers + Spec.Kafka Spec.Kafka.Worker.Integration Spec.Kafka.Worker.Partition Paths_nri_kafka diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 83d309bd..aa65554e 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -215,18 +215,32 @@ mkHandler settings producer = do Platform.tracingSpan "Async send Kafka messages" <| do let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg' Platform.setTracingSpanDetails details - sendHelperAsync producer doAnything onDeliveryCallback msg' + -- Preserve the existing async contract: the caller's callback only + -- fires once the broker has confirmed delivery. 
Failures are not + -- forwarded to async callers (they would have to be reported out of + -- band), only to sync callers below. + let onDeliveryReport deliveryReport = + case deliveryReport of + Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback + _ -> Task.succeed () + sendHelperAsync producer doAnything onDeliveryReport msg' |> Task.mapError Internal.errorToText, Internal.sendSync = \msg' -> Platform.tracingSpan "Sync send Kafka messages" <| do let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg' Platform.setTracingSpanDetails details terminator <- doSTM doAnything TMVar.newEmptyTMVar - let onDeliveryCallback = doSTM doAnything (TMVar.putTMVar terminator Terminate) - sendHelperAsync producer doAnything onDeliveryCallback msg' + -- The callback runs on every delivery report, so the terminator is + -- signalled exactly once whether delivery succeeds or fails. This + -- is what keeps a failed delivery from parking the caller forever. 
+ let onDeliveryReport deliveryReport = + doSTM doAnything (TMVar.putTMVar terminator (Internal.deliveryReportToResult deliveryReport)) + sendHelperAsync producer doAnything onDeliveryReport msg' |> Task.mapError Internal.errorToText - Terminate <- doSTM doAnything (TMVar.readTMVar terminator) - Task.succeed () + result <- doSTM doAnything (TMVar.readTMVar terminator) + case result of + Ok () -> Task.succeed () + Err err -> Task.fail (Internal.errorToText err) } doSTM :: Platform.DoAnythingHandler -> STM.STM a -> Task e a @@ -269,10 +283,10 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout sendHelperAsync :: Producer.KafkaProducer -> Platform.DoAnythingHandler -> - Task Never () -> + (Producer.DeliveryReport -> Task Never ()) -> Internal.Msg -> Task Internal.Error () -sendHelperAsync producer doAnything onDeliveryCallback msg' = do +sendHelperAsync producer doAnything onDeliveryReport msg' = do record' <- record msg' Exception.handleAny (\exception -> Prelude.pure (Err (Internal.Uncaught exception))) @@ -281,12 +295,12 @@ sendHelperAsync producer doAnything onDeliveryCallback msg' = do Producer.produceMessage' producer record' + -- librdkafka invokes this callback exactly once per message, on + -- both success and failure, so handing it the whole delivery + -- report lets callers be notified of either outcome. ( \deliveryReport -> do log <- Platform.silentHandler - Task.perform log <| - case deliveryReport of - Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback - _ -> Task.succeed () + Task.perform log (onDeliveryReport deliveryReport) ) Prelude.pure <| case maybeFailedMessages of Prelude.Right _ -> Ok () diff --git a/nri-kafka/src/Kafka/Internal.hs b/nri-kafka/src/Kafka/Internal.hs index 2eebce92..67f6bce1 100644 --- a/nri-kafka/src/Kafka/Internal.hs +++ b/nri-kafka/src/Kafka/Internal.hs @@ -48,13 +48,44 @@ instance Show Encodable where -- | Errors. 
-- If you experience an 'Uncaught' exception, please wrap it here type here! data Error - = SendingFailed (Producer.ProducerRecord, Producer.KafkaError) + = -- | A message could not be enqueued for sending. This is a pre-flight + -- failure surfaced synchronously by librdkafka (e.g. the local producer + -- queue is full, or the message exceeds the configured maximum size); the + -- message was never handed to the broker. + SendingFailed (Producer.ProducerRecord, Producer.KafkaError) + | -- | A message was enqueued and handed to the broker, but delivery + -- ultimately failed (e.g. @delivery.timeout.ms@ exceeded, retries + -- exhausted, a non-retriable broker error, or no available partition + -- leader). This is reported asynchronously through the delivery callback, + -- after a successful enqueue. + DeliveryFailed (Producer.ProducerRecord, Producer.KafkaError) + | -- | librdkafka invoked the delivery callback to report a failure but did + -- not attach the original message, so there is no 'Producer.ProducerRecord' + -- to report — only the 'Producer.KafkaError'. In hw-kafka-client this is + -- the @NoMessageError@ delivery report, which is produced when the C + -- delivery callback fires with a null message pointer and the error code is + -- read from @errno@. It is an exceptional, library-level condition rather + -- than a normal per-message broker rejection (those arrive as + -- 'DeliveryFailed', which carries the record). + NoMessageDelivered Producer.KafkaError | Uncaught Exception.SomeException deriving (Show) errorToText :: Error -> Text errorToText err = Text.fromList (Prelude.show err) +-- | Translate a librdkafka 'Producer.DeliveryReport' into a 'Result' the +-- caller can act on: a success carries no payload, while the two failure +-- reports map to the corresponding 'Error' constructors. Kept pure (no +-- 'Producer.KafkaProducer', no IO) so the dispatch can be unit-tested without +-- a running broker. 
+deliveryReportToResult :: Producer.DeliveryReport -> Result Error () +deliveryReportToResult deliveryReport = + case deliveryReport of + Producer.DeliverySuccess _record _offset -> Ok () + Producer.DeliveryFailure record kafkaError -> Err (DeliveryFailed (record, kafkaError)) + Producer.NoMessageError kafkaError -> Err (NoMessageDelivered kafkaError) + -- | A kafka topic newtype Topic = Topic {unTopic :: Text} deriving (Aeson.ToJSON, Show) diff --git a/nri-kafka/test/Main.hs b/nri-kafka/test/Main.hs index b4b1dde7..60fe59ff 100644 --- a/nri-kafka/test/Main.hs +++ b/nri-kafka/test/Main.hs @@ -1,5 +1,6 @@ module Main (main) where +import qualified Spec.Kafka import qualified Spec.Kafka.Worker.Integration import qualified Spec.Kafka.Worker.Partition import qualified System.Environment @@ -16,6 +17,7 @@ tests :: Test.Test tests = Test.describe "lib/kafka" - [ Spec.Kafka.Worker.Integration.tests, + [ Spec.Kafka.tests, + Spec.Kafka.Worker.Integration.tests, Spec.Kafka.Worker.Partition.tests ] diff --git a/nri-kafka/test/Spec/Kafka.hs b/nri-kafka/test/Spec/Kafka.hs new file mode 100644 index 00000000..7f468007 --- /dev/null +++ b/nri-kafka/test/Spec/Kafka.hs @@ -0,0 +1,45 @@ +module Spec.Kafka (tests) where + +import qualified Expect +import qualified Kafka.Consumer as Consumer +import qualified Kafka.Internal as Internal +import qualified Kafka.Producer as Producer +import qualified Test + +tests :: Test.Test +tests = + Test.describe + "Kafka" + [ Test.describe + "deliveryReportToResult" + [ Test.test "a successful delivery becomes Ok ()" <| \() -> + case Internal.deliveryReportToResult + (Producer.DeliverySuccess exampleRecord (Consumer.Offset 0)) of + Ok () -> Expect.pass + other -> Expect.fail ("expected Ok (), got " ++ Debug.toString other), + Test.test "a broker delivery failure becomes DeliveryFailed carrying the record and error" <| \() -> + case Internal.deliveryReportToResult + (Producer.DeliveryFailure exampleRecord exampleError) of + Err 
(Internal.DeliveryFailed payload) -> + Expect.equal payload (exampleRecord, exampleError) + other -> Expect.fail ("expected Err (DeliveryFailed ...), got " ++ Debug.toString other), + Test.test "a message-less failure becomes NoMessageDelivered carrying the error" <| \() -> + case Internal.deliveryReportToResult + (Producer.NoMessageError exampleError) of + Err (Internal.NoMessageDelivered kafkaError) -> + Expect.equal kafkaError exampleError + other -> Expect.fail ("expected Err (NoMessageDelivered ...), got " ++ Debug.toString other) + ] + ] + +exampleRecord :: Producer.ProducerRecord +exampleRecord = + Producer.ProducerRecord + { Producer.prTopic = "the-topic", + Producer.prPartition = Producer.UnassignedPartition, + Producer.prKey = Nothing, + Producer.prValue = Nothing + } + +exampleError :: Producer.KafkaError +exampleError = Producer.KafkaError "boom" From 2eca349788688bb857a2f591ecc36974d7cfa6bc Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Mon, 15 Jun 2026 11:44:23 -0300 Subject: [PATCH 2/2] nri-kafka: construct test topic via Producer.TopicName Match the rest of the repo (src/Kafka.hs, test/Helpers.hs) which builds prTopic explicitly rather than relying on the IsString instance. Addresses PR review feedback. Co-Authored-By: Claude Opus 4.8 (1M context) --- nri-kafka/test/Spec/Kafka.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nri-kafka/test/Spec/Kafka.hs b/nri-kafka/test/Spec/Kafka.hs index 7f468007..a6acc8f2 100644 --- a/nri-kafka/test/Spec/Kafka.hs +++ b/nri-kafka/test/Spec/Kafka.hs @@ -35,7 +35,7 @@ tests = exampleRecord :: Producer.ProducerRecord exampleRecord = Producer.ProducerRecord - { Producer.prTopic = "the-topic", + { Producer.prTopic = Producer.TopicName "the-topic", Producer.prPartition = Producer.UnassignedPartition, Producer.prKey = Nothing, Producer.prValue = Nothing