Skip to content

Commit 99f2c0f

Browse files
committed
chore: streamlined sse service, no sse bus
1 parent 659b46f commit 99f2c0f

15 files changed

Lines changed: 397 additions & 462 deletions

File tree

backend/app/core/providers.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
from app.services.runtime_settings import RuntimeSettingsLoader
6464
from app.services.saga import SagaOrchestrator, SagaService
6565
from app.services.saved_script_service import SavedScriptService
66-
from app.services.sse import SSERedisBus, SSEService
66+
from app.services.sse import SSEService
6767
from app.services.user_settings_service import UserSettingsService
6868
from app.settings import Settings
6969

@@ -131,6 +131,7 @@ async def get_redis_client(
131131
decode_responses=settings.REDIS_DECODE_RESPONSES,
132132
socket_connect_timeout=5,
133133
socket_timeout=5,
134+
socket_keepalive=True,
134135
)
135136
# Test connection
136137
await client.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool
@@ -351,22 +352,19 @@ class SSEProvider(Provider):
351352
scope = Scope.APP
352353

353354
@provide
354-
def get_sse_redis_bus(
355+
def get_sse_service(
355356
self,
356357
redis_client: redis.Redis,
358+
execution_repository: ExecutionRepository,
357359
logger: structlog.stdlib.BoundLogger,
358360
connection_metrics: ConnectionMetrics,
359-
) -> SSERedisBus:
360-
return SSERedisBus(redis_client, logger, connection_metrics)
361-
362-
@provide
363-
def get_sse_service(
364-
self,
365-
bus: SSERedisBus,
366-
execution_repository: ExecutionRepository,
367-
logger: structlog.stdlib.BoundLogger,
368361
) -> SSEService:
369-
return SSEService(bus=bus, execution_repository=execution_repository, logger=logger)
362+
return SSEService(
363+
redis_client=redis_client,
364+
execution_repository=execution_repository,
365+
logger=logger,
366+
connection_metrics=connection_metrics,
367+
)
370368

371369

372370
class AuthProvider(Provider):
@@ -483,15 +481,15 @@ def get_notification_service(
483481
self,
484482
notification_repository: NotificationRepository,
485483
kafka_event_service: KafkaEventService,
486-
sse_redis_bus: SSERedisBus,
484+
sse_service: SSEService,
487485
settings: Settings,
488486
logger: structlog.stdlib.BoundLogger,
489487
notification_metrics: NotificationMetrics,
490488
) -> NotificationService:
491489
return NotificationService(
492490
notification_repository=notification_repository,
493491
event_service=kafka_event_service,
494-
sse_bus=sse_redis_bus,
492+
sse_service=sse_service,
495493
settings=settings,
496494
logger=logger,
497495
notification_metrics=notification_metrics,
@@ -731,12 +729,12 @@ def get_event_replay_service(
731729
kafka_producer: UnifiedProducer,
732730
replay_metrics: ReplayMetrics,
733731
logger: structlog.stdlib.BoundLogger,
734-
sse_bus: SSERedisBus,
732+
sse_service: SSEService,
735733
) -> EventReplayService:
736734
return EventReplayService(
737735
repository=replay_repository,
738736
producer=kafka_producer,
739737
replay_metrics=replay_metrics,
740738
logger=logger,
741-
sse_bus=sse_bus,
739+
sse_service=sse_service,
742740
)

backend/app/events/handlers.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from app.services.notification_service import NotificationService
2626
from app.services.result_processor import ResultProcessor
2727
from app.services.saga import SagaOrchestrator
28-
from app.services.sse import SSERedisBus
28+
from app.services.sse import SSEService
2929
from app.settings import Settings
3030

3131
_sse_field_names: frozenset[str] = frozenset(f.name for f in dataclasses.fields(SSEExecutionEventData))
@@ -261,14 +261,14 @@ def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None:
261261
)
262262
async def on_sse_event(
263263
body: DomainEvent,
264-
sse_bus: FromDishka[SSERedisBus],
264+
sse_service: FromDishka[SSEService],
265265
) -> None:
266266
execution_id = getattr(body, "execution_id", None)
267267
if execution_id:
268268
sse_data = SSEExecutionEventData(**{
269269
k: v for k, v in body.model_dump().items() if k in _sse_field_names
270270
})
271-
await sse_bus.publish_event(execution_id, sse_data)
271+
await sse_service.publish_event(execution_id, sse_data)
272272

273273

274274
def register_notification_subscriber(broker: KafkaBroker) -> None:

backend/app/services/event_replay/replay_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
)
2828
from app.domain.sse import DomainReplaySSEPayload
2929
from app.events import UnifiedProducer
30-
from app.services.sse.redis_bus import SSERedisBus
30+
from app.services.sse import SSEService
3131

