Merged
6 changes: 6 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.venv
__pycache__
.pytest_cache
.ruff_cache
.mypy_cache
dist
24 changes: 14 additions & 10 deletions .github/workflows/ci.yml
@@ -20,7 +20,7 @@ jobs:
with:
enable-cache: true
cache-dependency-glob: "**/pyproject.toml"
- run: uv python install 3.11
- run: uv python install 3.13
- run: just install lint-ci

pytest:
@@ -33,20 +33,24 @@ jobs:
- "3.12"
- "3.13"
- "3.14"
services:
redpanda:
image: redpandadata/redpanda:latest
ports:
- 9092:9092
options: >-
--health-cmd "rpk cluster health"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- uses: extractions/setup-just@v2
- uses: astral-sh/setup-uv@v3
with:
enable-cache: true
cache-dependency-glob: "**/pyproject.toml"
- run: uv python install ${{ matrix.python-version }}
- run: just install
- run: just test . --cov=. --cov-report xml
- uses: codecov/codecov-action@v4.0.1
- run: uv sync --frozen
- run: uv run --no-sync pytest . --cov=. --cov-report xml
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: unittests
name: codecov-${{ matrix.python-version }}
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
29 changes: 22 additions & 7 deletions CLAUDE.md
@@ -5,12 +5,14 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Commands

```bash
just install # lock + sync all deps (run after pulling or changing pyproject.toml)
just lint # eof-fixer, ruff format, ruff check --fix, ty check
just lint-ci # same but no auto-fix (used in CI)
just test # pytest with coverage
just test-branch # pytest with branch coverage
just publish # bump version to $GITHUB_REF_NAME, build, publish to PyPI
just install # lock + sync all deps (run after pulling or changing pyproject.toml)
just lint # eof-fixer, ruff format, ruff check --fix, ty check
just lint-ci # same but no auto-fix (used in CI)
just build # build the application Docker image
just test # run all tests in Docker (starts Redpanda, runs pytest, tears down)
just test-branch # same with branch coverage
just down # tear down all containers
just publish # bump version to $GITHUB_REF_NAME, build, publish to PyPI
```

Run a single test file or test by name:
@@ -43,6 +45,19 @@ Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommit

## Key patterns

- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. Each test file does this directly in an `autouse` `reset_singleton` fixture, not via `stop_concurrent_processing`.
- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. The shared `autouse` `reset_singleton` fixture lives in `tests/conftest.py` — do not re-define it in individual test files.
- **Type suppression**: use `# ty: ignore[rule-name]` (not `# type: ignore`) for ty type checker suppressions.
- **No `from __future__ import annotations`**: annotations are evaluated eagerly; `typing.Self`/`typing.Never` are used directly (requires Python ≥ 3.11).

## Integration tests

`tests/test_integration.py` runs against a real Redpanda container (Kafka-compatible, lightweight) via `testcontainers[kafka]`. The container is session-scoped — one instance for the whole test run.

**Running integration tests** requires Docker — they run automatically as part of `just test`.

**Key findings from building these tests:**

- `async with KafkaBroker():` only calls `connect()`, which sets up the producer. It does **not** start subscribers. You must also call `await broker.start()` explicitly to launch the consumer poll tasks.
- Always use `auto_offset_reset="earliest"` on test subscribers. The default `"latest"` causes the consumer to miss messages published before it gets its partition assignment.
- Pre-create topics with `AIOKafkaAdminClient` before starting the broker. Auto-creation on first publish triggers a `NotLeaderForPartitionError` retry loop that can outlast short sleeps.
- After `await broker.start()`, sleep ~1.5 s before publishing to let the consumer join the group and receive partition assignments.
6 changes: 6 additions & 0 deletions Dockerfile
@@ -0,0 +1,6 @@
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-install-project
COPY . .
RUN uv sync --frozen
20 changes: 13 additions & 7 deletions Justfile
@@ -1,4 +1,16 @@
default: install lint test
default: install lint build test

down:
docker compose down --remove-orphans

test *args: build down && down
docker compose run application uv run --no-sync pytest {{ args }}

test-branch:
@just test --cov-branch

build:
docker compose build application

install:
uv lock --upgrade
@@ -16,12 +28,6 @@ lint-ci:
uv run ruff check --no-fix
uv run ty check

test *args:
uv run --no-sync pytest {{ args }}

test-branch:
@just test --cov-branch

publish:
rm -rf dist
uv version $GITHUB_REF_NAME
23 changes: 23 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,23 @@
services:
redpanda:
image: redpandadata/redpanda:latest
command:
- redpanda
- start
- --overprovisioned
- --smp 1
- --memory 512M
- --reserve-memory 0M
- --node-id 0
- --check=false
- --kafka-addr=PLAINTEXT://0.0.0.0:9092
- --advertise-kafka-addr=PLAINTEXT://redpanda:9092
ports:
- "9092:9092"

application:
build: .
environment:
KAFKA_BOOTSTRAP_SERVERS: redpanda:9092
depends_on:
- redpanda
27 changes: 14 additions & 13 deletions faststream_concurrent_aiokafka/batch_committer.py
@@ -66,31 +66,30 @@ async def _populate_commit_batch(self) -> tuple[list[KafkaCommitTask], bool]:
uncommited_tasks: typing.Final[list[KafkaCommitTask]] = []
should_shutdown = False
queue_get_task: asyncio.Task[typing.Any] | None = None
flush_wait_task: asyncio.Task[typing.Any] | None = None
timeout_task: asyncio.Task[typing.Any] | None = None
# Create timeout and flush-wait tasks once; reused across queue-get iterations.
timeout_task: asyncio.Task[None] = asyncio.create_task(asyncio.sleep(self._commit_batch_timeout_sec))
flush_wait_task: asyncio.Task[bool] = asyncio.create_task(self._flush_batch_event.wait())
try:
timeout_task = asyncio.create_task(asyncio.sleep(self._commit_batch_timeout_sec))
while len(uncommited_tasks) < self._commit_batch_size:
queue_get_task = asyncio.create_task(self._messages_queue.get())
flush_wait_task = asyncio.create_task(self._flush_batch_event.wait())
await asyncio.wait([queue_get_task, flush_wait_task, timeout_task], return_when=asyncio.FIRST_COMPLETED)
done, _ = await asyncio.wait(
[queue_get_task, flush_wait_task, timeout_task],
return_when=asyncio.FIRST_COMPLETED,
)

if queue_get_task.done():
if queue_get_task in done:
uncommited_tasks.append(queue_get_task.result())
else:
queue_get_task.cancel()

# commit_all is called
if flush_wait_task.done():
queue_get_task.cancel()
# commit_all was called — flush remaining queue items and stop
if flush_wait_task in done:
uncommited_tasks.extend(self._flush_tasks_queue())
self._flush_batch_event.clear()
timeout_task.cancel()
should_shutdown = True
break
flush_wait_task.cancel()

if timeout_task.done():
if timeout_task in done:
logger.debug("Timeout exceeded, batch contains %s elements", len(uncommited_tasks))
break

@@ -100,7 +99,8 @@ async def _populate_commit_batch(self) -> tuple[list[KafkaCommitTask], bool]:
uncommited_tasks.extend(self._flush_tasks_queue())

for task in (queue_get_task, flush_wait_task, timeout_task):
task and task.cancel()
if task:
task.cancel()

return uncommited_tasks, should_shutdown

@@ -142,6 +142,7 @@ async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
max_message_offset = task.offset

if max_message_offset is not None:
# Kafka commits the *next* offset to fetch, so committed = processed_max + 1
partitions_to_offsets[partition] = max_message_offset + 1

commit_succeeded: typing.Final = await self._call_committer(tasks_batch, partitions_to_offsets)
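The new comment in `_commit_tasks_batch` describes Kafka's commit convention: the committed offset is the *next* offset to fetch, so the committer stores `max processed offset + 1` per partition. A small illustrative helper (not project code) showing that arithmetic:

```python
def build_commit_offsets(processed: list[tuple[str, int]]) -> dict[str, int]:
    """Map each partition to its highest processed offset + 1.

    Kafka interprets a committed offset as "resume fetching here", so committing
    processed_max + 1 means already-handled records are not redelivered.
    `processed` holds (partition, offset) pairs; names are illustrative.
    """
    commit: dict[str, int] = {}
    for partition, offset in processed:
        next_offset = offset + 1
        if next_offset > commit.get(partition, 0):
            commit[partition] = next_offset
    return commit


# Offsets 3, 5, 4 processed on "orders-0": commit 6 so consumption resumes at 6.
print(build_commit_offsets([("orders-0", 3), ("orders-0", 5), ("orders-1", 0), ("orders-0", 4)]))
# → {'orders-0': 6, 'orders-1': 1}
```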
10 changes: 6 additions & 4 deletions faststream_concurrent_aiokafka/middleware.py
Original file line number Diff line number Diff line change
@@ -22,18 +22,21 @@ async def consume_scope( # ty: ignore[invalid-method-override]
logger.error("Kafka middleware. There is no concurrent processing instance in the context")
info = "No concurrent processing instance in the context"
raise RuntimeError(info)

if not concurrent_processing.is_running:
logger.error(
"Kafka middleware. Concurrent processing is not running. Maybe `initialize_concurrent_processing`"
" was forgotten?"
)
info = "Concurrent processing is not running"
raise RuntimeError(info)

kafka_message: typing.Final = self.context.get("message")
if concurrent_processing.enable_batch_commit and not kafka_message:
logger.error("Kafka middleware. No kafka message in the middleware, it means no consumer to commit batch.")
info = "No kafka message in the middleware"
raise RuntimeError(info)

try:

async def handler_wrapper() -> typing.Any: # noqa: ANN401
@@ -46,10 +49,10 @@ async def handler_wrapper() -> typing.Any: # noqa: ANN401

async def initialize_concurrent_processing(
context: ContextRepo,
commit_batch_size: int,
commit_batch_timeout_sec: float,
concurrency_limit: int = DEFAULT_CONCURRENCY_LIMIT,
enable_batch_commit: bool = False,
commit_batch_size: int = 10,
commit_batch_timeout_sec: float = 10.0,
) -> None:
concurrent_processing: typing.Final = KafkaConcurrentHandler(
commit_batch_size=commit_batch_size,
@@ -82,5 +85,4 @@ async def stop_concurrent_processing(
await concurrent_processing.stop()
context.set_global(_PROCESSING_CONTEXT_KEY, None)

KafkaConcurrentHandler._initialized = False # noqa: SLF001
KafkaConcurrentHandler._instance = None # noqa: SLF001
KafkaConcurrentHandler.reset()
9 changes: 8 additions & 1 deletion faststream_concurrent_aiokafka/processing.py
@@ -58,8 +58,15 @@ def __init__(
self._commit_batch_size: int = commit_batch_size

self._committer: KafkaBatchCommitter | None = None
self._stop_task: asyncio.Task[typing.Any] | None = None
self._initialized = True

@classmethod
def reset(cls) -> None:
with cls._lock:
cls._initialized = False
cls._instance = None

def _is_need_to_process_message(self, message: KafkaAckableMessage) -> bool:
headers_topic_group: typing.Final[str | None] = message.headers.get(TOPIC_GROUP_KEY)
group_id: typing.Final[str] = getattr(message.consumer, "_group_id", "")
@@ -143,7 +150,7 @@ def _setup_signal_handlers(self) -> None:

def _signal_handler(self, sig: signal.Signals) -> None:
logger.info(f"Kafka middleware. Received signal {sig.name}, initiating graceful shutdown...")
asyncio.create_task(self.stop()) # noqa: RUF006
self._stop_task = asyncio.create_task(self.stop())

async def start(self) -> None:
if self._is_running:
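The `_signal_handler` change replaces the bare `asyncio.create_task(self.stop())  # noqa: RUF006` with an attribute assignment. The event loop keeps only weak references to tasks, so a task with no other reference can be garbage-collected before it finishes (the gotcha Ruff's RUF006 flags); storing it on `self` prevents that. A minimal stand-alone sketch (the `Service` class is illustrative, not from this project):

```python
import asyncio


class Service:
    """Sketch of keeping a strong reference to a fire-and-forget shutdown task."""

    def __init__(self) -> None:
        self._stop_task: asyncio.Task[None] | None = None
        self.stopped = False

    async def stop(self) -> None:
        await asyncio.sleep(0)  # stand-in for real async teardown work
        self.stopped = True

    def on_signal(self) -> None:
        # Assigning to an attribute keeps the task alive; the event loop alone
        # holds only a weak reference and will not protect it from collection.
        self._stop_task = asyncio.create_task(self.stop())


async def main() -> bool:
    service = Service()
    service.on_signal()
    await service._stop_task  # real code would await this during shutdown
    return service.stopped


print(asyncio.run(main()))  # → True
```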
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -82,7 +82,7 @@ isort.no-lines-before = ["standard-library", "local-folder"]
]

[tool.pytest.ini_options]
addopts = "--cov=. --cov-report term-missing"
addopts = "-n auto --cov=. --cov-report term-missing"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

18 changes: 18 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,18 @@
import os
import typing

import pytest

from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler


@pytest.fixture(scope="session")
def kafka_bootstrap_servers() -> str:
return os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")


@pytest.fixture(autouse=True)
def reset_singleton() -> typing.Iterator[None]:
KafkaConcurrentHandler.reset()
yield
KafkaConcurrentHandler.reset()
38 changes: 38 additions & 0 deletions tests/mocks.py
@@ -0,0 +1,38 @@
"""Shared mock classes used across multiple test modules."""

import typing
from unittest.mock import AsyncMock, Mock


class MockAIOKafkaConsumer:
def __init__(self, group_id: str = "test-group") -> None:
self._group_id = group_id
self.commit = AsyncMock()


class MockAsyncioTask:
def __init__(self, result: str | None = None, exception: Exception | None = None, done: bool = True) -> None:
self._result: str | None = result
self._exception: Exception | None = exception
self._done: bool = done
self._cancelled: bool = False

def __await__(self) -> typing.Generator[typing.Any, None, str | None]:
if self._exception:
raise self._exception
if False: # pragma: no cover
yield # makes this a generator so it behaves as a proper awaitable
return self._result


class MockKafkaBatchCommitter:
def __init__(self, *_args: object, **_kwargs: object) -> None:
self.send_task = AsyncMock()
self.close = AsyncMock()
self.spawn = Mock()
self.commit_all = AsyncMock()
self._healthy = True

@property
def is_healthy(self) -> bool:
return self._healthy