Skip to content

Commit 3366a1f

Browse files
committed
Moved initialization logic into the dependency-injection (DI) container
1 parent f2307de commit 3366a1f

59 files changed

Lines changed: 2901 additions & 5831 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

backend/app/api/routes/sse.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,7 @@
33
from fastapi import APIRouter, Request
44
from sse_starlette.sse import EventSourceResponse
55

6-
from app.domain.sse import SSEHealthDomain
7-
from app.schemas_pydantic.sse import (
8-
ShutdownStatusResponse,
9-
SSEExecutionEventData,
10-
SSEHealthResponse,
11-
SSENotificationEventData,
12-
)
6+
from app.schemas_pydantic.sse import SSEExecutionEventData, SSENotificationEventData
137
from app.services.auth_service import AuthService
148
from app.services.sse.sse_service import SSEService
159

@@ -38,24 +32,3 @@ async def execution_events(
3832
return EventSourceResponse(
3933
sse_service.create_execution_stream(execution_id=execution_id, user_id=current_user.user_id)
4034
)
41-
42-
43-
@router.get("/health", response_model=SSEHealthResponse)
44-
async def sse_health(
45-
request: Request,
46-
sse_service: FromDishka[SSEService],
47-
auth_service: FromDishka[AuthService],
48-
) -> SSEHealthResponse:
49-
"""Get SSE service health status."""
50-
_ = await auth_service.get_current_user(request)
51-
domain: SSEHealthDomain = await sse_service.get_health_status()
52-
return SSEHealthResponse(
53-
status=domain.status,
54-
kafka_enabled=domain.kafka_enabled,
55-
active_connections=domain.active_connections,
56-
active_executions=domain.active_executions,
57-
active_consumers=domain.active_consumers,
58-
max_connections_per_user=domain.max_connections_per_user,
59-
shutdown=ShutdownStatusResponse(**vars(domain.shutdown)),
60-
timestamp=domain.timestamp,
61-
)

backend/app/core/dishka_lifespan.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import logging
3-
from contextlib import AsyncExitStack, asynccontextmanager
3+
from contextlib import asynccontextmanager
44
from typing import AsyncGenerator
55

66
import redis.asyncio as redis
@@ -13,7 +13,6 @@
1313
from app.core.startup import initialize_rate_limits
1414
from app.core.tracing import init_tracing
1515
from app.db.docs import ALL_DOCUMENTS
16-
from app.events.event_store_consumer import EventStoreConsumer
1716
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
1817
from app.services.notification_service import NotificationService
1918
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
@@ -74,43 +73,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
7473
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
7574
)
7675

77-
# Phase 1: Resolve all DI dependencies in parallel
78-
(
79-
schema_registry,
80-
database,
81-
redis_client,
82-
rate_limit_metrics,
83-
sse_bridge,
84-
event_store_consumer,
85-
notification_service,
86-
) = await asyncio.gather(
76+
# Resolve DI dependencies in parallel (fail fast on config issues)
77+
schema_registry, database, redis_client, rate_limit_metrics, _, _ = await asyncio.gather(
8778
container.get(SchemaRegistryManager),
8879
container.get(Database),
8980
container.get(redis.Redis),
9081
container.get(RateLimitMetrics),
9182
container.get(SSEKafkaRedisBridge),
92-
container.get(EventStoreConsumer),
9383
container.get(NotificationService),
9484
)
9585

96-
# Phase 2: Initialize infrastructure in parallel (independent subsystems)
86+
# Initialize infrastructure in parallel
9787
await asyncio.gather(
9888
initialize_event_schemas(schema_registry),
9989
init_beanie(database=database, document_models=ALL_DOCUMENTS),
10090
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
10191
)
10292
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
10393

104-
# Phase 3: Start Kafka consumers in parallel (providers already started them via async with,
105-
# but __aenter__ is idempotent so this is safe and explicit)
106-
async with AsyncExitStack() as stack:
107-
stack.push_async_callback(sse_bridge.aclose)
108-
stack.push_async_callback(event_store_consumer.aclose)
109-
stack.push_async_callback(notification_service.aclose)
110-
await asyncio.gather(
111-
sse_bridge.__aenter__(),
112-
event_store_consumer.__aenter__(),
113-
notification_service.__aenter__(),
114-
)
115-
logger.info("SSE bridge, EventStoreConsumer, and NotificationService started")
116-
yield
94+
yield
95+
# Container close handles all cleanup automatically

backend/app/core/lifecycle.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

0 commit comments

Comments (0)