Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Key design: `handle_task()` fires-and-forgets coroutines as asyncio tasks. Messa
- `initialize_concurrent_processing(context, ...)`: call on app startup to create and start the handler, storing it in FastStream's global context.
- `stop_concurrent_processing(context)`: call on app shutdown; resets the singleton so it can be re-initialized (important for tests).

**`healthcheck.py` — `is_kafka_handler_healthy`**
A single function that accepts a `ContextRepo` and returns `True` if the `KafkaConcurrentHandler` is present and healthy. Intended for readiness/liveness probes.

**`batch_committer.py` — `KafkaBatchCommitter`**
Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommitTask` objects from a queue, batches them by topic-partition, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by timeout or batch size, whichever comes first. `CommitterIsDeadError` is raised to callers if the committer's main task has died.

Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ By default FastStream processes Kafka messages sequentially — one message at a
- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
- Background observer task to detect and discard stale completed tasks
- Handler exceptions are logged but do not crash the consumer
- Health check helper to probe handler status from a `ContextRepo`

## 📦 [PyPi](https://pypi.org/project/faststream-concurrent-aiokafka)

Expand All @@ -41,6 +42,7 @@ from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
KafkaConcurrentProcessingMiddleware,
initialize_concurrent_processing,
is_kafka_handler_healthy,
stop_concurrent_processing,
)

Expand Down Expand Up @@ -106,6 +108,10 @@ Returns the `KafkaConcurrentHandler` instance.

Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.

### `is_kafka_handler_healthy(context)`

Returns `True` if the `KafkaConcurrentHandler` stored in `context` is running and healthy, `False` otherwise (not initialized, stopped, or observer task dead). Useful for readiness/liveness probes.

### `KafkaConcurrentProcessingMiddleware`

FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])` or `broker.add_middleware(...)`.
Expand Down
2 changes: 2 additions & 0 deletions faststream_concurrent_aiokafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from faststream_concurrent_aiokafka.healthcheck import is_kafka_handler_healthy
from faststream_concurrent_aiokafka.middleware import (
KafkaConcurrentProcessingMiddleware,
initialize_concurrent_processing,
Expand All @@ -8,5 +9,6 @@
__all__ = [
"KafkaConcurrentProcessingMiddleware",
"initialize_concurrent_processing",
"is_kafka_handler_healthy",
"stop_concurrent_processing",
]
14 changes: 14 additions & 0 deletions faststream_concurrent_aiokafka/healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import typing

from faststream import ContextRepo

from faststream_concurrent_aiokafka.middleware import _PROCESSING_CONTEXT_KEY


if typing.TYPE_CHECKING:
from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler


def is_kafka_handler_healthy(context: ContextRepo) -> bool:
handler: KafkaConcurrentHandler | None = context.get(_PROCESSING_CONTEXT_KEY)
return handler is not None and handler.is_healthy
43 changes: 43 additions & 0 deletions tests/test_healthcheck.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import typing
from unittest.mock import MagicMock

from faststream.kafka import KafkaBroker, TestKafkaBroker

from faststream_concurrent_aiokafka.healthcheck import is_kafka_handler_healthy
from faststream_concurrent_aiokafka.middleware import (
initialize_concurrent_processing,
stop_concurrent_processing,
)


async def test_healthy_when_handler_is_running() -> None:
broker: typing.Final = KafkaBroker("localhost:9092")
async with TestKafkaBroker(broker) as test_broker:
await initialize_concurrent_processing(context=test_broker.context)
try:
assert is_kafka_handler_healthy(test_broker.context) is True
finally:
await stop_concurrent_processing(test_broker.context)


async def test_unhealthy_when_no_handler_in_context() -> None:
broker: typing.Final = KafkaBroker("localhost:9092")
async with TestKafkaBroker(broker) as test_broker:
assert is_kafka_handler_healthy(test_broker.context) is False


async def test_unhealthy_when_handler_stopped() -> None:
broker: typing.Final = KafkaBroker("localhost:9092")
async with TestKafkaBroker(broker) as test_broker:
await initialize_concurrent_processing(context=test_broker.context)
await stop_concurrent_processing(test_broker.context)
assert is_kafka_handler_healthy(test_broker.context) is False


async def test_unhealthy_when_is_healthy_returns_false() -> None:
broker: typing.Final = KafkaBroker("localhost:9092")
async with TestKafkaBroker(broker) as test_broker:
mock_handler: typing.Final = MagicMock()
mock_handler.is_healthy = False
test_broker.context.set_global("concurrent_processing", mock_handler)
assert is_kafka_handler_healthy(test_broker.context) is False
Loading