Commit 315b8e5 (parent 666d9e1)

chore: removed deadcode, extracted copypasted code into separate funcs, updated docs

18 files changed: 409 additions, 461 deletions

backend/app/db/docs/saga.py (1 addition, 4 deletions)

@@ -11,10 +11,7 @@


 class SagaDocument(Document):
-    """Domain model for saga stored in database.
-
-    Copied from Saga/SagaInstance dataclass.
-    """
+    """Domain model for saga stored in database."""

     saga_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4()))  # type: ignore[valid-type]
     saga_name: Indexed(str)  # type: ignore[valid-type]

backend/app/domain/events/typed.py (12 additions, 16 deletions)

@@ -58,11 +58,12 @@ class BaseEvent(BaseModel):
     metadata: EventMetadata


-# --- Execution Events ---
+# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---


-class ExecutionRequestedEvent(BaseEvent):
-    event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED
+class ExecutionSpec(BaseModel):
+    """Shared execution specification fields (mixin for ExecutionRequestedEvent and CreatePodCommandEvent)."""
+
     execution_id: str
     script: str
     language: str
@@ -78,6 +79,13 @@ class ExecutionRequestedEvent(BaseEvent):
     priority: QueuePriority = QueuePriority.NORMAL


+# --- Execution Events ---
+
+
+class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):
+    event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED
+
+
 class ExecutionAcceptedEvent(BaseEvent):
     event_type: Literal[EventType.EXECUTION_ACCEPTED] = EventType.EXECUTION_ACCEPTED
     execution_id: str
@@ -413,22 +421,10 @@ class SagaCompensatedEvent(BaseEvent):
 # --- Saga Command Events ---


-class CreatePodCommandEvent(BaseEvent):
+class CreatePodCommandEvent(BaseEvent, ExecutionSpec):
     event_type: Literal[EventType.CREATE_POD_COMMAND] = EventType.CREATE_POD_COMMAND
     saga_id: str
-    execution_id: str
-    script: str
-    language: str
-    language_version: str
-    runtime_image: str
     runtime_command: list[str] = Field(default_factory=list)
-    runtime_filename: str
-    timeout_seconds: int
-    cpu_limit: str
-    memory_limit: str
-    cpu_request: str
-    memory_request: str
-    priority: QueuePriority = QueuePriority.NORMAL


 class DeletePodCommandEvent(BaseEvent):
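
Note: the refactor leans on Pydantic's multiple inheritance — fields declared once on ExecutionSpec are merged into every model that lists it as a base, so ExecutionRequestedEvent and CreatePodCommandEvent no longer duplicate them. A minimal self-contained sketch of the pattern (string literals stand in for the EventType enum, and the field set is trimmed):

from typing import Literal

from pydantic import BaseModel


class BaseEvent(BaseModel):
    event_id: str  # trimmed stand-in for the real BaseEvent


class ExecutionSpec(BaseModel):
    """Shared execution fields, declared once."""
    execution_id: str
    script: str
    language: str


class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):
    event_type: Literal["execution_requested"] = "execution_requested"


class CreatePodCommandEvent(BaseEvent, ExecutionSpec):
    event_type: Literal["create_pod_command"] = "create_pod_command"
    saga_id: str


# Both models validate the shared fields; a saga can forward them wholesale.
req = ExecutionRequestedEvent(event_id="1", execution_id="e1", script="print(1)", language="python")
spec = req.model_dump(include=set(ExecutionSpec.model_fields))
cmd = CreatePodCommandEvent(event_id="2", saga_id="s1", **spec)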

backend/app/domain/saga/__init__.py (0 additions, 2 deletions)

@@ -13,7 +13,6 @@
     SagaConfig,
     SagaContextData,
     SagaFilter,
-    SagaInstance,
     SagaListResult,
     SagaQuery,
 )
@@ -25,7+24,6 @@
     "SagaCancellationResult",
     "SagaConfig",
     "SagaContextData",
-    "SagaInstance",
     "SagaFilter",
     "SagaListResult",
     "SagaQuery",

backend/app/domain/saga/models.py (0 additions, 19 deletions)

@@ -92,25 +92,6 @@ class SagaConfig:
     publish_commands: bool = True


-@dataclass
-class SagaInstance:
-    """Runtime instance of a saga execution (domain)."""
-
-    saga_name: str
-    execution_id: str
-    state: SagaState = SagaState.CREATED
-    saga_id: str = field(default_factory=lambda: str(uuid4()))
-    current_step: str | None = None
-    completed_steps: list[str] = field(default_factory=list)
-    compensated_steps: list[str] = field(default_factory=list)
-    context_data: SagaContextData = field(default_factory=SagaContextData)
-    error_message: str | None = None
-    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
-    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
-    completed_at: datetime | None = None
-    retry_count: int = 0
-
-
 @dataclass
 class SagaCancellationResult:
     """Domain result for saga cancellation operations."""

backend/app/services/event_replay/replay_service.py (36 additions, 26 deletions)

@@ -183,19 +183,45 @@ async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResult
         self.logger.info("Cleaned up old replay sessions", removed_count=total_removed)
         return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions")

+    async def _load_next_event(self, session: ReplaySessionState) -> DomainEvent | None:
+        """Pop the next event from the buffer, loading a new batch if needed."""
+        event = self._pop_next_event(session.session_id)
+        if event is not None:
+            return event
+        if not await self._load_next_batch(session.session_id):
+            return None
+        return self._pop_next_event(session.session_id)
+
+    def _calculate_replay_delay(self, session: ReplaySessionState) -> float:
+        """Calculate the delay before dispatching the next event based on speed multiplier."""
+        next_event = self._peek_next_event(session.session_id)
+        if next_event and session.last_event_at and session.config.speed_multiplier < 100:
+            time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
+            return max(time_diff / session.config.speed_multiplier, 0)
+        return 0.0
+
+    def _reschedule_dispatch(self, session: ReplaySessionState, delay: float) -> None:
+        """Schedule the next _dispatch_next call if the session is still running."""
+        scheduler = self._schedulers.get(session.session_id)
+        if scheduler and scheduler.running and session.status == ReplayStatus.RUNNING:
+            scheduler.add_job(
+                self._dispatch_next,
+                trigger="date",
+                run_date=datetime.now(timezone.utc) + timedelta(seconds=delay),
+                args=[session],
+                id=f"dispatch_{session.session_id}",
+                replace_existing=True,
+                misfire_grace_time=None,
+            )
+
     async def _dispatch_next(self, session: ReplaySessionState) -> None:
         if session.status != ReplayStatus.RUNNING:
             return

-        event = self._pop_next_event(session.session_id)
+        event = await self._load_next_event(session)
         if event is None:
-            if not await self._load_next_batch(session.session_id):
-                await self._finalize_session(session, ReplayStatus.COMPLETED)
-                return
-            event = self._pop_next_event(session.session_id)
-            if event is None:
-                await self._finalize_session(session, ReplayStatus.COMPLETED)
-                return
+            await self._finalize_session(session, ReplayStatus.COMPLETED)
+            return

         buf = self._event_buffers.get(session.session_id, [])
         idx = self._buffer_indices.get(session.session_id, 0)
@@ -229,26 +255,10 @@ async def _dispatch_next(self, session: ReplaySessionState) -> None:
         session.last_event_at = event.timestamp
         await self._update_session_in_db(session)

-        next_event = self._peek_next_event(session.session_id)
-        delay = 0.0
-        if next_event and session.last_event_at and session.config.speed_multiplier < 100:
-            time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
-            delay = max(time_diff / session.config.speed_multiplier, 0)
-
+        delay = self._calculate_replay_delay(session)
         if delay > 0:
             self._metrics.record_delay_applied(delay)
-
-        scheduler = self._schedulers.get(session.session_id)
-        if scheduler and scheduler.running and session.status == ReplayStatus.RUNNING:
-            scheduler.add_job(
-                self._dispatch_next,
-                trigger="date",
-                run_date=datetime.now(timezone.utc) + timedelta(seconds=delay),
-                args=[session],
-                id=f"dispatch_{session.session_id}",
-                replace_existing=True,
-                misfire_grace_time=None,
-            )
+        self._reschedule_dispatch(session, delay)

     def _pop_next_event(self, session_id: str) -> DomainEvent | None:
         idx = self._buffer_indices.get(session_id, 0)
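
Note: the extracted _calculate_replay_delay preserves the original pacing rule — the wall-clock gap between consecutive event timestamps is divided by the speed multiplier, and multipliers of 100 or more replay with no pacing at all. A standalone sketch of that arithmetic (plain function, not the service's actual types):

from datetime import datetime, timedelta, timezone


def replay_delay(next_ts: datetime, last_ts: datetime | None, speed: float) -> float:
    """Scaled gap between two events; 0.0 at max speed or with no prior event."""
    if last_ts is None or speed >= 100:
        return 0.0
    gap = (next_ts - last_ts).total_seconds()
    return max(gap / speed, 0)  # clamp so out-of-order timestamps never produce a negative sleep


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = t0 + timedelta(seconds=10)
assert replay_delay(t1, t0, speed=2.0) == 5.0    # a 10 s gap replayed at 2x
assert replay_delay(t1, t0, speed=100.0) == 0.0  # >= 100x: dispatch immediately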

backend/app/services/notification_service.py (37 additions, 34 deletions)

@@ -122,6 +122,38 @@ def __init__(
         }
         # --8<-- [end:channel_handlers]

+    def _validate_scheduled_time(self, scheduled_for: datetime) -> None:
+        """Validate that scheduled_for is in the future and within the max schedule window."""
+        if scheduled_for < datetime.now(UTC):
+            raise NotificationValidationError("scheduled_for must be in the future")
+        max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS
+        max_schedule = datetime.now(UTC) + timedelta(days=max_days)
+        if scheduled_for > max_schedule:
+            raise NotificationValidationError(f"scheduled_for cannot exceed {max_days} days from now")
+
+    async def _check_throttle(self, user_id: str, severity: NotificationSeverity, source: str) -> None:
+        """Check throttle and raise NotificationThrottledError if rate limit exceeded."""
+        if self.settings.ENVIRONMENT == "test":
+            return
+        throttled = await self._throttle_cache.check_throttle(
+            user_id,
+            severity,
+            window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
+            max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
+        )
+        if throttled:
+            self.logger.warning(
+                f"Notification rate limit exceeded for user {user_id}. "
+                f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
+                f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
+            )
+            self.metrics.record_notification_throttled(source)
+            raise NotificationThrottledError(
+                user_id,
+                self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
+                self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
+            )
+
     async def create_notification(
         self,
         user_id: str,
@@ -137,14 +169,7 @@ async def create_notification(
         if not tags:
             raise NotificationValidationError("tags must be a non-empty list")
         if scheduled_for is not None:
-            if scheduled_for < datetime.now(UTC):
-                raise NotificationValidationError("scheduled_for must be in the future")
-            max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS
-            max_schedule = datetime.now(UTC) + timedelta(days=max_days)
-            if scheduled_for > max_schedule:
-                raise NotificationValidationError(
-                    f"scheduled_for cannot exceed {max_days} days from now"
-                )
+            self._validate_scheduled_time(scheduled_for)
         self.logger.info(
             f"Creating notification for user {user_id}",
             user_id=user_id,
@@ -154,25 +179,7 @@ async def create_notification(
             scheduled=scheduled_for is not None,
         )

-        # Check throttling
-        if self.settings.ENVIRONMENT != "test" and await self._throttle_cache.check_throttle(
-            user_id,
-            severity,
-            window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
-            max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
-        ):
-            error_msg = (
-                f"Notification rate limit exceeded for user {user_id}. "
-                f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
-                f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
-            )
-            self.logger.warning(error_msg)
-            self.metrics.record_notification_throttled("general")
-            raise NotificationThrottledError(
-                user_id,
-                self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
-                self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
-            )
+        await self._check_throttle(user_id, severity, "general")

         # Create notification
         create_data = DomainNotificationCreate(
@@ -290,13 +297,9 @@ async def _create_system_for_user(
     ) -> str:
         try:
             if not cfg.throttle_exempt:
-                throttled = await self._throttle_cache.check_throttle(
-                    user_id,
-                    cfg.severity,
-                    window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
-                    max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
-                )
-                if throttled:
+                try:
+                    await self._check_throttle(user_id, cfg.severity, "system")
+                except NotificationThrottledError:
                     return "throttled"

             await self.create_notification(
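
Note one behavioural detail of the extraction: the cache's check_throttle returns a bool, while the new _check_throttle raises, so _create_system_for_user now catches NotificationThrottledError to keep returning its "throttled" status string. A minimal sketch of that exception-to-status translation (hypothetical stand-ins, not the service's real signatures):

import asyncio


class NotificationThrottledError(Exception):
    """Stand-in for the real domain error."""


async def check_throttle(user_id: str, over_limit: bool) -> None:
    # Raises instead of returning a bool, mirroring the extracted helper.
    if over_limit:
        raise NotificationThrottledError(user_id)


async def create_system_for_user(user_id: str, over_limit: bool) -> str:
    try:
        await check_throttle(user_id, over_limit)
    except NotificationThrottledError:
        return "throttled"  # callers still get a status string, never an exception
    return "created"


assert asyncio.run(create_system_for_user("u1", over_limit=True)) == "throttled"
assert asyncio.run(create_system_for_user("u1", over_limit=False)) == "created"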

backend/app/services/saga/__init__.py (1 addition, 2 deletions)

@@ -1,5 +1,5 @@
 from app.domain.enums import SagaState
-from app.domain.saga import SagaConfig, SagaInstance
+from app.domain.saga import SagaConfig
 from app.services.saga.execution_saga import (
     AllocateResourcesStep,
     CreatePodStep,
@@ -17,7+17,6 @@
     "SagaService",
     "SagaConfig",
     "SagaState",
-    "SagaInstance",
     "SagaContext",
     "SagaStep",
     "CompensationStep",

backend/workers/bootstrap.py (83 additions, 0 deletions, new file)

@@ -0,0 +1,83 @@
+import asyncio
+from collections.abc import Awaitable, Callable
+from typing import Any
+
+import structlog
+from app.core.logging import setup_log_exporter, setup_logger
+from app.db.docs import ALL_DOCUMENTS
+from app.settings import Settings
+from beanie import init_beanie
+from dishka import AsyncContainer
+from dishka.integrations.faststream import setup_dishka
+from faststream import FastStream
+from faststream.kafka import KafkaBroker
+from pymongo import AsyncMongoClient
+
+
+def run_worker(
+    worker_name: str,
+    config_override: str,
+    container_factory: Callable[[Settings], AsyncContainer],
+    register_handlers: Callable[[KafkaBroker], None] | None = None,
+    on_startup: Callable[[AsyncContainer, KafkaBroker, structlog.stdlib.BoundLogger], Awaitable[None]] | None = None,
+    on_shutdown: Callable[[], Awaitable[None]] | None = None,
+) -> None:
+    """Boot a worker with a standardised init sequence.
+
+    Parameters
+    ----------
+    worker_name:
+        Human-readable name used in log messages.
+    config_override:
+        TOML filename passed to ``Settings(override_path=...)``.
+    container_factory:
+        Dishka container factory — receives ``Settings``, returns ``AsyncContainer``.
+    register_handlers:
+        Optional callback to register ``@broker.subscriber`` handlers **before**
+        ``setup_dishka`` is called (required for subscriber auto-injection).
+    on_startup:
+        Optional async callback executed **after** the broker is ready.
+        Receives ``(container, broker, logger)`` so it can resolve services
+        and wire up APScheduler jobs, K8s setup, etc.
+    on_shutdown:
+        Optional async callback executed on FastStream shutdown **before**
+        the container is closed. Use for scheduler teardown etc.
+    """
+    settings = Settings(override_path=config_override)
+
+    logger = setup_logger(settings.LOG_LEVEL)
+    setup_log_exporter(settings, logger)
+
+    logger.info(f"Starting {worker_name}...")
+
+    async def _run() -> None:
+        client: AsyncMongoClient[dict[str, Any]] = AsyncMongoClient(settings.MONGODB_URL, tz_aware=True)
+        await init_beanie(
+            database=client.get_default_database(default=settings.DATABASE_NAME),
+            document_models=ALL_DOCUMENTS,
+        )
+        logger.info("MongoDB initialized via Beanie")
+
+        container = container_factory(settings)
+
+        broker: KafkaBroker = await container.get(KafkaBroker)
+
+        if register_handlers is not None:
+            register_handlers(broker)
+        setup_dishka(container, broker=broker, auto_inject=True)
+
+        startup_hooks: list[Callable[[], Awaitable[None]]] = []
+        shutdown_hooks: list[Callable[[], Awaitable[None]]] = []
+
+        if on_startup is not None:
+            startup_hooks.append(lambda: on_startup(container, broker, logger))
+
+        if on_shutdown is not None:
+            shutdown_hooks.append(on_shutdown)
+        shutdown_hooks.append(container.close)
+
+        app = FastStream(broker, on_startup=startup_hooks, on_shutdown=shutdown_hooks)
+        await app.run()
+        logger.info(f"{worker_name} shutdown complete")
+
+    asyncio.run(_run())
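
For context, a hypothetical worker entrypoint showing the intended call shape of run_worker (the module name, TOML filename, Kafka topic, and factory below are illustrative, not part of this commit):

# hypothetical saga_worker.py
import structlog
from app.settings import Settings
from dishka import AsyncContainer
from faststream.kafka import KafkaBroker

from workers.bootstrap import run_worker  # import path assumed


def create_container(settings: Settings) -> AsyncContainer:
    ...  # build this worker's Dishka container


def register_handlers(broker: KafkaBroker) -> None:
    # Subscribers must be registered before setup_dishka runs, so auto-injection works.
    @broker.subscriber("saga-events")
    async def handle_saga_event(msg: dict) -> None:
        ...


async def on_startup(
    container: AsyncContainer, broker: KafkaBroker, logger: structlog.stdlib.BoundLogger
) -> None:
    logger.info("saga worker ready")  # resolve services, schedule jobs, etc.


if __name__ == "__main__":
    run_worker(
        worker_name="saga-worker",
        config_override="saga_worker.toml",
        container_factory=create_container,
        register_handlers=register_handlers,
        on_startup=on_startup,
    )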
