Skip to content

Commit 2e6d49d

Browse files
authored
Merge pull request #6 from modern-python/healthcheck
add healthcheck
2 parents a25251b + 856b617 commit 2e6d49d

5 files changed

Lines changed: 68 additions & 0 deletions

File tree

CLAUDE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ Key design: `handle_task()` fires-and-forgets coroutines as asyncio tasks. Messa
4040
- `initialize_concurrent_processing(context, ...)`: call on app startup to create and start the handler, storing it in FastStream's global context.
4141
- `stop_concurrent_processing(context)`: call on app shutdown; resets the singleton so it can be re-initialized (important for tests).
4242

43+
**`healthcheck.py``is_kafka_handler_healthy`**
44+
A single function that accepts a `ContextRepo` and returns `True` if the `KafkaConcurrentHandler` is present and healthy. Intended for readiness/liveness probes.
45+
4346
**`batch_committer.py``KafkaBatchCommitter`**
4447
Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommitTask` objects from a queue, batches them by topic-partition, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by timeout or batch size, whichever comes first. `CommitterIsDeadError` is raised to callers if the committer's main task has died.
4548

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ By default FastStream processes Kafka messages sequentially — one message at a
1717
- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
1818
- Background observer task to detect and discard stale completed tasks
1919
- Handler exceptions are logged but do not crash the consumer
20+
- Health check helper to probe handler status from a `ContextRepo`
2021

2122
## 📦 [PyPi](https://pypi.org/project/faststream-concurrent-aiokafka)
2223

@@ -41,6 +42,7 @@ from faststream.middlewares import AckPolicy
4142
from faststream_concurrent_aiokafka import (
4243
KafkaConcurrentProcessingMiddleware,
4344
initialize_concurrent_processing,
45+
is_kafka_handler_healthy,
4446
stop_concurrent_processing,
4547
)
4648

@@ -106,6 +108,10 @@ Returns the `KafkaConcurrentHandler` instance.
106108

107109
Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.
108110

111+
### `is_kafka_handler_healthy(context)`
112+
113+
Returns `True` if the `KafkaConcurrentHandler` stored in `context` is running and healthy, `False` otherwise (not initialized, stopped, or observer task dead). Useful for readiness/liveness probes.
114+
109115
### `KafkaConcurrentProcessingMiddleware`
110116

111117
FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])` or `broker.add_middleware(...)`.

faststream_concurrent_aiokafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from faststream_concurrent_aiokafka.healthcheck import is_kafka_handler_healthy
12
from faststream_concurrent_aiokafka.middleware import (
23
KafkaConcurrentProcessingMiddleware,
34
initialize_concurrent_processing,
@@ -8,5 +9,6 @@
89
__all__ = [
910
"KafkaConcurrentProcessingMiddleware",
1011
"initialize_concurrent_processing",
12+
"is_kafka_handler_healthy",
1113
"stop_concurrent_processing",
1214
]
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import typing
2+
3+
from faststream import ContextRepo
4+
5+
from faststream_concurrent_aiokafka.middleware import _PROCESSING_CONTEXT_KEY
6+
7+
8+
if typing.TYPE_CHECKING:
9+
from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler
10+
11+
12+
def is_kafka_handler_healthy(context: ContextRepo) -> bool:
13+
handler: KafkaConcurrentHandler | None = context.get(_PROCESSING_CONTEXT_KEY)
14+
return handler is not None and handler.is_healthy

tests/test_healthcheck.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import typing
2+
from unittest.mock import MagicMock
3+
4+
from faststream.kafka import KafkaBroker, TestKafkaBroker
5+
6+
from faststream_concurrent_aiokafka.healthcheck import is_kafka_handler_healthy
7+
from faststream_concurrent_aiokafka.middleware import (
8+
initialize_concurrent_processing,
9+
stop_concurrent_processing,
10+
)
11+
12+
13+
async def test_healthy_when_handler_is_running() -> None:
14+
broker: typing.Final = KafkaBroker("localhost:9092")
15+
async with TestKafkaBroker(broker) as test_broker:
16+
await initialize_concurrent_processing(context=test_broker.context)
17+
try:
18+
assert is_kafka_handler_healthy(test_broker.context) is True
19+
finally:
20+
await stop_concurrent_processing(test_broker.context)
21+
22+
23+
async def test_unhealthy_when_no_handler_in_context() -> None:
24+
broker: typing.Final = KafkaBroker("localhost:9092")
25+
async with TestKafkaBroker(broker) as test_broker:
26+
assert is_kafka_handler_healthy(test_broker.context) is False
27+
28+
29+
async def test_unhealthy_when_handler_stopped() -> None:
30+
broker: typing.Final = KafkaBroker("localhost:9092")
31+
async with TestKafkaBroker(broker) as test_broker:
32+
await initialize_concurrent_processing(context=test_broker.context)
33+
await stop_concurrent_processing(test_broker.context)
34+
assert is_kafka_handler_healthy(test_broker.context) is False
35+
36+
37+
async def test_unhealthy_when_is_healthy_returns_false() -> None:
38+
broker: typing.Final = KafkaBroker("localhost:9092")
39+
async with TestKafkaBroker(broker) as test_broker:
40+
mock_handler: typing.Final = MagicMock()
41+
mock_handler.is_healthy = False
42+
test_broker.context.set_global("concurrent_processing", mock_handler)
43+
assert is_kafka_handler_healthy(test_broker.context) is False

0 commit comments

Comments
 (0)