# faststream-concurrent-aiokafka

Concurrent message processing middleware for [FastStream](https://faststream.airt.ai/) with aiokafka.

By default FastStream processes Kafka messages sequentially. This library allows you to process multiple messages concurrently using asyncio tasks, with optional batch offset committing.

## Installation

```bash
pip install faststream-concurrent-aiokafka
```

## Usage

```python
from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def on_startup(context: ContextRepo) -> None:
    await initialize_concurrent_processing(
        context=context,
        concurrency_limit=20,  # max concurrent tasks (0 = unlimited)
        commit_batch_size=100,  # commit after this many messages
        commit_batch_timeout_sec=5,  # or after this many seconds
    )


@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
    await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group")
async def handle(msg: str) -> None:
    # runs concurrently with other messages
    ...
```

## Batch offset committing

By default, aiokafka auto-commits offsets. If you manage commits manually, set `enable_batch_commit=True` to have the library commit offsets in batches as tasks complete:

```python
await initialize_concurrent_processing(
    context=context,
    concurrency_limit=20,
    commit_batch_size=100,
    commit_batch_timeout_sec=5,
    enable_batch_commit=True,
)
```

With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.
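As an illustration only (not the library's internal code), the "highest completed offset per partition" rule could look like the sketch below. Note that the Kafka convention is to commit the *next* offset to consume, i.e. the highest completed offset plus one:

```python
def offsets_to_commit(
    completed: list[tuple[str, int, int]],
) -> dict[tuple[str, int], int]:
    """Map each (topic, partition) to the offset to commit: the highest
    completed offset in the batch plus one, since a Kafka commit stores
    the next offset to consume."""
    highest: dict[tuple[str, int], int] = {}
    for topic, partition, offset in completed:
        tp = (topic, partition)
        highest[tp] = max(highest.get(tp, -1), offset)
    return {tp: off + 1 for tp, off in highest.items()}
```

Because tasks finish out of order, committing only the highest offset per partition in each batch keeps the commit count low at the cost of at-least-once (not exactly-once) delivery on restart.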

## Consumer group filtering

When multiple consumer groups share a topic, messages can be tagged with a `topic_group` header. The middleware will only process messages whose `topic_group` header matches the consumer's group ID, skipping the rest.

## Parameters

### `initialize_concurrent_processing`

| Parameter | Default | Description |
|---|---|---|
| `concurrency_limit` | `10` | Max concurrent asyncio tasks. `0` disables the limit. |
| `commit_batch_size` | `10` | Max messages per commit batch. |
| `commit_batch_timeout_sec` | `10` | Max seconds before flushing a batch. |
| `enable_batch_commit` | `False` | Enable manual batch offset committing. |

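The interplay of `commit_batch_size` and `commit_batch_timeout_sec` amounts to a size-or-timeout flush rule, sketched below as a hypothetical helper (not part of the library's public API):

```python
import time


class CommitBatch:
    """Flush when the batch reaches max_size completed messages, or when
    timeout_sec seconds have elapsed since the batch was started."""

    def __init__(self, max_size: int, timeout_sec: float) -> None:
        self.max_size = max_size
        self.timeout_sec = timeout_sec
        self.count = 0
        self.started = time.monotonic()

    def record(self) -> bool:
        """Record one completed message; return True if the batch should flush."""
        self.count += 1
        elapsed = time.monotonic() - self.started
        return self.count >= self.max_size or elapsed >= self.timeout_sec
```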
## Requirements

- Python >= 3.11
- `faststream[kafka]`