Skip to content

Commit da6bdb6

Browse files
authored
Merge pull request #2 from modern-python/fixes
small fixes
2 parents 85c21fe + 245f3f8 commit da6bdb6

5 files changed

Lines changed: 40 additions & 32 deletions

File tree

CLAUDE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ just publish # bump version to $GITHUB_REF_NAME, build, publish to PyPI
1515

1616
Run a single test file or test by name:
1717
```bash
18-
uv run --no-sync pytest tests/committer/test_kafka_committer.py
18+
uv run --no-sync pytest tests/test_kafka_committer.py
1919
uv run --no-sync pytest -k test_committer_logs_task_exceptions
2020
```
2121

@@ -43,6 +43,6 @@ Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommit
4343

4444
## Key patterns
4545

46-
- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests (done in conftest fixtures via `stop_concurrent_processing`).
46+
- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. Each test file does this directly in an `autouse` `reset_singleton` fixture — not via `stop_concurrent_processing`.
4747
- **Type suppression**: use `# ty: ignore[rule-name]` (not `# type: ignore`) for ty type checker suppressions.
4848
- **No `from __future__ import annotations`**: annotations are evaluated eagerly; `typing.Self`/`typing.Never` are used directly (requires Python ≥ 3.11).

README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ app = FastStream(broker)
2929
async def on_startup(context: ContextRepo) -> None:
3030
await initialize_concurrent_processing(
3131
context=context,
32-
concurrency_limit=20, # max concurrent tasks (0 = unlimited)
33-
commit_batch_size=100, # commit after this many messages
34-
commit_batch_timeout_sec=5, # or after this many seconds
32+
concurrency_limit=20, # max concurrent tasks (0 = unlimited)
3533
)
3634

3735

@@ -64,7 +62,16 @@ With batch commit enabled, offsets are committed per partition at the highest co
6462

6563
## Consumer group filtering
6664

67-
When multiple consumer groups share a topic, messages can be tagged with a `topic_group` header. The middleware will only process messages whose `topic_group` header matches the consumer's group ID, skipping the rest.
65+
When multiple consumer groups subscribe to the same topic, producers can tag messages with a `topic_group` header to direct them to a specific group. The middleware skips messages whose `topic_group` header doesn't match the consumer's group ID. Messages with no `topic_group` header are always processed.
66+
67+
```python
68+
# Producer side — send to a specific consumer group only
69+
await broker.publish(
70+
{"data": "..."},
71+
topic="my-topic",
72+
headers={"topic_group": "group-a"},
73+
)
74+
```
6875

6976
## Parameters
7077

faststream_concurrent_aiokafka/middleware.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ async def consume_scope( # ty: ignore[invalid-method-override]
1919
) -> typing.Any: # noqa: ANN401
2020
concurrent_processing: typing.Final[KafkaConcurrentHandler] = self.context.get(_PROCESSING_CONTEXT_KEY)
2121
if not concurrent_processing:
22-
logger.exception("Kafka middleware. There is no concurrent processing instance in the context")
22+
logger.error("Kafka middleware. There is no concurrent processing instance in the context")
2323
info = "No concurrent processing instance in the context"
2424
raise RuntimeError(info)
2525
if not concurrent_processing.is_running:
@@ -47,7 +47,7 @@ async def handler_wrapper() -> typing.Any: # noqa: ANN401
4747
async def initialize_concurrent_processing(
4848
context: ContextRepo,
4949
commit_batch_size: int,
50-
commit_batch_timeout_sec: int,
50+
commit_batch_timeout_sec: float,
5151
concurrency_limit: int = DEFAULT_CONCURRENCY_LIMIT,
5252
enable_batch_commit: bool = False,
5353
) -> None:

faststream_concurrent_aiokafka/processing.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def _finish_task(self, task: asyncio.Task[typing.Any]) -> None:
7777
self.limiter.release()
7878
exc: typing.Final[BaseException | None] = task.exception()
7979
if exc:
80-
logger.error("Kafka middleware. Kafka middleware. Task has failed with the exception", exc_info=exc)
80+
logger.error("Kafka middleware. Task has failed with the exception", exc_info=exc)
8181
self._current_tasks.discard(task)
8282

8383
async def handle_task(
@@ -86,28 +86,28 @@ async def handle_task(
8686
record: ConsumerRecord,
8787
kafka_message: KafkaAckableMessage,
8888
) -> None:
89+
if not self._is_need_to_process_message(kafka_message):
90+
coroutine.close()
91+
return
8992
if self.limiter:
9093
await self.limiter.acquire()
91-
if self._is_need_to_process_message(kafka_message):
92-
task: typing.Final = asyncio.create_task(coroutine)
93-
self._current_tasks.add(task)
94-
task.add_done_callback(self._finish_task)
95-
if self.enable_batch_commit and self._committer:
96-
try:
97-
await self._committer.send_task(
98-
batch_committer.KafkaCommitTask(
99-
asyncio_task=task,
100-
offset=record.offset,
101-
consumer=kafka_message.consumer,
102-
topic_partition=TopicPartition(topic=record.topic, partition=record.partition),
103-
)
94+
task: typing.Final = asyncio.create_task(coroutine)
95+
self._current_tasks.add(task)
96+
task.add_done_callback(self._finish_task)
97+
if self.enable_batch_commit and self._committer:
98+
try:
99+
await self._committer.send_task(
100+
batch_committer.KafkaCommitTask(
101+
asyncio_task=task,
102+
offset=record.offset,
103+
consumer=kafka_message.consumer,
104+
topic_partition=TopicPartition(topic=record.topic, partition=record.partition),
104105
)
105-
except batch_committer.CommitterIsDeadError:
106-
logger.exception("Kafka middleware. Committer is dead")
107-
await self.stop()
108-
raise
109-
elif self.limiter:
110-
self.limiter.release()
106+
)
107+
except batch_committer.CommitterIsDeadError:
108+
logger.exception("Kafka middleware. Committer is dead")
109+
await self.stop()
110+
raise
111111

112112
async def _check_tasks_health(self) -> None:
113113
to_discard: typing.Final = []
@@ -116,8 +116,6 @@ async def _check_tasks_health(self) -> None:
116116
to_discard.append(task)
117117
for task in to_discard:
118118
self._current_tasks.discard(task)
119-
if self.limiter:
120-
self.limiter.release()
121119
if to_discard:
122120
logger.info(f"Kafka middleware. Found completed but not discarded tasks, amount: {len(to_discard)}")
123121

tests/test_concurrent_processing.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,9 @@ async def test_concurrent_keeps_running_tasks(handler: KafkaConcurrentHandler) -
328328
await task
329329

330330

331-
async def test_concurrent_releases_limiter_for_done_tasks(handler_with_limit: KafkaConcurrentHandler) -> None:
331+
async def test_concurrent_health_check_discards_done_tasks_without_releasing_limiter(
332+
handler_with_limit: KafkaConcurrentHandler,
333+
) -> None:
332334
handler: typing.Final = handler_with_limit
333335

334336
async def quick_task() -> str:
@@ -340,8 +342,9 @@ async def quick_task() -> str:
340342
assert handler.limiter
341343
initial_value: typing.Final = handler.limiter._value
342344
await handler._check_tasks_health()
345+
assert task not in handler._current_tasks
343346
assert handler.limiter
344-
assert handler.limiter._value == initial_value + 1
347+
assert handler.limiter._value == initial_value # limiter released only by _finish_task callback
345348

346349

347350
async def test_concurrent_logs_found_tasks(handler: KafkaConcurrentHandler, caplog: pytest.LogCaptureFixture) -> None:

0 commit comments

Comments (0)