Skip to content

Commit 9271f12

Browse files
committed
moved init stuff to DI
1 parent 5e71fbf commit 9271f12

60 files changed

Lines changed: 3027 additions & 5933 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/app/api/routes/sse.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,7 @@
33
from fastapi import APIRouter, Request
44
from sse_starlette.sse import EventSourceResponse
55

6-
from app.domain.sse import SSEHealthDomain
7-
from app.schemas_pydantic.sse import (
8-
ShutdownStatusResponse,
9-
SSEExecutionEventData,
10-
SSEHealthResponse,
11-
SSENotificationEventData,
12-
)
6+
from app.schemas_pydantic.sse import SSEExecutionEventData, SSENotificationEventData
137
from app.services.auth_service import AuthService
148
from app.services.sse.sse_service import SSEService
159

@@ -38,24 +32,3 @@ async def execution_events(
3832
return EventSourceResponse(
3933
sse_service.create_execution_stream(execution_id=execution_id, user_id=current_user.user_id)
4034
)
41-
42-
43-
@router.get("/health", response_model=SSEHealthResponse)
44-
async def sse_health(
45-
request: Request,
46-
sse_service: FromDishka[SSEService],
47-
auth_service: FromDishka[AuthService],
48-
) -> SSEHealthResponse:
49-
"""Get SSE service health status."""
50-
_ = await auth_service.get_current_user(request)
51-
domain: SSEHealthDomain = await sse_service.get_health_status()
52-
return SSEHealthResponse(
53-
status=domain.status,
54-
kafka_enabled=domain.kafka_enabled,
55-
active_connections=domain.active_connections,
56-
active_executions=domain.active_executions,
57-
active_consumers=domain.active_consumers,
58-
max_connections_per_user=domain.max_connections_per_user,
59-
shutdown=ShutdownStatusResponse(**vars(domain.shutdown)),
60-
timestamp=domain.timestamp,
61-
)

backend/app/core/dishka_lifespan.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import asyncio
44
import logging
5-
from contextlib import AsyncExitStack, asynccontextmanager
5+
from contextlib import asynccontextmanager
66
from typing import AsyncGenerator
77

88
import redis.asyncio as redis
@@ -15,7 +15,6 @@
1515
from app.core.startup import initialize_rate_limits
1616
from app.core.tracing import init_tracing
1717
from app.db.docs import ALL_DOCUMENTS
18-
from app.events.event_store_consumer import EventStoreConsumer
1918
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
2019
from app.services.notification_service import NotificationService
2120
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
@@ -76,43 +75,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
7675
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
7776
)
7877

79-
# Phase 1: Resolve all DI dependencies in parallel
80-
(
81-
schema_registry,
82-
database,
83-
redis_client,
84-
rate_limit_metrics,
85-
sse_bridge,
86-
event_store_consumer,
87-
notification_service,
88-
) = await asyncio.gather(
78+
# Resolve DI dependencies in parallel (fail fast on config issues)
79+
schema_registry, database, redis_client, rate_limit_metrics, _, _ = await asyncio.gather(
8980
container.get(SchemaRegistryManager),
9081
container.get(Database),
9182
container.get(redis.Redis),
9283
container.get(RateLimitMetrics),
9384
container.get(SSEKafkaRedisBridge),
94-
container.get(EventStoreConsumer),
9585
container.get(NotificationService),
9686
)
9787

98-
# Phase 2: Initialize infrastructure in parallel (independent subsystems)
88+
# Initialize infrastructure in parallel
9989
await asyncio.gather(
10090
initialize_event_schemas(schema_registry),
10191
init_beanie(database=database, document_models=ALL_DOCUMENTS),
10292
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
10393
)
10494
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
10595

106-
# Phase 3: Start Kafka consumers in parallel (providers already started them via async with,
107-
# but __aenter__ is idempotent so this is safe and explicit)
108-
async with AsyncExitStack() as stack:
109-
stack.push_async_callback(sse_bridge.aclose)
110-
stack.push_async_callback(event_store_consumer.aclose)
111-
stack.push_async_callback(notification_service.aclose)
112-
await asyncio.gather(
113-
sse_bridge.__aenter__(),
114-
event_store_consumer.__aenter__(),
115-
notification_service.__aenter__(),
116-
)
117-
logger.info("SSE bridge, EventStoreConsumer, and NotificationService started")
118-
yield
96+
yield
97+
# Container close handles all cleanup automatically

backend/app/core/lifecycle.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

0 commit comments

Comments
 (0)