Commit a25251b

Merge pull request #5 from modern-python/refactor
fix design issue with several subscribers
2 parents 7bf653a + 3c4c62a commit a25251b

9 files changed: 399 additions & 374 deletions

README.md

Lines changed: 68 additions & 32 deletions
````diff
@@ -1,20 +1,43 @@
 # faststream-concurrent-aiokafka
 
+[![Supported versions](https://img.shields.io/pypi/pyversions/faststream-concurrent-aiokafka.svg)](https://pypi.python.org/pypi/faststream-concurrent-aiokafka)
+[![downloads](https://img.shields.io/pypi/dm/faststream-concurrent-aiokafka.svg)](https://pypistats.org/packages/faststream-concurrent-aiokafka)
+[![GitHub stars](https://img.shields.io/github/stars/modern-python/faststream-concurrent-aiokafka)](https://github.com/modern-python/faststream-concurrent-aiokafka/stargazers)
+
 Concurrent message processing middleware for [FastStream](https://faststream.airt.ai/) with aiokafka.
 
-By default FastStream processes Kafka messages sequentially. This library allows you to process multiple messages concurrently using asyncio tasks, with optional batch offset committing.
+By default FastStream processes Kafka messages sequentially — one message at a time per subscriber. This library turns each incoming message into an asyncio task so multiple messages are handled concurrently, while keeping offset commits correct and shutdown graceful.
+
+## Features
+
+- Concurrent message processing via asyncio tasks
+- Configurable concurrency limit (semaphore-based)
+- Batch offset committing per partition after each task completes
+- Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
+- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
+- Background observer task to detect and discard stale completed tasks
+- Handler exceptions are logged but do not crash the consumer
+
+## 📦 [PyPi](https://pypi.org/project/faststream-concurrent-aiokafka)
+
+## 📝 [License](LICENSE)
 
 ## Installation
 
 ```bash
 pip install faststream-concurrent-aiokafka
 ```
 
-## Usage
+## Quick Start
+
+`ack_policy=AckPolicy.MANUAL` is **required** on every subscriber — the middleware enforces this at runtime.
+Without it, aiokafka's auto-commit timer would commit offsets before processing tasks complete, causing silent message loss on crash.
 
 ```python
 from faststream import FastStream, ContextRepo
 from faststream.kafka import KafkaBroker
+from faststream.middlewares import AckPolicy
 from faststream_concurrent_aiokafka import (
     KafkaConcurrentProcessingMiddleware,
     initialize_concurrent_processing,
@@ -29,7 +52,9 @@ app = FastStream(broker)
 async def on_startup(context: ContextRepo) -> None:
     await initialize_concurrent_processing(
         context=context,
-        concurrency_limit=20,  # max concurrent tasks (0 = unlimited)
+        concurrency_limit=20,  # max concurrent tasks (minimum: 1)
+        commit_batch_size=100,  # commit after this many completed tasks
+        commit_batch_timeout_sec=5.0,  # or after this many seconds
     )
 
 
@@ -38,51 +63,62 @@ async def on_shutdown(context: ContextRepo) -> None:
     await stop_concurrent_processing(context)
 
 
-@broker.subscriber("my-topic", group_id="my-group")
+@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
 async def handle(msg: str) -> None:
     # runs concurrently with other messages
     ...
 ```
 
-## Batch offset committing
+## Core Concepts
 
-By default aiokafka auto-commits offsets. If you manage commits manually, enable `enable_batch_commit=True` to have the library commit offsets in batches after each task completes:
+### KafkaConcurrentProcessingMiddleware
 
-```python
-await initialize_concurrent_processing(
-    context=context,
-    concurrency_limit=20,
-    commit_batch_size=100,
-    commit_batch_timeout_sec=5,
-    enable_batch_commit=True,
-)
-```
+A FastStream `BaseMiddleware` subclass. Add it to your broker to enable concurrent processing. It wraps each incoming message in an asyncio task submitted to `KafkaConcurrentHandler`.
 
-With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.
+### KafkaConcurrentHandler
 
-## Consumer group filtering
+The processing engine. Manages:
+- An `asyncio.Semaphore` to enforce `concurrency_limit`
+- A set of in-flight asyncio tasks
+- A background observer that periodically discards stale completed tasks
+- Signal handlers for graceful shutdown
 
-When multiple consumer groups subscribe to the same topic, producers can tag messages with a `topic_group` header to direct them to a specific group. The middleware skips messages whose `topic_group` header doesn't match the consumer's group ID. Messages with no `topic_group` header are always processed.
+### KafkaBatchCommitter
 
-```python
-# Producer side — send to a specific consumer group only
-await broker.publish(
-    {"data": "..."},
-    topic="my-topic",
-    headers={"topic_group": "group-a"},
-)
-```
+Runs as a background asyncio task. Receives `KafkaCommitTask` objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, `CommitterIsDeadError` is raised to callers.
+
+## API Reference
 
-## Parameters
+### `initialize_concurrent_processing(context, ...)`
 
-### `initialize_concurrent_processing`
+Create and start the concurrent processing handler; store it in FastStream's context.
 
 | Parameter | Default | Description |
 |---|---|---|
-| `concurrency_limit` | `10` | Max concurrent asyncio tasks. `0` disables the limit. |
-| `commit_batch_size` | `10` | Max messages per commit batch. |
-| `commit_batch_timeout_sec` | `10` | Max seconds before flushing a batch. |
-| `enable_batch_commit` | `False` | Enable manual batch offset committing. |
+| `context` | required | FastStream `ContextRepo` instance |
+| `concurrency_limit` | `10` | Max concurrent asyncio tasks (minimum: 1) |
+| `commit_batch_size` | `10` | Max messages per commit batch |
+| `commit_batch_timeout_sec` | `10.0` | Max seconds before flushing a batch |
+
+Returns the `KafkaConcurrentHandler` instance.
+
+### `stop_concurrent_processing(context)`
+
+Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.
+
+### `KafkaConcurrentProcessingMiddleware`
+
+FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])` or `broker.add_middleware(...)`.
+
+## How It Works
+
+1. **Message dispatch**: On each incoming message, `consume_scope` calls `handle_task()`, which acquires a semaphore slot then fires the handler coroutine as a background `asyncio.Task`.
+
+2. **Concurrency control**: The semaphore blocks new tasks when `concurrency_limit` is reached. The slot is released via a done-callback when the task finishes or fails.
+
+3. **Offset committing**: Each dispatched task is paired with its Kafka offset and consumer reference and enqueued in `KafkaBatchCommitter`. Once the task completes, the committer groups offsets by partition and calls `consumer.commit(partitions_to_offsets)` with `offset + 1` (Kafka's "next offset to fetch" convention).
+
+4. **Graceful shutdown**: `stop_concurrent_processing` sets the shutdown event, flushes the committer, cancels the observer task, and calls `asyncio.gather` with a 10-second timeout to wait for all in-flight tasks.
 
 ## Requirements
 
````
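The README's "How It Works" steps (semaphore-gated dispatch, slot release via done-callback, gather on shutdown) can be sketched with the stdlib alone. This is a minimal illustration of the pattern, not the library's API: `handle` and `dispatch_all` are hypothetical names standing in for a subscriber handler and `KafkaConcurrentHandler.handle_task`.

```python
import asyncio

# Hypothetical handler standing in for a FastStream subscriber; not the library's API.
async def handle(msg: str) -> str:
    await asyncio.sleep(0.01)
    return msg.upper()

async def dispatch_all(messages: list[str], concurrency_limit: int) -> list[str]:
    semaphore = asyncio.Semaphore(concurrency_limit)
    tasks: list[asyncio.Task[str]] = []

    for msg in messages:
        await semaphore.acquire()  # step 1: block until a concurrency slot is free
        task = asyncio.create_task(handle(msg))
        # step 2: release the slot when the task finishes or fails
        task.add_done_callback(lambda _t: semaphore.release())
        tasks.append(task)

    # step 4 in miniature: drain in-flight tasks (the library bounds this with a 10 s timeout)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, BaseException)]

print(asyncio.run(dispatch_all(["a", "b", "c"], concurrency_limit=2)))  # → ['A', 'B', 'C']
```

`return_exceptions=True` mirrors the "handler exceptions are logged but do not crash the consumer" behavior: a failed task surfaces as an exception object instead of tearing down the gather.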
faststream_concurrent_aiokafka/batch_committer.py

Lines changed: 37 additions & 37 deletions
```diff
@@ -3,7 +3,6 @@
 import dataclasses
 import itertools
 import logging
-import threading
 import typing
 
 from faststream.kafka import TopicPartition
@@ -16,7 +15,7 @@
 logger = logging.getLogger(__name__)
 
 
-SHUTDOWN_TIMEOUT_SEC: typing.Final = 20
+GRACEFUL_TIMEOUT_SEC: typing.Final = 20
 
 
 class CommitterIsDeadError(Exception): ...
@@ -37,20 +36,16 @@ def __init__(
         commit_batch_size: int = 10,
     ) -> None:
         self._messages_queue: asyncio.Queue[KafkaCommitTask] = asyncio.Queue()
-        self._asyncio_commit_process_task: asyncio.Task[typing.Any] | None = None
+        self._commit_task: asyncio.Task[typing.Any] | None = None
         self._flush_batch_event = asyncio.Event()
 
         self._commit_batch_timeout_sec = commit_batch_timeout_sec
         self._commit_batch_size = commit_batch_size
-        self._shutdown_timeout = SHUTDOWN_TIMEOUT_SEC
-
-        self._spawn_lock = threading.Lock()
+        self._shutdown_timeout = GRACEFUL_TIMEOUT_SEC
 
     def _check_is_commit_task_running(self) -> None:
-        is_commit_task_running: typing.Final[bool] = bool(
-            self._asyncio_commit_process_task
-            and not self._asyncio_commit_process_task.cancelled()
-            and not self._asyncio_commit_process_task.done(),
+        is_commit_task_running = bool(
+            self._commit_task and not self._commit_task.cancelled() and not self._commit_task.done(),
         )
         if not is_commit_task_running:
             msg: typing.Final = "Committer main task is not running"
@@ -120,35 +115,41 @@ async def _call_committer(
         await self._messages_queue.put(task)
         return commit_succeeded
 
-    async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
-        partitions_to_tasks: typing.Final = itertools.groupby(
-            sorted(tasks_batch, key=lambda x: x.topic_partition), lambda x: x.topic_partition
+    @staticmethod
+    def _map_offsets_per_partition(consumer_tasks: list[KafkaCommitTask]) -> dict[TopicPartition, int]:
+        partitions_to_tasks = itertools.groupby(
+            sorted(consumer_tasks, key=lambda x: x.topic_partition), lambda x: x.topic_partition
         )
+        partitions_to_offsets: dict[TopicPartition, int] = {}
+        for partition, partition_tasks in partitions_to_tasks:
+            max_offset = max((task.offset for task in partition_tasks), default=None)
+            if max_offset is not None:
+                # Kafka commits the *next* offset to fetch, so committed = processed_max + 1
+                partitions_to_offsets[partition] = max_offset + 1
+        return partitions_to_offsets
 
+    async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
         results: typing.Final = await asyncio.gather(
             *[task.asyncio_task for task in tasks_batch], return_exceptions=True
         )
         for result in results:
             if isinstance(result, BaseException):
                 logger.error("Task has finished with an exception", exc_info=result)
 
-        partitions_to_offsets: typing.Final[dict[TopicPartition, int]] = {}
-        partition: TopicPartition
-        tasks: typing.Iterator[KafkaCommitTask]
-        for partition, tasks in partitions_to_tasks:
-            max_message_offset: int | None = None
-            for task in tasks:
-                if max_message_offset is None or task.offset > max_message_offset:
-                    max_message_offset = task.offset
+        # Group by consumer instance — each AIOKafkaConsumer can only commit its own partitions
+        consumers_tasks: dict[int, list[KafkaCommitTask]] = {}
+        for task in tasks_batch:
+            consumers_tasks.setdefault(id(task.consumer), []).append(task)
 
-            if max_message_offset is not None:
-                # Kafka commits the *next* offset to fetch, so committed = processed_max + 1
-                partitions_to_offsets[partition] = max_message_offset + 1
+        all_succeeded = True
+        for consumer_tasks in consumers_tasks.values():
+            partitions_to_offsets = self._map_offsets_per_partition(consumer_tasks)
+            if not await self._call_committer(consumer_tasks, partitions_to_offsets):
+                all_succeeded = False
 
-        commit_succeeded: typing.Final = await self._call_committer(tasks_batch, partitions_to_offsets)
         for _ in tasks_batch:
             self._messages_queue.task_done()
-        return commit_succeeded
+        return all_succeeded
 
     async def _run_commit_process(self) -> None:
         should_shutdown = False
@@ -158,7 +159,7 @@ async def _run_commit_process(self) -> None:
         await self._commit_tasks_batch(commit_batch)
 
     async def commit_all(self) -> None:
-        """Commit all without shutting down the main process."""
+        """Flush and commit all pending tasks, then stop the committer loop."""
         self._flush_batch_event.set()
         await self._messages_queue.join()
 
@@ -169,30 +170,29 @@ async def send_task(self, new_task: KafkaCommitTask) -> None:
         )
 
     def spawn(self) -> None:
-        with self._spawn_lock:
-            if not self._asyncio_commit_process_task:
-                self._asyncio_commit_process_task = asyncio.create_task(self._run_commit_process())
-            else:
-                logger.error("Committer main task already running")
+        if not self._commit_task:
+            self._commit_task = asyncio.create_task(self._run_commit_process())
+        else:
+            logger.error("Committer main task already running")
 
     async def close(self) -> None:
         """Close committer."""
-        if not self._asyncio_commit_process_task:
+        if not self._commit_task:
             logger.error("Committer main task is not running, cannot close committer properly")
             return
 
         self._flush_batch_event.set()
         try:
-            await asyncio.wait_for(self._asyncio_commit_process_task, timeout=self._shutdown_timeout)
+            await asyncio.wait_for(self._commit_task, timeout=self._shutdown_timeout)
         except TimeoutError:
             logger.exception("Committer main task shutdown timed out, forcing cancellation")
-            self._asyncio_commit_process_task.cancel()
+            self._commit_task.cancel()
             with contextlib.suppress(asyncio.CancelledError):
-                await self._asyncio_commit_process_task
+                await self._commit_task
         except Exception as exc:
             logger.exception("Committer task failed during shutdown", exc_info=exc)
             raise
 
     @property
     def is_healthy(self) -> bool:
-        return self._asyncio_commit_process_task is not None and not self._asyncio_commit_process_task.done()
+        return self._commit_task is not None and not self._commit_task.done()
```
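The `_map_offsets_per_partition` refactor isolates the commit-offset arithmetic: per partition, take the highest processed offset and commit that plus one. A self-contained sketch of the same convention, using plain `(partition, offset)` tuples in place of the real `KafkaCommitTask` / `TopicPartition` types:

```python
# Stand-in for the committer's offset mapping; partitions are plain strings here,
# whereas the library uses faststream.kafka.TopicPartition objects.
def map_offsets_per_partition(tasks: list[tuple[str, int]]) -> dict[str, int]:
    """tasks are (partition, processed_offset) pairs; returns partition -> offset to commit."""
    highest: dict[str, int] = {}
    for partition, offset in tasks:
        highest[partition] = max(offset, highest.get(partition, -1))
    # Kafka commits the *next* offset to fetch, so committed = processed_max + 1
    return {partition: offset + 1 for partition, offset in highest.items()}

batch = [("orders-0", 41), ("orders-0", 43), ("orders-1", 7)]
print(map_offsets_per_partition(batch))  # → {'orders-0': 44, 'orders-1': 8}
```

Committing the max-plus-one per partition is what makes batching safe under at-least-once semantics: a crash before the commit replays at most one batch, never skips a message.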

faststream_concurrent_aiokafka/middleware.py

Lines changed: 20 additions & 25 deletions
```diff
@@ -19,33 +19,25 @@ async def consume_scope(  # ty: ignore[invalid-method-override]
         msg: KafkaAckableMessage,
     ) -> typing.Any:  # noqa: ANN401
         concurrent_processing: typing.Final[KafkaConcurrentHandler] = self.context.get(_PROCESSING_CONTEXT_KEY)
-        if not concurrent_processing:
-            logger.error("Kafka middleware. There is no concurrent processing instance in the context")
-            info = "No concurrent processing instance in the context"
-            raise RuntimeError(info)
-
-        if not concurrent_processing.is_running:
-            logger.error(
-                "Kafka middleware. Concurrent processing is not running. Maybe `initialize_concurrent_processing`"
-                " was forgotten?"
-            )
-            info = "Concurrent processing is not running"
-            raise RuntimeError(info)
+        if not concurrent_processing or not concurrent_processing.is_running:
+            err = "Concurrent processing is not running. Call `initialize_concurrent_processing` on app startup."
+            raise RuntimeError(err)
 
         kafka_message: typing.Final = self.context.get("message")
-        if concurrent_processing.has_batch_commit and not kafka_message:
-            logger.error("Kafka middleware. No kafka message in the middleware, it means no consumer to commit batch.")
-            info = "No kafka message in the middleware"
-            raise RuntimeError(info)
-
-        try:
-
-            async def handler_wrapper() -> typing.Any:  # noqa: ANN401
-                return await call_next(msg)
+        if not kafka_message:
+            err = "No Kafka message found in context. Ensure the middleware is used with a Kafka subscriber."
+            raise RuntimeError(err)
+
+        if getattr(kafka_message.consumer, "_enable_auto_commit", False):
+            err = (
+                "KafkaConcurrentProcessingMiddleware requires ack_policy=AckPolicy.MANUAL on all subscribers. "
+                "Auto-commit is enabled on this consumer, which commits offsets before processing tasks "
+                "complete and can cause message loss on crash. "
+                "Add ack_policy=AckPolicy.MANUAL to your @broker.subscriber(...) decorator."
+            )
+            raise RuntimeError(err)
 
-            await concurrent_processing.handle_task(handler_wrapper(), self.msg, kafka_message)
-        except Exception as exc:
-            raise RuntimeError(f"Kafka middleware. An error while sending task {msg}") from exc
+        await concurrent_processing.handle_task(call_next(msg), self.msg, kafka_message)
 
 
 async def initialize_concurrent_processing(
@@ -60,8 +52,11 @@ async def initialize_concurrent_processing(
         return existing
 
     concurrent_processing: typing.Final = KafkaConcurrentHandler(
+        committer=KafkaBatchCommitter(
+            commit_batch_timeout_sec=commit_batch_timeout_sec,
+            commit_batch_size=commit_batch_size,
+        ),
         concurrency_limit=concurrency_limit,
-        committer=KafkaBatchCommitter(commit_batch_timeout_sec, commit_batch_size),
     )
     await concurrent_processing.start()
     context.set_global(_PROCESSING_CONTEXT_KEY, concurrent_processing)
```
