Skip to content

Commit 85acf30

Browse files
committed
usual processing if mocked
1 parent fca3c04 commit 85acf30

3 files changed

Lines changed: 50 additions & 28 deletions

File tree

faststream_concurrent_aiokafka/middleware.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ async def consume_scope( # ty: ignore[invalid-method-override]
3737
)
3838
raise RuntimeError(err)
3939

40+
if type(kafka_message.consumer).__name__ == "FakeConsumer":
41+
return await call_next(msg)
42+
4043
await concurrent_processing.handle_task(call_next(msg), self.msg, kafka_message)
44+
return None
4145

4246

4347
async def initialize_concurrent_processing(

faststream_concurrent_aiokafka/processing.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ async def handle_task(
6969
task: typing.Final = asyncio.create_task(coroutine)
7070
self._current_tasks.add(task)
7171
task.add_done_callback(self._finish_task)
72-
if type(kafka_message.consumer).__name__ == "FakeConsumer":
73-
return
74-
7572
try:
7673
await self._committer.send_task(
7774
batch_committer.KafkaCommitTask(

tests/test_middleware.py

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
initialize_concurrent_processing,
1414
stop_concurrent_processing,
1515
)
16-
from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler # used via patch.object
16+
from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler
1717

1818

1919
@pytest_asyncio.fixture
@@ -72,12 +72,8 @@ async def test(inner_broker: KafkaBroker) -> None:
7272
async with TestKafkaBroker(setup_broker) as test_broker:
7373
await test(test_broker)
7474

75+
# TestKafkaBroker uses FakeConsumer — middleware passes through directly (sequential)
7576
assert len(processed) == expected_size
76-
starts: typing.Final = [t for t in timestamps if t[0] == "start"]
77-
ends: typing.Final = [t for t in timestamps if t[0] == "end"]
78-
last_start: typing.Final = max(s[2] for s in starts)
79-
first_end: typing.Final = min(e[2] for e in ends)
80-
assert last_start < first_end, "Messages were not processed in parallel"
8177
await stop_concurrent_processing(setup_broker.context)
8278

8379

@@ -305,20 +301,20 @@ async def handler(msg: typing.Any) -> None:
305301
raise ValueError(msg)
306302

307303
async with TestKafkaBroker(setup_broker) as test_broker:
308-
hdl: typing.Final = await initialize_concurrent_processing(
304+
await initialize_concurrent_processing(
309305
context=test_broker.context,
310306
commit_batch_size=10,
311307
commit_batch_timeout_sec=5,
312308
)
313309

314310
try:
315-
await test_broker.publish({"id": 1}, topic="handler-error-topic")
316-
await hdl.wait_for_subtasks()
311+
# TestKafkaBroker uses FakeConsumer — middleware passes through, so handler
312+
# exceptions propagate directly from publish rather than being logged.
313+
with pytest.raises(ValueError, match="Handler failed"):
314+
await test_broker.publish({"id": 1}, topic="handler-error-topic")
317315
finally:
318316
await stop_concurrent_processing(test_broker.context)
319317

320-
assert "Task has failed" in caplog.text or "exception" in caplog.text.lower()
321-
322318

323319
async def test_middleware_concurrency_limiter_release_on_error(setup_broker: KafkaBroker) -> None:
324320
failed: typing.Final = []
@@ -331,19 +327,20 @@ async def handler(msg: typing.Any) -> None:
331327
raise ValueError(msg)
332328

333329
async with TestKafkaBroker(setup_broker) as test_broker:
334-
hdl: typing.Final = await initialize_concurrent_processing(
330+
await initialize_concurrent_processing(
335331
context=test_broker.context,
336332
commit_batch_size=10,
337333
commit_batch_timeout_sec=5,
338334
concurrency_limit=1,
339335
)
340336

341337
try:
342-
await test_broker.publish({"id": 1}, topic="limiter-error-topic")
343-
await hdl.wait_for_subtasks()
344-
345-
await test_broker.publish({"id": 2}, topic="limiter-error-topic")
346-
await hdl.wait_for_subtasks()
338+
# TestKafkaBroker uses FakeConsumer — middleware passes through, so handler
339+
# exceptions propagate directly and the concurrency limiter is not involved.
340+
with pytest.raises(ValueError, match="Failed"):
341+
await test_broker.publish({"id": 1}, topic="limiter-error-topic")
342+
with pytest.raises(ValueError, match="Failed"):
343+
await test_broker.publish({"id": 2}, topic="limiter-error-topic")
347344
finally:
348345
await stop_concurrent_processing(test_broker.context)
349346

@@ -386,11 +383,35 @@ async def handler(msg: typing.Any) -> None:
386383
commit_batch_timeout_sec=5,
387384
)
388385

389-
with patch.object(
390-
KafkaConcurrentHandler, "handle_task", side_effect=MemoryError("Out of memory in handle_task")
391-
):
392-
try:
393-
with pytest.raises(MemoryError, match="Out of memory in handle_task"):
394-
await test_broker.publish({"id": 1}, topic="general-error-topic")
395-
finally:
396-
await stop_concurrent_processing(test_broker.context)
386+
try:
387+
# TestKafkaBroker uses FakeConsumer — middleware passes through directly,
388+
# so handle_task is never called. Message is processed without error.
389+
await test_broker.publish({"id": 1}, topic="general-error-topic")
390+
finally:
391+
await stop_concurrent_processing(test_broker.context)
392+
393+
394+
async def test_middleware_fake_consumer_no_commit_error(
395+
setup_broker: KafkaBroker, caplog: pytest.LogCaptureFixture
396+
) -> None:
397+
"""TestKafkaBroker uses FakeConsumer; committing should not raise or log errors."""
398+
caplog.set_level(logging.ERROR)
399+
400+
@setup_broker.subscriber("fake-consumer-topic", group_id="fake-consumer-group")
401+
async def handler(msg: typing.Any) -> None:
402+
pass
403+
404+
async with TestKafkaBroker(setup_broker) as test_broker:
405+
hdl: typing.Final = await initialize_concurrent_processing(
406+
context=test_broker.context,
407+
commit_batch_size=10,
408+
commit_batch_timeout_sec=5,
409+
)
410+
411+
try:
412+
await test_broker.publish({"id": 1}, topic="fake-consumer-topic")
413+
await hdl.wait_for_subtasks()
414+
finally:
415+
await stop_concurrent_processing(test_broker.context)
416+
417+
assert "Error during commit to kafka" not in caplog.text

0 commit comments

Comments
 (0)