Skip to content

Commit 7d15d72

Browse files
committed
add tests and update *.md files
1 parent 4136e95 commit 7d15d72

3 files changed

Lines changed: 100 additions & 18 deletions

File tree

CLAUDE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,6 @@ Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommit
6464
- Always use `auto_offset_reset="earliest"` on test subscribers. The default `"latest"` causes the consumer to miss messages published before it gets its partition assignment.
6565
- Pre-create topics with `AIOKafkaAdminClient` before starting the broker. Auto-creation on first publish triggers a `NotLeaderForPartitionError` retry loop that can outlast short sleeps.
6666
- After `await broker.start()`, sleep ~1.5 s before publishing to let the consumer join the group and receive partition assignments.
67+
- `AsgiFastStream` lifespan tests must use `async with app.start_lifespan_context()` — calling `app.start()` / `app.stop()` bypasses the `lifespan` context manager entirely.
68+
- `AsgiFastStream` injects its own app-level `ContextRepo` into the lifespan, separate from `broker.context`. Pass `broker.context` explicitly to `initialize_concurrent_processing` and `stop_concurrent_processing`.
69+
- Subscriber-level `middlewares` on `@broker.subscriber(...)` takes `SubscriberMiddleware` (a plain `(call_next, msg)` callable), not `BaseMiddleware` subclasses. To scope `KafkaConcurrentProcessingMiddleware` to a subset of subscribers, use `KafkaRouter(middlewares=[KafkaConcurrentProcessingMiddleware])` and `broker.include_router(router)`.

README.md

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,40 +35,50 @@ pip install faststream-concurrent-aiokafka
3535
`ack_policy=AckPolicy.MANUAL` is **required** on every subscriber — the middleware enforces this at runtime.
3636
Without it, aiokafka's auto-commit timer would commit offsets before processing tasks complete, causing silent message loss on crash.
3737

38+
> **`AsgiFastStream` note**: its lifespan receives an app-level `ContextRepo` separate from `broker.context`. Pass `broker.context` explicitly instead of the injected argument.
39+
3840
```python
39-
from faststream import FastStream, ContextRepo
40-
from faststream.kafka import KafkaBroker
41+
from contextlib import asynccontextmanager
42+
from faststream import ContextRepo
43+
from faststream.asgi import AsgiFastStream
44+
from faststream.kafka import KafkaBroker, KafkaRouter
4145
from faststream.middlewares import AckPolicy
4246
from faststream_concurrent_aiokafka import (
4347
KafkaConcurrentProcessingMiddleware,
4448
initialize_concurrent_processing,
45-
is_kafka_handler_healthy,
4649
stop_concurrent_processing,
4750
)
4851

52+
# Middleware applied globally to all subscribers
4953
broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
50-
app = FastStream(broker)
5154

55+
# Or scope it to specific subscribers via a router
56+
router = KafkaRouter(middlewares=[KafkaConcurrentProcessingMiddleware])
5257

53-
@app.on_startup
54-
async def on_startup(context: ContextRepo) -> None:
58+
@asynccontextmanager
59+
async def lifespan(_context: ContextRepo):
5560
await initialize_concurrent_processing(
56-
context=context,
57-
concurrency_limit=20, # max concurrent tasks (minimum: 1)
58-
commit_batch_size=100, # commit after this many completed tasks
59-
commit_batch_timeout_sec=5.0, # or after this many seconds
61+
context=broker.context,
62+
concurrency_limit=20, # max concurrent tasks (minimum: 1)
63+
commit_batch_size=100, # commit after this many completed tasks
64+
commit_batch_timeout_sec=5.0, # or after this many seconds
6065
)
66+
try:
67+
yield
68+
finally:
69+
await stop_concurrent_processing(broker.context)
6170

62-
63-
@app.on_shutdown
64-
async def on_shutdown(context: ContextRepo) -> None:
65-
await stop_concurrent_processing(context)
66-
71+
app = AsgiFastStream(broker, lifespan=lifespan)
6772

6873
@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
6974
async def handle(msg: str) -> None:
70-
# runs concurrently with other messages
7175
...
76+
77+
@router.subscriber("other-topic", group_id="other-group", ack_policy=AckPolicy.MANUAL)
78+
async def handle_other(msg: str) -> None:
79+
...
80+
81+
broker.include_router(router)
7282
```
7383

7484
## Core Concepts
@@ -114,7 +124,7 @@ Returns `True` if the `KafkaConcurrentHandler` stored in `context` is running an
114124

