nri-kafka: fix sendSync hanging forever on delivery failure #159
sendSync blocked on a TMVar that the delivery callback only signalled on the success branch, so a broker-side delivery failure (delivery.timeout.ms exceeded, retries exhausted, non-retriable error, no partition leader) left the caller parked forever and muted the error from observability. Add a pure Internal.deliveryReportToResult that maps every DeliveryReport to a Result Error (), and signal the terminator on all branches. A failed delivery now surfaces as a descriptive Task.fail: DeliveryFailed for a broker-side failure, NoMessageDelivered for the message-less NoMessageError report. sendAsync keeps its prior success-only callback contract. The dispatch is unit-tested in test/Spec/Kafka.hs; the full worker integration suite passes against a live broker. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Pull request overview
Fixes Kafka.sendSync potentially hanging forever when a message is enqueued successfully but later fails delivery by ensuring the delivery callback always signals completion and propagates a failure result.
Changes:
- Add `Internal.deliveryReportToResult` and new internal `Error` constructors to represent all delivery outcomes.
- Rewire `sendHelperAsync`/`sendSync` to receive the full `DeliveryReport` and signal the TMVar on both success and failure.
- Add unit tests for delivery-report mapping and update docs/changelog accordingly.
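
The mapping described in the first bullet could look roughly like the sketch below. It is not the code from this PR: the record, error, and offset types are local stand-ins so the block compiles on its own, `Either` stands in for the repo's `Result`, and the `DeliveryFailure` constructor name is an assumption about the client library's report type. Only `DeliverySuccess`, `NoMessageError`, `DeliveryFailed`, and `NoMessageDelivered` are named in this PR.

```haskell
-- Stand-in types (assumptions), declared locally so the sketch is self-contained.
-- They mirror the shapes described in this PR, not the real library definitions.
newtype ProducerRecord = ProducerRecord String deriving (Eq, Show)

newtype KafkaError = KafkaError String deriving (Eq, Show)

newtype Offset = Offset Int deriving (Eq, Show)

-- Assumed shape of the delivery report: one success case and two failure cases.
data DeliveryReport
  = DeliverySuccess ProducerRecord Offset
  | DeliveryFailure ProducerRecord KafkaError
  | NoMessageError KafkaError
  deriving (Eq, Show)

-- The two new error constructors named in the PR description.
data Error
  = DeliveryFailed (ProducerRecord, KafkaError)
  | NoMessageDelivered KafkaError
  deriving (Eq, Show)

-- Total mapping: every report becomes either success or a descriptive error.
-- Either is used here in place of the repo's Result type.
deliveryReportToResult :: DeliveryReport -> Either Error ()
deliveryReportToResult report =
  case report of
    DeliverySuccess _record _offset -> Right ()
    DeliveryFailure record err -> Left (DeliveryFailed (record, err))
    NoMessageError err -> Left (NoMessageDelivered err)
```

Because the function is pure and total, each branch can be checked without a broker, which is presumably what makes the unit tests in the third bullet cheap to write.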
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| nri-kafka/test/Spec/Kafka.hs | Adds pure unit tests for deliveryReportToResult. |
| nri-kafka/test/Main.hs | Registers the new Spec.Kafka tests in the test runner. |
| nri-kafka/src/Kafka/Internal.hs | Adds DeliveryFailed/NoMessageDelivered and implements deliveryReportToResult. |
| nri-kafka/src/Kafka.hs | Updates async/sync send logic so sync send terminates on all delivery reports; updates helper callback shape. |
| nri-kafka/nri-kafka.cabal | Adds Spec.Kafka to the test-suite module list. |
| nri-kafka/docs/known-issues.md | Marks the sendSync hang issue as resolved with an explanation. |
| nri-kafka/CHANGELOG.md | Adds an Unreleased entry describing the fix. |
| errorToText :: Error -> Text | ||
| errorToText err = Text.fromList (Prelude.show err) |
This was already the case for SendingFailed. I think this is better addressed by ensuring our messages wrap PII in e.g. Log.Secret, so they're still debuggable, than omitting their contents completely.
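
As a rough illustration of the wrapping idea in this reply: a wrapper type whose `Show` instance never prints its contents lets an error be rendered with `show` while the sensitive field stays redacted. The `Secret` type, `unSecret`, `Payload`, and `describePayload` below are hypothetical stand-ins written for this sketch; they are not the definition of `Log.Secret` or any type in this repo.

```haskell
-- Hypothetical stand-in for a secret wrapper; not the repo's Log.Secret.
newtype Secret a = Secret a

-- Show never reveals the wrapped value, so a derived Show on any
-- containing type redacts the field automatically.
instance Show (Secret a) where
  showsPrec p _ = showParen (p > 10) (showString "Secret \"*****\"")

-- Explicit unwrapping for code that genuinely needs the value.
unSecret :: Secret a -> a
unSecret (Secret x) = x

-- Hypothetical message payload with one sensitive field.
data Payload = Payload
  { eventName :: String
  , userEmail :: Secret String
  }
  deriving (Show)

-- Same shape as errorToText in the diff above: render via Show.
describePayload :: Payload -> String
describePayload = show
```

With this shape, `describePayload` still shows the constructor and the non-sensitive fields, so the rendered error remains useful for debugging.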
Match the rest of the repo (src/Kafka.hs, test/Helpers.hs) which builds prTopic explicitly rather than relying on the IsString instance. Addresses PR review feedback. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| let onDeliveryReport deliveryReport = | ||
| case deliveryReport of | ||
| Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback | ||
| _ -> Task.succeed () | ||
| sendHelperAsync producer doAnything onDeliveryReport msg' |
Note that this is unchanged from trunk. We do nothing on failure in the async path.
Fixing this is a tad larger than what we're aiming for here, and I'm not 100% sure how it would work. I've tried and failed to have a LogHandler report stuff in code that runs in a separate thread in our deployer service, for instance.
out of scope of this pr, agreed
| -- librdkafka invokes this callback exactly once per message, on | ||
| -- both success and failure, so handing it the whole delivery | ||
| -- report lets callers be notified of either outcome. | ||
| ( \deliveryReport -> do | ||
| log <- Platform.silentHandler | ||
| Task.perform log <| | ||
| case deliveryReport of | ||
| Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback | ||
| _ -> Task.succeed () | ||
| Task.perform log (onDeliveryReport deliveryReport) |
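
To make the hang concrete, here is a minimal sketch of the waiting side, under stated assumptions: `Report`, `fakeProduce`, and `sendSyncSketch` are hypothetical names, a plain `MVar` from `base` stands in for the `TMVar` used in the repo, and the producer is simulated with a forked thread. The point it illustrates is that the caller only unblocks if the callback fills the variable on every branch.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical, simplified delivery report for this sketch.
data Report = Delivered | Failed String
  deriving (Eq, Show)

-- Simulated producer: invokes the callback exactly once, from another thread,
-- which is the behaviour the code comment above attributes to librdkafka.
fakeProduce :: Report -> (Report -> IO ()) -> IO ()
fakeProduce report callback = do
  _ <- forkIO (callback report)
  pure ()

-- Blocks until the callback reports an outcome. The callback writes to the
-- variable for both Delivered and Failed; writing only for Delivered would
-- leave takeMVar waiting forever on a failed delivery.
sendSyncSketch :: Report -> IO (Either String ())
sendSyncSketch report = do
  done <- newEmptyMVar
  fakeProduce report (\r -> putMVar done (toResult r))
  takeMVar done
  where
    toResult Delivered = Right ()
    toResult (Failed reason) = Left reason
```

Calling `sendSyncSketch (Failed "timed out")` returns `Left "timed out"` instead of blocking, which is the behaviour change this PR makes for the real `sendSync`.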
Problem
Note: We never actually observed the problematic behavior in production or anywhere else. It may be that we never failed to write a synchronous message to Kafka 😅. Claude pointed out this issue during the `sendSync` performance work in #151, and since we'll rely on `sendSync` in the event platform, I thought we'd better have this covered.

`Kafka.sendSync` blocked on a `TMVar` that the delivery callback only signaled on the success branch. When a message was enqueued but the broker ultimately failed to deliver it (`delivery.timeout.ms` exceeded, retries exhausted, a non-retriable broker error, or no available partition leader), the `TMVar` was never written and the calling request handler parked forever, and the failure was muted from observability.

The synchronous `ImmediateError` enqueue-refused path already failed correctly; this was specifically the asynchronous, post-enqueue delivery-report path.

Fix
- Add a pure `Internal.deliveryReportToResult :: DeliveryReport -> Result Error ()` that maps every delivery report to a result, and signal the terminator on all branches.
- A failed delivery now surfaces as a descriptive `Task.fail`:
  - `DeliveryFailed (ProducerRecord, KafkaError)`: broker-side delivery failure (carries the record).
  - `NoMessageDelivered KafkaError`: the message-less `NoMessageError` report (librdkafka's delivery callback fired with a null message pointer; error read from `errno`).
- `sendHelperAsync`'s callback now receives the whole `DeliveryReport` and fires on every report (was success-only). `sendAsync` keeps its prior success-only callback contract.
- `Error` lives in the unexposed `Kafka.Internal` module, so the new constructors are not a public-API change.

Testing
- Pure unit tests in `test/Spec/Kafka.hs` cover all three report branches (developed test-first: watched them fail against a stub, then pass).
- The full worker integration suite passes against a live broker, which covers `sendSync`'s success path.

Docs
- `docs/known-issues.md` entry marked resolved (original write-up kept for context).
- `CHANGELOG.md` entry added under a new `# Unreleased` heading.

🤖 Generated with Claude Code