100 changes: 68 additions & 32 deletions README.md
@@ -1,20 +1,43 @@
# faststream-concurrent-aiokafka

[![Supported versions](https://img.shields.io/pypi/pyversions/faststream-concurrent-aiokafka.svg)](https://pypi.python.org/pypi/faststream-concurrent-aiokafka)
[![downloads](https://img.shields.io/pypi/dm/faststream-concurrent-aiokafka.svg)](https://pypistats.org/packages/faststream-concurrent-aiokafka)
[![GitHub stars](https://img.shields.io/github/stars/modern-python/faststream-concurrent-aiokafka)](https://github.com/modern-python/faststream-concurrent-aiokafka/stargazers)

Concurrent message processing middleware for [FastStream](https://faststream.airt.ai/) with aiokafka.

By default FastStream processes Kafka messages sequentially. This library allows you to process multiple messages concurrently using asyncio tasks, with optional batch offset committing.
By default FastStream processes Kafka messages sequentially — one message at a time per subscriber. This library turns each incoming message into an asyncio task so multiple messages are handled concurrently, while keeping offset commits correct and shutdown graceful.

## Features

- Concurrent message processing via asyncio tasks
- Configurable concurrency limit (semaphore-based)
- Batch offset committing per partition after each task completes
- Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
- Background observer task to detect and discard stale completed tasks
- Handler exceptions are logged but do not crash the consumer

## 📦 [PyPi](https://pypi.org/project/faststream-concurrent-aiokafka)

## 📝 [License](LICENSE)


## Installation

```bash
pip install faststream-concurrent-aiokafka
```

## Usage
## Quick Start

`ack_policy=AckPolicy.MANUAL` is **required** on every subscriber — the middleware enforces this at runtime.
Without it, aiokafka's auto-commit timer would commit offsets before processing tasks complete, causing silent message loss on crash.

```python
from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
KafkaConcurrentProcessingMiddleware,
initialize_concurrent_processing,
@@ -29,7 +52,9 @@ app = FastStream(broker)
async def on_startup(context: ContextRepo) -> None:
await initialize_concurrent_processing(
context=context,
concurrency_limit=20, # max concurrent tasks (0 = unlimited)
concurrency_limit=20, # max concurrent tasks (minimum: 1)
commit_batch_size=100, # commit after this many completed tasks
commit_batch_timeout_sec=5.0, # or after this many seconds
)


@@ -38,51 +63,62 @@ async def on_shutdown(context: ContextRepo) -> None:
await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group")
@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str) -> None:
# runs concurrently with other messages
...
```

## Batch offset committing
## Core Concepts

By default aiokafka auto-commits offsets. If you manage commits manually, enable `enable_batch_commit=True` to have the library commit offsets in batches after each task completes:
### KafkaConcurrentProcessingMiddleware

```python
await initialize_concurrent_processing(
context=context,
concurrency_limit=20,
commit_batch_size=100,
commit_batch_timeout_sec=5,
enable_batch_commit=True,
)
```
A FastStream `BaseMiddleware` subclass. Add it to your broker to enable concurrent processing. It wraps each incoming message in an asyncio task submitted to `KafkaConcurrentHandler`.

With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.
### KafkaConcurrentHandler

## Consumer group filtering
The processing engine. Manages:
- An `asyncio.Semaphore` to enforce `concurrency_limit`
- A set of in-flight asyncio tasks
- A background observer that periodically discards stale completed tasks
- Signal handlers for graceful shutdown
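
The semaphore-plus-done-callback pattern described above can be sketched as follows. This is an illustrative, self-contained sketch, not the library's actual internals; `handle_task` and its signature are hypothetical stand-ins.

```python
import asyncio


async def handle_task(semaphore: asyncio.Semaphore, coro, tasks: set) -> None:
    # Blocks here while concurrency_limit tasks are already in flight.
    await semaphore.acquire()
    task = asyncio.create_task(coro)
    tasks.add(task)

    def _done(t: asyncio.Task) -> None:
        # Release the slot whether the task succeeded or failed,
        # and drop the finished task from the in-flight set.
        semaphore.release()
        tasks.discard(t)

    task.add_done_callback(_done)


async def main() -> list[int]:
    results: list[int] = []
    sem = asyncio.Semaphore(2)  # concurrency_limit = 2
    tasks: set = set()

    async def work(i: int) -> None:
        await asyncio.sleep(0)
        results.append(i)

    for i in range(5):
        await handle_task(sem, work(i), tasks)
    await asyncio.gather(*tasks)
    return sorted(results)
```

All five messages are processed even though at most two tasks run at once; submission simply stalls until a slot frees up.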

When multiple consumer groups subscribe to the same topic, producers can tag messages with a `topic_group` header to direct them to a specific group. The middleware skips messages whose `topic_group` header doesn't match the consumer's group ID. Messages with no `topic_group` header are always processed.
### KafkaBatchCommitter

```python
# Producer side — send to a specific consumer group only
await broker.publish(
{"data": "..."},
topic="my-topic",
headers={"topic_group": "group-a"},
)
```
Runs as a background asyncio task. Receives `KafkaCommitTask` objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, `CommitterIsDeadError` is raised to callers.
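
The per-partition offset computation described above amounts to taking the maximum processed offset per partition and committing that offset plus one. A minimal sketch, using a stand-in `TopicPartition` namedtuple rather than the aiokafka class:

```python
from collections import namedtuple

# Stand-in for aiokafka's TopicPartition; only used for grouping here.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def map_offsets_per_partition(
    tasks: list[tuple[TopicPartition, int]],
) -> dict[TopicPartition, int]:
    """Kafka commits the *next* offset to fetch: max processed offset + 1."""
    offsets: dict[TopicPartition, int] = {}
    for tp, offset in tasks:
        if tp not in offsets or offset + 1 > offsets[tp]:
            offsets[tp] = offset + 1
    return offsets
```

So a batch containing offsets 3 and 5 on partition 0 commits offset 6 for that partition, marking everything up to 5 as processed.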

## API Reference

## Parameters
### `initialize_concurrent_processing(context, ...)`

### `initialize_concurrent_processing`
Create and start the concurrent processing handler; store it in FastStream's context.

| Parameter | Default | Description |
|---|---|---|
| `concurrency_limit` | `10` | Max concurrent asyncio tasks. `0` disables the limit. |
| `commit_batch_size` | `10` | Max messages per commit batch. |
| `commit_batch_timeout_sec` | `10` | Max seconds before flushing a batch. |
| `enable_batch_commit` | `False` | Enable manual batch offset committing. |
| `context` | required | FastStream `ContextRepo` instance |
| `concurrency_limit` | `10` | Max concurrent asyncio tasks (minimum: 1) |
| `commit_batch_size` | `10` | Max messages per commit batch |
| `commit_batch_timeout_sec` | `10.0` | Max seconds before flushing a batch |

Returns the `KafkaConcurrentHandler` instance.

### `stop_concurrent_processing(context)`

Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.

### `KafkaConcurrentProcessingMiddleware`

FastStream middleware class. Pass it to `KafkaBroker(middlewares=[...])` or `broker.add_middleware(...)`.

## How It Works

1. **Message dispatch**: On each incoming message, `consume_scope` calls `handle_task()`, which acquires a semaphore slot then fires the handler coroutine as a background `asyncio.Task`.

2. **Concurrency control**: The semaphore blocks new tasks when `concurrency_limit` is reached. The slot is released via a done-callback when the task finishes or fails.

3. **Offset committing**: Each dispatched task is paired with its Kafka offset and consumer reference and enqueued in `KafkaBatchCommitter`. Once the task completes, the committer groups offsets by partition and calls `consumer.commit(partitions_to_offsets)` with `offset + 1` (Kafka's "next offset to fetch" convention).

4. **Graceful shutdown**: `stop_concurrent_processing` sets the shutdown event, flushes the committer, cancels the observer task, and calls `asyncio.gather` with a 10-second timeout to wait for all in-flight tasks.
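
The bounded wait in step 4 can be sketched as below. This is a hedged illustration of the pattern, not the library's code; the `shutdown` helper and its name are hypothetical.

```python
import asyncio


async def shutdown(tasks: set, timeout: float = 10.0) -> bool:
    """Wait up to `timeout` seconds for in-flight tasks; cancel stragglers."""
    try:
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Timed out: cancel whatever is still running and report failure.
        for t in tasks:
            t.cancel()
        return False


async def main() -> bool:
    async def quick() -> None:
        await asyncio.sleep(0)

    tasks = {asyncio.create_task(quick()) for _ in range(3)}
    return await shutdown(tasks, timeout=1.0)
```

Returning a boolean lets the caller decide whether to log a warning when the timeout was hit and tasks had to be cancelled.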

## Requirements

74 changes: 37 additions & 37 deletions faststream_concurrent_aiokafka/batch_committer.py
@@ -3,7 +3,6 @@
import dataclasses
import itertools
import logging
import threading
import typing

from faststream.kafka import TopicPartition
@@ -16,7 +15,7 @@
logger = logging.getLogger(__name__)


SHUTDOWN_TIMEOUT_SEC: typing.Final = 20
GRACEFUL_TIMEOUT_SEC: typing.Final = 20


class CommitterIsDeadError(Exception): ...
@@ -37,20 +36,16 @@ def __init__(
commit_batch_size: int = 10,
) -> None:
self._messages_queue: asyncio.Queue[KafkaCommitTask] = asyncio.Queue()
self._asyncio_commit_process_task: asyncio.Task[typing.Any] | None = None
self._commit_task: asyncio.Task[typing.Any] | None = None
self._flush_batch_event = asyncio.Event()

self._commit_batch_timeout_sec = commit_batch_timeout_sec
self._commit_batch_size = commit_batch_size
self._shutdown_timeout = SHUTDOWN_TIMEOUT_SEC

self._spawn_lock = threading.Lock()
self._shutdown_timeout = GRACEFUL_TIMEOUT_SEC

def _check_is_commit_task_running(self) -> None:
is_commit_task_running: typing.Final[bool] = bool(
self._asyncio_commit_process_task
and not self._asyncio_commit_process_task.cancelled()
and not self._asyncio_commit_process_task.done(),
is_commit_task_running = bool(
self._commit_task and not self._commit_task.cancelled() and not self._commit_task.done(),
)
if not is_commit_task_running:
msg: typing.Final = "Committer main task is not running"
@@ -120,35 +115,41 @@ async def _call_committer(
await self._messages_queue.put(task)
return commit_succeeded

async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
partitions_to_tasks: typing.Final = itertools.groupby(
sorted(tasks_batch, key=lambda x: x.topic_partition), lambda x: x.topic_partition
@staticmethod
def _map_offsets_per_partition(consumer_tasks: list[KafkaCommitTask]) -> dict[TopicPartition, int]:
partitions_to_tasks = itertools.groupby(
sorted(consumer_tasks, key=lambda x: x.topic_partition), lambda x: x.topic_partition
)
partitions_to_offsets: dict[TopicPartition, int] = {}
for partition, partition_tasks in partitions_to_tasks:
max_offset = max((task.offset for task in partition_tasks), default=None)
if max_offset is not None:
# Kafka commits the *next* offset to fetch, so committed = processed_max + 1
partitions_to_offsets[partition] = max_offset + 1
return partitions_to_offsets

async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
results: typing.Final = await asyncio.gather(
*[task.asyncio_task for task in tasks_batch], return_exceptions=True
)
for result in results:
if isinstance(result, BaseException):
logger.error("Task has finished with an exception", exc_info=result)

partitions_to_offsets: typing.Final[dict[TopicPartition, int]] = {}
partition: TopicPartition
tasks: typing.Iterator[KafkaCommitTask]
for partition, tasks in partitions_to_tasks:
max_message_offset: int | None = None
for task in tasks:
if max_message_offset is None or task.offset > max_message_offset:
max_message_offset = task.offset
# Group by consumer instance — each AIOKafkaConsumer can only commit its own partitions
consumers_tasks: dict[int, list[KafkaCommitTask]] = {}
for task in tasks_batch:
consumers_tasks.setdefault(id(task.consumer), []).append(task)

if max_message_offset is not None:
# Kafka commits the *next* offset to fetch, so committed = processed_max + 1
partitions_to_offsets[partition] = max_message_offset + 1
all_succeeded = True
for consumer_tasks in consumers_tasks.values():
partitions_to_offsets = self._map_offsets_per_partition(consumer_tasks)
if not await self._call_committer(consumer_tasks, partitions_to_offsets):
all_succeeded = False

commit_succeeded: typing.Final = await self._call_committer(tasks_batch, partitions_to_offsets)
for _ in tasks_batch:
self._messages_queue.task_done()
return commit_succeeded
return all_succeeded

async def _run_commit_process(self) -> None:
should_shutdown = False
@@ -158,7 +159,7 @@ async def _run_commit_process(self) -> None:
await self._commit_tasks_batch(commit_batch)

async def commit_all(self) -> None:
"""Commit all without shutting down the main process."""
"""Flush and commit all pending tasks, then stop the committer loop."""
self._flush_batch_event.set()
await self._messages_queue.join()

@@ -169,30 +170,29 @@ async def send_task(self, new_task: KafkaCommitTask) -> None:
)

def spawn(self) -> None:
with self._spawn_lock:
if not self._asyncio_commit_process_task:
self._asyncio_commit_process_task = asyncio.create_task(self._run_commit_process())
else:
logger.error("Committer main task already running")
if not self._commit_task:
self._commit_task = asyncio.create_task(self._run_commit_process())
else:
logger.error("Committer main task already running")

async def close(self) -> None:
"""Close committer."""
if not self._asyncio_commit_process_task:
if not self._commit_task:
logger.error("Committer main task is not running, cannot close committer properly")
return

self._flush_batch_event.set()
try:
await asyncio.wait_for(self._asyncio_commit_process_task, timeout=self._shutdown_timeout)
await asyncio.wait_for(self._commit_task, timeout=self._shutdown_timeout)
except TimeoutError:
logger.exception("Committer main task shutdown timed out, forcing cancellation")
self._asyncio_commit_process_task.cancel()
self._commit_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._asyncio_commit_process_task
await self._commit_task
except Exception as exc:
logger.exception("Committer task failed during shutdown", exc_info=exc)
raise

@property
def is_healthy(self) -> bool:
return self._asyncio_commit_process_task is not None and not self._asyncio_commit_process_task.done()
return self._commit_task is not None and not self._commit_task.done()
45 changes: 20 additions & 25 deletions faststream_concurrent_aiokafka/middleware.py
@@ -19,33 +19,25 @@ async def consume_scope( # ty: ignore[invalid-method-override]
msg: KafkaAckableMessage,
) -> typing.Any: # noqa: ANN401
concurrent_processing: typing.Final[KafkaConcurrentHandler] = self.context.get(_PROCESSING_CONTEXT_KEY)
if not concurrent_processing:
logger.error("Kafka middleware. There is no concurrent processing instance in the context")
info = "No concurrent processing instance in the context"
raise RuntimeError(info)

if not concurrent_processing.is_running:
logger.error(
"Kafka middleware. Concurrent processing is not running. Maybe `initialize_concurrent_processing`"
" was forgotten?"
)
info = "Concurrent processing is not running"
raise RuntimeError(info)
if not concurrent_processing or not concurrent_processing.is_running:
err = "Concurrent processing is not running. Call `initialize_concurrent_processing` on app startup."
raise RuntimeError(err)

kafka_message: typing.Final = self.context.get("message")
if concurrent_processing.has_batch_commit and not kafka_message:
logger.error("Kafka middleware. No kafka message in the middleware, it means no consumer to commit batch.")
info = "No kafka message in the middleware"
raise RuntimeError(info)

try:

async def handler_wrapper() -> typing.Any: # noqa: ANN401
return await call_next(msg)
if not kafka_message:
err = "No Kafka message found in context. Ensure the middleware is used with a Kafka subscriber."
raise RuntimeError(err)

if getattr(kafka_message.consumer, "_enable_auto_commit", False):
err = (
"KafkaConcurrentProcessingMiddleware requires ack_policy=AckPolicy.MANUAL on all subscribers. "
"Auto-commit is enabled on this consumer, which commits offsets before processing tasks "
"complete and can cause message loss on crash. "
"Add ack_policy=AckPolicy.MANUAL to your @broker.subscriber(...) decorator."
)
raise RuntimeError(err)

await concurrent_processing.handle_task(handler_wrapper(), self.msg, kafka_message)
except Exception as exc:
raise RuntimeError(f"Kafka middleware. An error while sending task {msg}") from exc
await concurrent_processing.handle_task(call_next(msg), self.msg, kafka_message)


async def initialize_concurrent_processing(
@@ -60,8 +52,11 @@ async def initialize_concurrent_processing(
return existing

concurrent_processing: typing.Final = KafkaConcurrentHandler(
committer=KafkaBatchCommitter(
commit_batch_timeout_sec=commit_batch_timeout_sec,
commit_batch_size=commit_batch_size,
),
concurrency_limit=concurrency_limit,
committer=KafkaBatchCommitter(commit_batch_timeout_sec, commit_batch_size),
)
await concurrent_processing.start()
context.set_global(_PROCESSING_CONTEXT_KEY, concurrent_processing)