Commit c936861: init

17 files changed: 3055 additions & 0 deletions (initial commit, 0 parents)

.github/workflows/ci.yml

Lines changed: 52 additions & 0 deletions
```yaml
name: main

on:
  push:
    branches:
      - main
  pull_request: {}

concurrency:
  group: ${{ github.head_ref || github.run_id }}
  cancel-in-progress: true

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: extractions/setup-just@v2
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
          cache-dependency-glob: "**/pyproject.toml"
      - run: uv python install 3.11
      - run: just install lint-ci

  pytest:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version:
          - "3.11"
          - "3.12"
          - "3.13"
          - "3.14"
    steps:
      - uses: actions/checkout@v4
      - uses: extractions/setup-just@v2
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
          cache-dependency-glob: "**/pyproject.toml"
      - run: uv python install ${{ matrix.python-version }}
      - run: just install
      - run: just test . --cov=. --cov-report xml
      - uses: codecov/codecov-action@v4.0.1
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        with:
          files: ./coverage.xml
          flags: unittests
          name: codecov-${{ matrix.python-version }}
```

.github/workflows/publish.yml

Lines changed: 17 additions & 0 deletions
```yaml
name: Publish Package

on:
  release:
    types:
      - published

jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: extractions/setup-just@v2
      - uses: astral-sh/setup-uv@v3
      - run: just publish
        env:
          PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
```

.gitignore

Lines changed: 20 additions & 0 deletions
```gitignore
# Generic things
*.pyc
*~
__pycache__/*
*.swp
*.sqlite3
*.map
.vscode
.idea
.DS_Store
.env
.pytest_cache
.ruff_cache
.coverage
htmlcov/
coverage.xml
pytest.xml
.python-version
.venv
dist
```

CLAUDE.md

Lines changed: 48 additions & 0 deletions
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Commands

```bash
just install      # lock + sync all deps (run after pulling or changing pyproject.toml)
just lint         # eof-fixer, ruff format, ruff check --fix, ty check
just lint-ci      # same but no auto-fix (used in CI)
just test         # pytest with coverage
just test-branch  # pytest with branch coverage
just publish      # bump version to $GITHUB_REF_NAME, build, publish to PyPI
```

Run a single test file or test by name:

```bash
uv run --no-sync pytest tests/committer/test_kafka_committer.py
uv run --no-sync pytest -k test_committer_logs_task_exceptions
```

## Architecture

The library provides concurrent Kafka message processing for FastStream. Three modules are exposed:

**`processing.py` — `KafkaConcurrentHandler` (singleton)**
The core engine. Implements the singleton pattern (one instance per process) using `__new__` + `threading.Lock`. Manages:
- An `asyncio.Semaphore` for concurrency limiting (`concurrency_limit=0` disables it)
- A set of in-flight `asyncio.Task`s
- A background observer task that periodically calls `_check_tasks_health()` to discard stale completed tasks
- Signal handlers (SIGTERM/SIGINT/SIGQUIT) that trigger graceful shutdown
- Optional integration with `KafkaBatchCommitter` when `enable_batch_commit=True`

Key design: `handle_task()` submits coroutines as fire-and-forget asyncio tasks. Messages are filtered by consumer group via the `topic_group` header — messages whose header doesn't match the consumer's `_group_id` are skipped.
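A minimal sketch of the singleton mechanics described above. `processing.py` itself is not rendered in this diff view, so the constructor body here is illustrative, not the real one; only the `_instance`/`_initialized`/`__new__` + `threading.Lock` shape is taken from this description:

```python
import threading
import typing


class KafkaConcurrentHandler:
    _instance: typing.ClassVar["KafkaConcurrentHandler | None"] = None
    _initialized: typing.ClassVar[bool] = False
    _lock: typing.ClassVar[threading.Lock] = threading.Lock()

    def __new__(cls, *args: object, **kwargs: object) -> "KafkaConcurrentHandler":
        # Double-checked locking: at most one instance per process.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, concurrency_limit: int = 0) -> None:
        # Skip setup when the singleton is "constructed" a second time.
        if type(self)._initialized:
            return
        type(self)._initialized = True
        self._concurrency_limit = concurrency_limit  # 0 disables the semaphore
```

The `_instance`/`_initialized` class attributes match the names that the "Key patterns" section below says tests must reset.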
**`middleware.py` — FastStream middleware + lifecycle functions**
- `KafkaConcurrentProcessingMiddleware`: FastStream `BaseMiddleware` subclass. Its `consume_scope` wraps each incoming message in a task submitted to `KafkaConcurrentHandler` (retrieved from FastStream's context).
- `initialize_concurrent_processing(context, ...)`: call on app startup to create and start the handler, storing it in FastStream's global context.
- `stop_concurrent_processing(context)`: call on app shutdown; resets the singleton so it can be re-initialized (important for tests).
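Wiring this into an app might look like the sketch below. `middleware.py` itself is not shown in this commit view, so treat this as an assumption-laden illustration: the broker URL is a placeholder, and the two lifecycle helpers are assumed to be plain synchronous calls taking only the context.

```python
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092", middlewares=(KafkaConcurrentProcessingMiddleware,))
app = FastStream(broker)


@app.on_startup
async def startup(context: ContextRepo) -> None:
    # Create and start the handler; it lands in FastStream's global context.
    initialize_concurrent_processing(context)


@app.on_shutdown
async def shutdown(context: ContextRepo) -> None:
    # Stop processing and reset the singleton for re-initialization.
    stop_concurrent_processing(context)
```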
**`batch_committer.py` — `KafkaBatchCommitter`**
Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommitTask` objects from a queue, batches them by topic-partition, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by timeout or batch size, whichever comes first. `CommitterIsDeadError` is raised to callers if the committer's main task has died.
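Driving the committer directly would look roughly like this, using the API visible in the `batch_committer.py` source further down in this commit; the consumer loop and `handle()` coroutine are placeholders:

```python
import asyncio

from aiokafka import AIOKafkaConsumer
from faststream.kafka import TopicPartition

from faststream_concurrent_aiokafka.batch_committer import KafkaBatchCommitter, KafkaCommitTask


async def handle(msg: object) -> None:
    """Placeholder for real message processing."""


async def consume(consumer: AIOKafkaConsumer) -> None:
    committer = KafkaBatchCommitter(commit_batch_timeout_sec=5.0, commit_batch_size=100)
    committer.spawn()  # start the background commit loop

    async for msg in consumer:
        # The message's offset is committed only once this task finishes.
        task = asyncio.create_task(handle(msg))
        await committer.send_task(
            KafkaCommitTask(
                asyncio_task=task,
                topic_partition=TopicPartition(msg.topic, msg.partition),
                offset=msg.offset,
                consumer=consumer,
            )
        )

    await committer.close()  # flush remaining offsets, then stop
```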
## Key patterns

- **Singleton reset in tests**: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests (done in conftest fixtures via `stop_concurrent_processing`; a sketch follows this list).
- **Type suppression**: use `# ty: ignore[rule-name]` (not `# type: ignore`) for ty type checker suppressions.
- **No `from __future__ import annotations`**: annotations are evaluated eagerly; `typing.Self`/`typing.Never` are used directly (requires Python ≥ 3.11).
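
A minimal conftest sketch of that reset, assuming the attribute names above; the real fixtures go through `stop_concurrent_processing`, whose implementation isn't shown in this view:

```python
from collections.abc import Iterator

import pytest

from faststream_concurrent_aiokafka.processing import KafkaConcurrentHandler


@pytest.fixture(autouse=True)
def _reset_handler_singleton() -> Iterator[None]:
    yield
    # Clear singleton state so the next test builds a fresh handler.
    KafkaConcurrentHandler._initialized = False
    KafkaConcurrentHandler._instance = None
```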

Justfile

Lines changed: 29 additions & 0 deletions
```just
default: install lint test

install:
    uv lock --upgrade
    uv sync --all-extras --frozen --group lint

lint:
    uv run eof-fixer .
    uv run ruff format
    uv run ruff check --fix
    uv run ty check

lint-ci:
    uv run eof-fixer . --check
    uv run ruff format --check
    uv run ruff check --no-fix
    uv run ty check

test *args:
    uv run --no-sync pytest {{ args }}

test-branch:
    @just test --cov-branch

publish:
    rm -rf dist
    uv version $GITHUB_REF_NAME
    uv build
    uv publish --token $PYPI_TOKEN
```

README.md

Lines changed: 1 addition & 0 deletions
```markdown
# kafka-utils
```

Lines changed: 12 additions & 0 deletions
```python
from faststream_concurrent_aiokafka.middleware import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)


__all__ = [
    "KafkaConcurrentProcessingMiddleware",
    "initialize_concurrent_processing",
    "stop_concurrent_processing",
]
```

Lines changed: 197 additions & 0 deletions
```python
import asyncio
import contextlib
import dataclasses
import itertools
import logging
import threading
import typing

from faststream.kafka import TopicPartition


if typing.TYPE_CHECKING:
    from aiokafka import AIOKafkaConsumer


logger = logging.getLogger(__name__)


SHUTDOWN_TIMEOUT_SEC: typing.Final = 20


class CommitterIsDeadError(Exception): ...


@dataclasses.dataclass(frozen=True, kw_only=True, slots=True)
class KafkaCommitTask:
    asyncio_task: asyncio.Task[typing.Any]
    topic_partition: TopicPartition
    offset: int
    consumer: typing.Any


class KafkaBatchCommitter:
    def __init__(
        self,
        commit_batch_timeout_sec: float = 10.0,
        commit_batch_size: int = 10,
    ) -> None:
        self._messages_queue: asyncio.Queue[KafkaCommitTask] = asyncio.Queue()
        self._asyncio_commit_process_task: asyncio.Task[typing.Any] | None = None
        self._flush_batch_event = asyncio.Event()

        self._commit_batch_timeout_sec = commit_batch_timeout_sec
        self._commit_batch_size = commit_batch_size
        self._shutdown_timeout = SHUTDOWN_TIMEOUT_SEC

        self._spawn_lock = threading.Lock()

    def _check_is_commit_task_running(self) -> None:
        is_commit_task_running: typing.Final[bool] = bool(
            self._asyncio_commit_process_task
            and not self._asyncio_commit_process_task.cancelled()
            and not self._asyncio_commit_process_task.done(),
        )
        if not is_commit_task_running:
            msg: typing.Final = "Committer main task is not running"
            raise CommitterIsDeadError(msg)

    def _flush_tasks_queue(self) -> list[KafkaCommitTask]:
        tasks_to_return: typing.Final[list[KafkaCommitTask]] = []
        while not self._messages_queue.empty():
            tasks_to_return.append(self._messages_queue.get_nowait())
        return tasks_to_return
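
    # Assembling a batch races three awaitables: the next queued commit task,
    # the flush event (set by commit_all() and close()), and a per-batch
    # timeout. Whichever finishes first decides whether the batch keeps
    # growing, is flushed wholesale, or is cut short at the timeout.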
    async def _populate_commit_batch(self) -> tuple[list[KafkaCommitTask], bool]:
        uncommited_tasks: typing.Final[list[KafkaCommitTask]] = []
        should_shutdown = False
        queue_get_task: asyncio.Task[typing.Any] | None = None
        flush_wait_task: asyncio.Task[typing.Any] | None = None
        timeout_task: asyncio.Task[typing.Any] | None = None
        try:
            timeout_task = asyncio.create_task(asyncio.sleep(self._commit_batch_timeout_sec))
            while len(uncommited_tasks) < self._commit_batch_size:
                queue_get_task = asyncio.create_task(self._messages_queue.get())
                flush_wait_task = asyncio.create_task(self._flush_batch_event.wait())
                await asyncio.wait([queue_get_task, flush_wait_task, timeout_task], return_when=asyncio.FIRST_COMPLETED)

                if queue_get_task.done():
                    uncommited_tasks.append(queue_get_task.result())
                else:
                    queue_get_task.cancel()

                # commit_all is called
                if flush_wait_task.done():
                    queue_get_task.cancel()
                    uncommited_tasks.extend(self._flush_tasks_queue())
                    self._flush_batch_event.clear()
                    timeout_task.cancel()
                    should_shutdown = True
                    break
                flush_wait_task.cancel()

                if timeout_task.done():
                    logger.debug("Timeout exceeded, batch contains %s elements", len(uncommited_tasks))
                    break

            logger.debug("Batch condition reached with %s elements", len(uncommited_tasks))
        except asyncio.CancelledError:
            should_shutdown = True
            uncommited_tasks.extend(self._flush_tasks_queue())

        for task in (queue_get_task, flush_wait_task, timeout_task):
            task and task.cancel()

        return uncommited_tasks, should_shutdown

    async def _call_committer(
        self, tasks_batch: list[KafkaCommitTask], partitions_to_offsets: dict[TopicPartition, int]
    ) -> bool:
        if not partitions_to_offsets:
            return True
        commit_succeeded = True
        consumer: typing.Final[AIOKafkaConsumer] = tasks_batch[0].consumer
        try:
            await consumer.commit(partitions_to_offsets)
        except Exception as exc:
            commit_succeeded = False
            logger.exception("Error during commit to kafka", exc_info=exc)
            for task in tasks_batch:
                await self._messages_queue.put(task)
        return commit_succeeded
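
    # Offsets are committed per topic-partition: all handler tasks in the
    # batch are awaited first, then the highest offset seen in each partition
    # plus one is committed (a Kafka commit offset names the next message to
    # read, not the last one processed).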
    async def _commit_tasks_batch(self, tasks_batch: list[KafkaCommitTask]) -> bool:
        partitions_to_tasks: typing.Final = itertools.groupby(
            sorted(tasks_batch, key=lambda x: x.topic_partition), lambda x: x.topic_partition
        )

        results: typing.Final = await asyncio.gather(
            *[task.asyncio_task for task in tasks_batch], return_exceptions=True
        )
        for result in results:
            if isinstance(result, BaseException):
                logger.error("Task has finished with an exception", exc_info=result)

        partitions_to_offsets: typing.Final[dict[TopicPartition, int]] = {}
        partition: TopicPartition
        tasks: typing.Iterator[KafkaCommitTask]
        for partition, tasks in partitions_to_tasks:
            max_message_offset: int | None = None
            for task in tasks:
                if max_message_offset is None or task.offset > max_message_offset:
                    max_message_offset = task.offset

            if max_message_offset is not None:
                partitions_to_offsets[partition] = max_message_offset + 1

        commit_succeeded: typing.Final = await self._call_committer(tasks_batch, partitions_to_offsets)
        for _ in tasks_batch:
            self._messages_queue.task_done()
        return commit_succeeded

    async def _run_commit_process(self) -> None:
        should_shutdown = False
        while not should_shutdown:
            commit_batch, should_shutdown = await self._populate_commit_batch()
            if commit_batch:
                await self._commit_tasks_batch(commit_batch)

    async def commit_all(self) -> None:
        """Commit all without shutting down the main process."""
        self._flush_batch_event.set()
        await self._messages_queue.join()

    async def send_task(self, new_task: KafkaCommitTask) -> None:
        self._check_is_commit_task_running()
        await self._messages_queue.put(
            new_task,
        )

    def spawn(self) -> None:
        with self._spawn_lock:
            if not self._asyncio_commit_process_task:
                self._asyncio_commit_process_task = asyncio.create_task(self._run_commit_process())
            else:
                logger.error("Committer main task already running")
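
    # Graceful shutdown: setting the flush event lets the commit loop drain
    # the queue and exit on its own; cancellation is forced only if that
    # takes longer than the shutdown timeout.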
    async def close(self) -> None:
        """Close committer."""
        if not self._asyncio_commit_process_task:
            logger.error("Committer main task is not running, cannot close committer properly")
            return

        self._flush_batch_event.set()
        try:
            await asyncio.wait_for(self._asyncio_commit_process_task, timeout=self._shutdown_timeout)
        except TimeoutError:
            logger.exception("Committer main task shutdown timed out, forcing cancellation")
            self._asyncio_commit_process_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._asyncio_commit_process_task
        except Exception as exc:
            logger.exception("Committer task failed during shutdown", exc_info=exc)
            raise

    @property
    def is_healthy(self) -> bool:
        return self._asyncio_commit_process_task is not None and not self._asyncio_commit_process_task.done()
```