3232

3333
class EventReplayService:
@@ -37,7 +37,7 @@ def __init__(
3737
producer: UnifiedProducer,
3838
replay_metrics: ReplayMetrics,
3939
logger: structlog.stdlib.BoundLogger,
40-
sse_bus: SSERedisBus,
40+
sse_service: SSEService,
4141
) -> None:
4242
self._sessions: dict[str, ReplaySessionState] = {}
4343
self._schedulers: dict[str, AsyncIOScheduler] = {}
@@ -49,7 +49,7 @@ def __init__(
4949
self.logger = logger
5050
self._file_locks: dict[str, asyncio.Lock] = {}
5151
self._metrics = replay_metrics
52-
self._sse_bus = sse_bus
52+
self._sse_service = sse_service
5353

5454
async def create_session_from_config(self, config: ReplayConfig) -> ReplayOperationResult:
5555
try:
@@ -429,6 +429,6 @@ async def _publish_replay_status(self, session: ReplaySessionState) -> None:
429429
completed_at=session.completed_at,
430430
errors=session.errors,
431431
)
432-
await self._sse_bus.publish_replay_status(session.session_id, payload)
432+
await self._sse_service.publish_replay_status(session.session_id, payload)
433433
except Exception as e:
434434
self.logger.error("Failed to publish replay status to SSE", error=str(e))

backend/app/services/notification_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434
from app.domain.sse import DomainNotificationSSEPayload
3535
from app.services.kafka_event_service import KafkaEventService
36-
from app.services.sse import SSERedisBus
36+
from app.services.sse import SSEService
3737
from app.settings import Settings
3838

3939
# Constants
@@ -100,7 +100,7 @@ def __init__(
100100
self,
101101
notification_repository: NotificationRepository,
102102
event_service: KafkaEventService,
103-
sse_bus: SSERedisBus,
103+
sse_service: SSEService,
104104
settings: Settings,
105105
logger: structlog.stdlib.BoundLogger,
106106
notification_metrics: NotificationMetrics,
@@ -109,7 +109,7 @@ def __init__(
109109
self.event_service = event_service
110110
self.metrics = notification_metrics
111111
self.settings = settings
112-
self.sse_bus = sse_bus
112+
self.sse_service = sse_service
113113
self.logger = logger
114114
self._throttle_cache = ThrottleCache()
115115

@@ -578,7 +578,7 @@ async def _publish_notification_sse(self, notification: DomainNotification) -> N
578578
action_url=notification.action_url,
579579
created_at=notification.created_at,
580580
)
581-
await self.sse_bus.publish_notification(notification.user_id, payload)
581+
await self.sse_service.publish_notification(notification.user_id, payload)
582582

583583
# --8<-- [start:should_skip_notification]
584584
async def _should_skip_notification(
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from app.services.sse.redis_bus import SSERedisBus
21
from app.services.sse.sse_service import SSEService
32

4-
__all__ = ["SSERedisBus", "SSEService"]
3+
__all__ = ["SSEService"]

backend/app/services/sse/redis_bus.py

Lines changed: 0 additions & 98 deletions
This file was deleted.

0 commit comments

Comments
 (0)