This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
```shell
just install      # lock + sync all deps (run after pulling or changing pyproject.toml)
just lint         # eof-fixer, ruff format, ruff check --fix, ty check
just lint-ci      # same but no auto-fix (used in CI)
just build        # build the application Docker image
just test         # run all tests in Docker (starts Redpanda, runs pytest, tears down)
just test-branch  # same with branch coverage
just down         # tear down all containers
just publish      # bump version to $GITHUB_REF_NAME, build, publish to PyPI
```

Run a single test file or test by name:

```shell
uv run --no-sync pytest tests/test_kafka_committer.py
uv run --no-sync pytest -k test_committer_logs_task_exceptions
```

The library provides concurrent Kafka message processing for FastStream. Four modules are exposed:
### processing.py — `KafkaConcurrentHandler` (singleton)

The core engine. Implements the singleton pattern (one instance per process) using `__new__` + `threading.Lock`. Manages:

- An `asyncio.Semaphore` for concurrency limiting (`concurrency_limit=0` disables it)
- A set of in-flight `asyncio.Task`s
- A background observer task that periodically calls `_check_tasks_health()` to discard stale completed tasks
- Signal handlers (SIGTERM/SIGINT/SIGQUIT) that trigger graceful shutdown
- Optional integration with `KafkaBatchCommitter` when `enable_batch_commit=True`
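The `__new__` + `threading.Lock` singleton pattern mentioned above can be sketched roughly like this (a minimal stand-in class; the real handler's attributes and constructor differ):

```python
import threading


class SingletonHandler:
    """Minimal sketch of a __new__ + threading.Lock singleton
    (illustrative stand-in, not the library's actual class)."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: the fast path skips the lock once created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance


a = SingletonHandler()
b = SingletonHandler()
assert a is b  # one instance per process
```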
Key design: `handle_task()` fires and forgets coroutines as asyncio tasks. Message filtering by consumer group is done via the `topic_group` header — messages whose header doesn't match the consumer's `_group_id` are skipped.
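The fire-and-forget pattern with a semaphore and a strong-reference task set can be sketched as follows (illustrative names only; `process_all` and `handle` are not the library's API):

```python
import asyncio


async def process_all(messages: list[str], limit: int) -> list[str]:
    """Sketch: semaphore-gated fire-and-forget tasks kept in a
    strong-reference set until completion (assumed shape, not the
    library's internals)."""
    semaphore = asyncio.Semaphore(limit)
    tasks: set[asyncio.Task] = set()

    async def handle(msg: str) -> str:
        async with semaphore:  # blocks when `limit` tasks are in flight
            await asyncio.sleep(0)
            return msg.upper()

    for msg in messages:
        task = asyncio.create_task(handle(msg))
        tasks.add(task)                        # keep a strong reference
        task.add_done_callback(tasks.discard)  # self-cleanup on completion

    return sorted(await asyncio.gather(*tasks))


print(asyncio.run(process_all(["a", "b", "c"], limit=2)))  # ['A', 'B', 'C']
```

Keeping a strong reference to each task matters because the event loop only holds weak references; without the set, a task could be garbage-collected mid-flight.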
### middleware.py — FastStream middleware + lifecycle functions

- `KafkaConcurrentProcessingMiddleware`: FastStream `BaseMiddleware` subclass. Its `consume_scope` wraps each incoming message in a task submitted to `KafkaConcurrentHandler` (retrieved from FastStream's context).
- `initialize_concurrent_processing(context, ...)`: call on app startup to create and start the handler, storing it in FastStream's global context.
- `stop_concurrent_processing(context)`: call on app shutdown; resets the singleton so it can be re-initialized (important for tests).
### healthcheck.py — `is_kafka_handler_healthy`

A single function that accepts a `ContextRepo` and returns `True` if the `KafkaConcurrentHandler` is present and healthy. Intended for readiness/liveness probes.
### batch_committer.py — `KafkaBatchCommitter`

Runs as a background asyncio task (spawned via `spawn()`). Collects `KafkaCommitTask` objects from a queue, batches them by topic-partition, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by timeout or batch size, whichever comes first. `CommitterIsDeadError` is raised to callers if the committer's main task has died.
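The two batching rules — "timeout or batch size, whichever comes first" and "max offset per partition" — can be sketched like this (illustrative helpers with `(topic, partition, offset)` tuples standing in for `KafkaCommitTask`; not the library's implementation):

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, max_size: int, timeout: float) -> list:
    """Gather items until max_size is reached or timeout elapses,
    whichever comes first (sketch only)."""
    batch: list = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while len(batch) < max_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch


def max_offset_per_partition(batch: list[tuple[str, int, int]]) -> dict[tuple[str, int], int]:
    """Reduce (topic, partition, offset) entries to the highest offset
    per topic-partition, which is what gets committed."""
    out: dict[tuple[str, int], int] = {}
    for topic, partition, offset in batch:
        key = (topic, partition)
        out[key] = max(out.get(key, offset), offset)
    return out


async def demo() -> dict[tuple[str, int], int]:
    q: asyncio.Queue = asyncio.Queue()
    for task in [("orders", 0, 5), ("orders", 0, 9), ("orders", 1, 3)]:
        q.put_nowait(task)
    # Returns as soon as max_size items arrive, well before the timeout.
    batch = await collect_batch(q, max_size=3, timeout=0.5)
    return max_offset_per_partition(batch)


print(asyncio.run(demo()))  # {('orders', 0): 9, ('orders', 1): 3}
```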
- Singleton reset in tests: `KafkaConcurrentHandler._initialized = False` and `._instance = None` must be reset between tests. The shared autouse `reset_singleton` fixture lives in `tests/conftest.py` — do not re-define it in individual test files.
- Type suppression: use `# ty: ignore[rule-name]` (not `# type: ignore`) for ty type checker suppressions.
- No `from __future__ import annotations`: annotations are evaluated eagerly; `typing.Self`/`typing.Never` are used directly (requires Python ≥ 3.11).
`tests/test_integration.py` runs against a real Redpanda container (Kafka-compatible, lightweight) via `testcontainers[kafka]`. The container is session-scoped — one instance for the whole test run.

Running integration tests requires Docker — they run automatically as part of `just test`.
Key findings from building these tests:
- `async with KafkaBroker():` only calls `connect()`, which sets up the producer. It does not start subscribers. You must also call `await broker.start()` explicitly to launch the consumer poll tasks.
- Always use `auto_offset_reset="earliest"` on test subscribers. The default `"latest"` causes the consumer to miss messages published before it gets its partition assignment.
- Pre-create topics with `AIOKafkaAdminClient` before starting the broker. Auto-creation on first publish triggers a `NotLeaderForPartitionError` retry loop that can outlast short sleeps.
- After `await broker.start()`, sleep ~1.5 s before publishing to let the consumer join the group and receive partition assignments.
- `AsgiFastStream` lifespan tests must use `async with app.start_lifespan_context()` — calling `app.start()`/`app.stop()` bypasses the `lifespan` context manager entirely.
- `AsgiFastStream` injects its own app-level `ContextRepo` into the lifespan, separate from `broker.context`. Pass `broker.context` explicitly to `initialize_concurrent_processing` and `stop_concurrent_processing`.
- Subscriber-level `middlewares` on `@broker.subscriber(...)` takes `SubscriberMiddleware` (a plain `(call_next, msg)` callable), not `BaseMiddleware` subclasses. To scope `KafkaConcurrentProcessingMiddleware` to a subset of subscribers, use `KafkaRouter(middlewares=[KafkaConcurrentProcessingMiddleware])` and `broker.include_router(router)`.
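The `(call_next, msg)` callable shape of a subscriber-level middleware can be demonstrated in isolation (broker-free sketch; `subscriber_middleware` and `handler` are illustrative names, not FastStream APIs):

```python
import asyncio


async def subscriber_middleware(call_next, msg):
    """Sketch of the plain (call_next, msg) SubscriberMiddleware shape:
    an async callable that wraps the next handler in the chain."""
    # pre-processing would go here (e.g. logging, metrics)
    result = await call_next(msg)
    # post-processing would go here
    return result


async def handler(msg: str) -> str:
    return msg.upper()


print(asyncio.run(subscriber_middleware(handler, "hi")))  # HI
```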