Commit 0d60241

refactor and add integration tests
1 parent da6bdb6 commit 0d60241

16 files changed

Lines changed: 504 additions & 117 deletions

.dockerignore

Lines changed: 6 additions & 0 deletions
```diff
@@ -0,0 +1,6 @@
+.venv
+__pycache__
+.pytest_cache
+.ruff_cache
+.mypy_cache
+dist
```

.github/workflows/ci.yml

Lines changed: 14 additions & 10 deletions
```diff
@@ -20,7 +20,7 @@ jobs:
         with:
           enable-cache: true
           cache-dependency-glob: "**/pyproject.toml"
-      - run: uv python install 3.11
+      - run: uv python install 3.13
       - run: just install lint-ci

   pytest:
@@ -33,20 +33,24 @@ jobs:
           - "3.12"
           - "3.13"
           - "3.14"
+    services:
+      redpanda:
+        image: redpandadata/redpanda:latest
+        ports:
+          - 9092:9092
+        options: >-
+          --health-cmd "rpk cluster health"
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
     steps:
       - uses: actions/checkout@v4
-      - uses: extractions/setup-just@v2
       - uses: astral-sh/setup-uv@v3
         with:
           enable-cache: true
           cache-dependency-glob: "**/pyproject.toml"
       - run: uv python install ${{ matrix.python-version }}
-      - run: just install
-      - run: just test . --cov=. --cov-report xml
-      - uses: codecov/codecov-action@v4.0.1
+      - run: uv sync --frozen
+      - run: uv run --no-sync pytest . --cov=. --cov-report xml
         env:
-          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
-        with:
-          files: ./coverage.xml
-          flags: unittests
-          name: codecov-${{ matrix.python-version }}
+          KAFKA_BOOTSTRAP_SERVERS: localhost:9092
```

CLAUDE.md

Lines changed: 22 additions & 7 deletions
````diff
@@ -5,12 +5,14 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
 ## Commands

 ```bash
-just install       # lock + sync all deps (run after pulling or changing pyproject.toml)
-just lint          # eof-fixer, ruff format, ruff check --fix, ty check
-just lint-ci       # same but no auto-fix (used in CI)
-just test          # pytest with coverage
-just test-branch   # pytest with branch coverage
-just publish       # bump version to $GITHUB_REF_NAME, build, publish to PyPI
+just install       # lock + sync all deps (run after pulling or changing pyproject.toml)
+just lint          # eof-fixer, ruff format, ruff check --fix, ty check
+just lint-ci       # same but no auto-fix (used in CI)
+just build         # build the application Docker image
+just test          # run all tests in Docker (starts Redpanda, runs pytest, tears down)
+just test-branch   # same with branch coverage
+just down          # tear down all containers
+just publish       # bump version to $GITHUB_REF_NAME, build, publish to PyPI
 ```

 Run a single test file or test by name:
@@ -43,6 +45,19 @@ Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommit

 ## Key patterns

-- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. Each test file does this directly in an `autouse` `reset_singleton` fixture not via `stop_concurrent_processing`.
+- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. The shared `autouse` `reset_singleton` fixture lives in `tests/conftest.py` — do not re-define it in individual test files.
 - **Type suppression**: use `# ty: ignore[rule-name]` (not `# type: ignore`) for ty type checker suppressions.
 - **No `from __future__ import annotations`**: annotations are evaluated eagerly; `typing.Self`/`typing.Never` are used directly (requires Python ≥ 3.11).
+
+## Integration tests
+
+`tests/test_integration.py` runs against a real Redpanda container (Kafka-compatible, lightweight) via `testcontainers[kafka]`. The container is session-scoped — one instance for the whole test run.
+
+**Running integration tests** requires Docker — they run automatically as part of `just test`.
+
+**Key findings from building these tests:**
+
+- `async with KafkaBroker():` only calls `connect()`, which sets up the producer. It does **not** start subscribers. You must also call `await broker.start()` explicitly to launch the consumer poll tasks.
+- Always use `auto_offset_reset="earliest"` on test subscribers. The default `"latest"` causes the consumer to miss messages published before it gets its partition assignment.
+- Pre-create topics with `AIOKafkaAdminClient` before starting the broker. Auto-creation on first publish triggers a `NotLeaderForPartitionError` retry loop that can outlast short sleeps.
+- After `await broker.start()`, sleep ~1.5 s before publishing to let the consumer join the group and receive partition assignments.
````
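The first finding above (connecting a broker prepares the producer but does not start consumers) can be illustrated with a self-contained toy model. This is plain asyncio with no Kafka involved; `ToyBroker` and its methods are hypothetical stand-ins for the behavior described, not the FastStream API:

```python
import asyncio


class ToyBroker:
    """Toy stand-in: connect() only readies the producer; start() launches the consumer."""

    def __init__(self) -> None:
        self.connected = False
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.consumed: list[str] = []
        self._poll_task: asyncio.Task[None] | None = None

    async def connect(self) -> None:
        # Producer side ready; note that no consumer task exists yet.
        self.connected = True

    async def start(self) -> None:
        # Only here does the consumer poll loop begin, mirroring broker.start().
        self._poll_task = asyncio.create_task(self._poll())

    async def _poll(self) -> None:
        while True:
            self.consumed.append(await self.queue.get())

    async def publish(self, msg: str) -> None:
        await self.queue.put(msg)

    async def close(self) -> None:
        if self._poll_task:
            self._poll_task.cancel()


async def main() -> tuple[int, int]:
    broker = ToyBroker()
    await broker.connect()
    await broker.publish("a")               # publishing works after connect()...
    await asyncio.sleep(0.01)
    after_connect = len(broker.consumed)    # ...but nothing is consumed yet
    await broker.start()
    await asyncio.sleep(0.01)
    after_start = len(broker.consumed)      # consumed only once start() has run
    await broker.close()
    return after_connect, after_start
```

In the toy, `after_connect` stays at 0 while `after_start` sees the message, which is the same trap the real tests hit when `broker.start()` was omitted.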

Dockerfile

Lines changed: 6 additions & 0 deletions
```diff
@@ -0,0 +1,6 @@
+FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim
+WORKDIR /app
+COPY pyproject.toml uv.lock ./
+RUN uv sync --frozen --no-install-project
+COPY . .
+RUN uv sync --frozen
```

Justfile

Lines changed: 13 additions & 7 deletions
```diff
@@ -1,4 +1,16 @@
-default: install lint test
+default: install lint build test
+
+down:
+    docker compose down --remove-orphans
+
+test *args: build down && down
+    docker compose run application uv run --no-sync pytest {{ args }}
+
+test-branch:
+    @just test --cov-branch
+
+build:
+    docker compose build application

 install:
     uv lock --upgrade
@@ -16,12 +28,6 @@ lint-ci:
     uv run ruff check --no-fix
     uv run ty check

-test *args:
-    uv run --no-sync pytest {{ args }}
-
-test-branch:
-    @just test --cov-branch
-
 publish:
     rm -rf dist
     uv version $GITHUB_REF_NAME
```

docker-compose.yml

Lines changed: 23 additions & 0 deletions
```diff
@@ -0,0 +1,23 @@
+services:
+  redpanda:
+    image: redpandadata/redpanda:latest
+    command:
+      - redpanda
+      - start
+      - --overprovisioned
+      - --smp 1
+      - --memory 512M
+      - --reserve-memory 0M
+      - --node-id 0
+      - --check=false
+      - --kafka-addr=PLAINTEXT://0.0.0.0:9092
+      - --advertise-kafka-addr=PLAINTEXT://redpanda:9092
+    ports:
+      - "9092:9092"
+
+  application:
+    build: .
+    environment:
+      KAFKA_BOOTSTRAP_SERVERS: redpanda:9092
+    depends_on:
+      - redpanda
```

faststream_concurrent_aiokafka/batch_committer.py

Lines changed: 14 additions & 13 deletions
```diff
@@ -66,31 +66,30 @@ async def _populate_commit_batch(self) -> tuple[list[KafkaCommitTask], bool]:
         uncommited_tasks: typing.Final[list[KafkaCommitTask]] = []
         should_shutdown = False
         queue_get_task: asyncio.Task[typing.Any] | None = None
-        flush_wait_task: asyncio.Task[typing.Any] | None = None
-        timeout_task: asyncio.Task[typing.Any] | None = None
+        # Create timeout and flush-wait tasks once; reused across queue-get iterations.
+        timeout_task: asyncio.Task[None] = asyncio.create_task(asyncio.sleep(self._commit_batch_timeout_sec))
+        flush_wait_task: asyncio.Task[bool] = asyncio.create_task(self._flush_batch_event.wait())
         try:
-            timeout_task = asyncio.create_task(asyncio.sleep(self._commit_batch_timeout_sec))
             while len(uncommited_tasks) < self._commit_batch_size:
                 queue_get_task = asyncio.create_task(self._messages_queue.get())
-                flush_wait_task = asyncio.create_task(self._flush_batch_event.wait())
-                await asyncio.wait([queue_get_task, flush_wait_task, timeout_task], return_when=asyncio.FIRST_COMPLETED)
+                done, _ = await asyncio.wait(
+                    [queue_get_task, flush_wait_task, timeout_task],
+                    return_when=asyncio.FIRST_COMPLETED,
+                )

-                if queue_get_task.done():
+                if queue_get_task in done:
                     uncommited_tasks.append(queue_get_task.result())
                 else:
                     queue_get_task.cancel()

-                # commit_all is called
-                if flush_wait_task.done():
-                    queue_get_task.cancel()
+                # commit_all was called — flush remaining queue items and stop
+                if flush_wait_task in done:
                     uncommited_tasks.extend(self._flush_tasks_queue())
                     self._flush_batch_event.clear()
-                    timeout_task.cancel()
                     should_shutdown = True
                     break
-                flush_wait_task.cancel()

-                if timeout_task.done():
+                if timeout_task in done:
                     logger.debug("Timeout exceeded, batch contains %s elements", len(uncommited_tasks))
                     break

@@ -100,7 +99,8 @@ async def _populate_commit_batch(self) -> tuple[list[KafkaCommitTask], bool]:
             uncommited_tasks.extend(self._flush_tasks_queue())

         for task in (queue_get_task, flush_wait_task, timeout_task):
-            task and task.cancel()
+            if task:
+                task.cancel()

         return uncommited_tasks, should_shutdown

@@ -142,6 +142,7 @@ async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
                 max_message_offset = task.offset

         if max_message_offset is not None:
+            # Kafka commits the *next* offset to fetch, so committed = processed_max + 1
             partitions_to_offsets[partition] = max_message_offset + 1

         commit_succeeded: typing.Final = await self._call_committer(tasks_batch, partitions_to_offsets)
```
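The refactor above rests on two details: `asyncio.wait(..., return_when=FIRST_COMPLETED)` returns a `done` set that identifies which of *this* iteration's waiters actually finished, and Kafka's committed offset is the next offset to fetch. A minimal self-contained sketch of both, with toy names and no Kafka dependency:

```python
import asyncio


async def populate_batch(
    queue: asyncio.Queue[int],
    batch_size: int,
    timeout_sec: float,
) -> list[int]:
    """Collect up to batch_size items, giving up when the shared timeout fires."""
    batch: list[int] = []
    # Created once and reused across iterations, as in the refactored committer.
    timeout_task = asyncio.create_task(asyncio.sleep(timeout_sec))
    try:
        while len(batch) < batch_size:
            get_task = asyncio.create_task(queue.get())
            done, _ = await asyncio.wait(
                [get_task, timeout_task], return_when=asyncio.FIRST_COMPLETED
            )
            # Membership in `done` (not .done()) ties the decision to the tasks
            # that completed during *this* wait call.
            if get_task in done:
                batch.append(get_task.result())
            else:
                get_task.cancel()
            if timeout_task in done:
                break
    finally:
        timeout_task.cancel()
    return batch


def offsets_to_commit(processed: dict[str, int]) -> dict[str, int]:
    """Kafka commits the *next* offset to fetch: committed = max processed + 1."""
    return {partition: offset + 1 for partition, offset in processed.items()}


async def demo() -> list[int]:
    q: asyncio.Queue[int] = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    # Three items arrive immediately; the fourth get() loses to the timeout.
    return await populate_batch(q, batch_size=10, timeout_sec=0.05)
```

Reusing one timeout task across iterations (instead of recreating it) is what makes the batch deadline apply to the whole batch rather than to each individual `queue.get()`.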

faststream_concurrent_aiokafka/middleware.py

Lines changed: 6 additions & 4 deletions
```diff
@@ -22,18 +22,21 @@ async def consume_scope(  # ty: ignore[invalid-method-override]
             logger.error("Kafka middleware. There is no concurrent processing instance in the context")
             info = "No concurrent processing instance in the context"
             raise RuntimeError(info)
+
         if not concurrent_processing.is_running:
             logger.error(
                 "Kafka middleware. Concurrent processing is not running. Maybe `initialize_concurrent_processing`"
                 " was forgotten?"
             )
             info = "Concurrent processing is not running"
             raise RuntimeError(info)
+
         kafka_message: typing.Final = self.context.get("message")
         if concurrent_processing.enable_batch_commit and not kafka_message:
             logger.error("Kafka middleware. No kafka message in the middleware, it means no consumer to commit batch.")
             info = "No kafka message in the middleware"
             raise RuntimeError(info)
+
         try:

             async def handler_wrapper() -> typing.Any:  # noqa: ANN401
@@ -46,10 +49,10 @@ async def handler_wrapper() -> typing.Any:  # noqa: ANN401

 async def initialize_concurrent_processing(
     context: ContextRepo,
-    commit_batch_size: int,
-    commit_batch_timeout_sec: float,
     concurrency_limit: int = DEFAULT_CONCURRENCY_LIMIT,
     enable_batch_commit: bool = False,
+    commit_batch_size: int = 10,
+    commit_batch_timeout_sec: float = 10.0,
 ) -> None:
     concurrent_processing: typing.Final = KafkaConcurrentHandler(
         commit_batch_size=commit_batch_size,
@@ -82,5 +85,4 @@ async def stop_concurrent_processing(
     await concurrent_processing.stop()
     context.set_global(_PROCESSING_CONTEXT_KEY, None)

-    KafkaConcurrentHandler._initialized = False  # noqa: SLF001
-    KafkaConcurrentHandler._instance = None  # noqa: SLF001
+    KafkaConcurrentHandler.reset()
```

faststream_concurrent_aiokafka/processing.py

Lines changed: 8 additions & 1 deletion
```diff
@@ -58,8 +58,15 @@ def __init__(
         self._commit_batch_size: int = commit_batch_size

         self._committer: KafkaBatchCommitter | None = None
+        self._stop_task: asyncio.Task[typing.Any] | None = None
         self._initialized = True

+    @classmethod
+    def reset(cls) -> None:
+        with cls._lock:
+            cls._initialized = False
+            cls._instance = None
+
     def _is_need_to_process_message(self, message: KafkaAckableMessage) -> bool:
         headers_topic_group: typing.Final[str | None] = message.headers.get(TOPIC_GROUP_KEY)
         group_id: typing.Final[str] = getattr(message.consumer, "_group_id", "")
@@ -143,7 +150,7 @@ def _setup_signal_handlers(self) -> None:

     def _signal_handler(self, sig: signal.Signals) -> None:
         logger.info(f"Kafka middleware. Received signal {sig.name}, initiating graceful shutdown...")
-        asyncio.create_task(self.stop())  # noqa: RUF006
+        self._stop_task = asyncio.create_task(self.stop())

     async def start(self) -> None:
         if self._is_running:
```
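Two patterns from this diff generalize beyond the library and can be shown with a self-contained toy (class and method names here are illustrative, not the real `KafkaConcurrentHandler`): a lock-guarded classmethod `reset()` for singleton state, and keeping a strong reference to the task returned by `asyncio.create_task` so the event loop cannot garbage-collect it mid-flight, which is what ruff's RUF006 warns about:

```python
import asyncio
import threading


class Singleton:
    """Toy version of the reset() pattern: class-level state guarded by a lock."""

    _lock = threading.Lock()
    _instance: "Singleton | None" = None
    _initialized = False

    def __new__(cls) -> "Singleton":
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

    @classmethod
    def reset(cls) -> None:
        # Clear both flags under the same lock the constructor uses.
        with cls._lock:
            cls._initialized = False
            cls._instance = None


class Handler:
    """Toy version of the _stop_task fix."""

    def __init__(self) -> None:
        self._stop_task: asyncio.Task[None] | None = None

    def on_signal(self) -> None:
        # Storing the reference keeps the in-flight task alive; a bare
        # create_task() result may be collected before stop() completes.
        self._stop_task = asyncio.create_task(self.stop())

    async def stop(self) -> None:
        await asyncio.sleep(0)
```

Exposing `reset()` as a classmethod also lets callers (like `stop_concurrent_processing` above) avoid touching private attributes, which is why the `# noqa: SLF001` suppressions could be dropped.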

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -82,7 +82,7 @@ isort.no-lines-before = ["standard-library", "local-folder"]
 ]

 [tool.pytest.ini_options]
-addopts = "--cov=. --cov-report term-missing"
+addopts = "-n auto --cov=. --cov-report term-missing"
 asyncio_mode = "auto"
 asyncio_default_fixture_loop_scope = "function"
```