115125
### `KafkaConcurrentProcessingMiddleware`
116126

117-
FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])` or `broker.add_middleware(...)`.
127+
FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])`, `broker.add_middleware(...)`, or scope it to a subset of subscribers via `KafkaRouter`. See Quick Start for usage examples.
118128

119129
## How It Works
120130

tests/test_integration.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import asyncio
2+
import contextlib
23
import typing
34
import uuid
45

56
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
6-
from faststream.kafka import KafkaBroker
7+
from faststream import ContextRepo
8+
from faststream.asgi import AsgiFastStream
9+
from faststream.kafka import KafkaBroker, KafkaRouter
710
from faststream.middlewares import AckPolicy
811

912
from faststream_concurrent_aiokafka import (
@@ -360,3 +363,69 @@ async def handler_b2(msg: dict[str, int]) -> None: # pragma: no cover
360363

361364
assert replayed_a == [], f"topic_a messages replayed after clean stop: {replayed_a}"
362365
assert replayed_b == [], f"topic_b messages replayed after clean stop: {replayed_b}"
366+
367+
368+
async def test_middleware_on_router(kafka_bootstrap_servers: str) -> None:
369+
"""Middleware set on a KafkaRouter (not broker-level) routes messages through concurrent handler."""
370+
processed: typing.Final[list[dict[str, int]]] = []
371+
topic: typing.Final = _topic("router-mw")
372+
# Plain broker — no broker-level middleware; middleware is scoped to the router
373+
broker: typing.Final = KafkaBroker(kafka_bootstrap_servers)
374+
router: typing.Final = KafkaRouter(middlewares=[KafkaConcurrentProcessingMiddleware])
375+
376+
@router.subscriber(topic, group_id="router-mw-group", auto_offset_reset="earliest", ack_policy=AckPolicy.MANUAL)
377+
async def handler(msg: dict[str, int]) -> None:
378+
processed.append(msg)
379+
380+
broker.include_router(router)
381+
382+
await _create_topic(kafka_bootstrap_servers, topic)
383+
async with broker:
384+
await broker.start()
385+
await initialize_concurrent_processing(
386+
context=broker.context, commit_batch_size=10, commit_batch_timeout_sec=5, concurrency_limit=5
387+
)
388+
await asyncio.sleep(CONSUMER_READY_SLEEP)
389+
try:
390+
await broker.publish({"id": 7}, topic=topic)
391+
await asyncio.sleep(POLL_SLEEP)
392+
finally:
393+
await stop_concurrent_processing(broker.context)
394+
395+
assert len(processed) == 1
396+
assert processed[0]["id"] == 7
397+
398+
399+
async def test_asgi_faststream_basic_processing(kafka_bootstrap_servers: str) -> None:
400+
"""AsgiFastStream lifespan initialises and stops concurrent processing correctly."""
401+
processed: typing.Final[list[dict[str, int]]] = []
402+
topic: typing.Final = _topic("asgi")
403+
broker: typing.Final = _broker(kafka_bootstrap_servers)
404+
405+
@broker.subscriber(topic, group_id="asgi-group", auto_offset_reset="earliest", ack_policy=AckPolicy.MANUAL)
406+
async def handler(msg: dict[str, int]) -> None:
407+
processed.append(msg)
408+
409+
@contextlib.asynccontextmanager
410+
async def lifespan(_context: ContextRepo) -> typing.AsyncIterator[None]:
411+
# AsgiFastStream injects its own app-level context, which is separate from
412+
# broker.context. Use broker.context explicitly so the middleware can find
413+
# the handler via self.context.
414+
await initialize_concurrent_processing(
415+
context=broker.context, commit_batch_size=10, commit_batch_timeout_sec=5, concurrency_limit=5
416+
)
417+
try:
418+
yield
419+
finally:
420+
await stop_concurrent_processing(broker.context)
421+
422+
app: typing.Final = AsgiFastStream(broker, lifespan=lifespan)
423+
424+
await _create_topic(kafka_bootstrap_servers, topic)
425+
async with app.start_lifespan_context():
426+
await asyncio.sleep(CONSUMER_READY_SLEEP)
427+
await broker.publish({"id": 42}, topic=topic)
428+
await asyncio.sleep(POLL_SLEEP)
429+
430+
assert len(processed) == 1
431+
assert processed[0]["id"] == 42

0 commit comments

Comments
 (0)