Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions backend/app/db/docs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from app.db.docs.saved_script import SavedScriptDocument
from app.db.docs.user import UserDocument
from app.db.docs.user_settings import (
EditorSettings,
NotificationSettings,
UserSettingsDocument,
UserSettingsSnapshotDocument,
)
Expand Down Expand Up @@ -60,8 +58,6 @@
# User Settings
"UserSettingsDocument",
"UserSettingsSnapshotDocument",
"NotificationSettings",
"EditorSettings",
# Saga
"SagaDocument",
# DLQ
Expand Down
5 changes: 1 addition & 4 deletions backend/app/db/docs/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@


class SagaDocument(Document):
"""Domain model for saga stored in database.
Copied from Saga/SagaInstance dataclass.
"""
"""Domain model for saga stored in database."""

saga_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
saga_name: Indexed(str) # type: ignore[valid-type]
Expand Down
31 changes: 4 additions & 27 deletions backend/app/db/docs/user_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
from typing import Any

from beanie import Document, Indexed
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field

from app.domain.enums import NotificationChannel, Theme


class NotificationSettings(BaseModel):
"""User notification preferences (embedded document).

Copied from user_settings.py NotificationSettings.
"""
"""User notification preferences (embedded document)."""

model_config = ConfigDict(from_attributes=True)

Expand All @@ -23,10 +20,7 @@ class NotificationSettings(BaseModel):


class EditorSettings(BaseModel):
"""Code editor preferences (embedded document).

Copied from user_settings.py EditorSettings.
"""
"""Code editor preferences (embedded document)."""

model_config = ConfigDict(from_attributes=True)

Expand All @@ -36,26 +30,9 @@ class EditorSettings(BaseModel):
word_wrap: bool = True
show_line_numbers: bool = True

Comment thread
HardMax71 marked this conversation as resolved.
@field_validator("font_size")
@classmethod
def validate_font_size(cls, v: int) -> int:
if v < 8 or v > 32:
raise ValueError("Font size must be between 8 and 32")
return v

@field_validator("tab_size")
@classmethod
def validate_tab_size(cls, v: int) -> int:
if v not in (2, 4, 8):
raise ValueError("Tab size must be 2, 4, or 8")
return v


class UserSettingsDocument(Document):
"""Complete user settings model.

Copied from UserSettings schema.
"""
"""Complete user settings model."""

user_id: Indexed(str, unique=True) # type: ignore[valid-type]
theme: Theme = Theme.AUTO
Expand Down
28 changes: 12 additions & 16 deletions backend/app/domain/events/typed.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ class BaseEvent(BaseModel):
metadata: EventMetadata


# --- Execution Events ---
# --- Execution Spec (shared fields between ExecutionRequestedEvent and CreatePodCommandEvent) ---


class ExecutionRequestedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED
class ExecutionSpec(BaseModel):
"""Shared execution specification fields (mixin for ExecutionRequestedEvent and CreatePodCommandEvent)."""

execution_id: str
script: str
language: str
Expand All @@ -78,6 +79,13 @@ class ExecutionRequestedEvent(BaseEvent):
priority: QueuePriority = QueuePriority.NORMAL


# --- Execution Events ---


class ExecutionRequestedEvent(BaseEvent, ExecutionSpec):
event_type: Literal[EventType.EXECUTION_REQUESTED] = EventType.EXECUTION_REQUESTED


class ExecutionAcceptedEvent(BaseEvent):
event_type: Literal[EventType.EXECUTION_ACCEPTED] = EventType.EXECUTION_ACCEPTED
execution_id: str
Expand Down Expand Up @@ -413,22 +421,10 @@ class SagaCompensatedEvent(BaseEvent):
# --- Saga Command Events ---


class CreatePodCommandEvent(BaseEvent):
class CreatePodCommandEvent(BaseEvent, ExecutionSpec):
event_type: Literal[EventType.CREATE_POD_COMMAND] = EventType.CREATE_POD_COMMAND
saga_id: str
execution_id: str
script: str
language: str
language_version: str
runtime_image: str
runtime_command: list[str] = Field(default_factory=list)
Comment thread
HardMax71 marked this conversation as resolved.
runtime_filename: str
timeout_seconds: int
cpu_limit: str
memory_limit: str
cpu_request: str
memory_request: str
priority: QueuePriority = QueuePriority.NORMAL


class DeletePodCommandEvent(BaseEvent):
Expand Down
2 changes: 0 additions & 2 deletions backend/app/domain/saga/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
SagaConfig,
SagaContextData,
SagaFilter,
SagaInstance,
SagaListResult,
SagaQuery,
)
Expand All @@ -25,7 +24,6 @@
"SagaCancellationResult",
"SagaConfig",
"SagaContextData",
"SagaInstance",
"SagaFilter",
"SagaListResult",
"SagaQuery",
Expand Down
19 changes: 0 additions & 19 deletions backend/app/domain/saga/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,6 @@ class SagaConfig:
publish_commands: bool = True


@dataclass
class SagaInstance:
"""Runtime instance of a saga execution (domain)."""

saga_name: str
execution_id: str
state: SagaState = SagaState.CREATED
saga_id: str = field(default_factory=lambda: str(uuid4()))
current_step: str | None = None
completed_steps: list[str] = field(default_factory=list)
compensated_steps: list[str] = field(default_factory=list)
context_data: SagaContextData = field(default_factory=SagaContextData)
error_message: str | None = None
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
completed_at: datetime | None = None
retry_count: int = 0


@dataclass
class SagaCancellationResult:
"""Domain result for saga cancellation operations."""
Expand Down
16 changes: 1 addition & 15 deletions backend/app/schemas_pydantic/user_settings.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timezone
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field
Comment thread
HardMax71 marked this conversation as resolved.

from app.domain.enums import EventType, NotificationChannel, Theme

Expand Down Expand Up @@ -29,20 +29,6 @@ class EditorSettings(BaseModel):
word_wrap: bool = True
show_line_numbers: bool = True

Comment thread
HardMax71 marked this conversation as resolved.
@field_validator("font_size")
@classmethod
def validate_font_size(cls, v: int) -> int:
if v < 8 or v > 32:
raise ValueError("Font size must be between 8 and 32")
return v

@field_validator("tab_size")
@classmethod
def validate_tab_size(cls, v: int) -> int:
if v not in (2, 4, 8):
raise ValueError("Tab size must be 2, 4, or 8")
return v


class UserSettings(BaseModel):
"""Complete user settings model"""
Expand Down
62 changes: 36 additions & 26 deletions backend/app/services/event_replay/replay_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,45 @@ async def cleanup_old_sessions(self, older_than_hours: int = 24) -> CleanupResul
self.logger.info("Cleaned up old replay sessions", removed_count=total_removed)
return CleanupResult(removed_sessions=total_removed, message=f"Removed {total_removed} old sessions")

async def _load_next_event(self, session: ReplaySessionState) -> DomainEvent | None:
"""Pop the next event from the buffer, loading a new batch if needed."""
event = self._pop_next_event(session.session_id)
if event is not None:
return event
if not await self._load_next_batch(session.session_id):
return None
return self._pop_next_event(session.session_id)

def _calculate_replay_delay(self, session: ReplaySessionState) -> float:
"""Calculate the delay before dispatching the next event based on speed multiplier."""
next_event = self._peek_next_event(session.session_id)
if next_event and session.last_event_at and session.config.speed_multiplier < 100:
time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
return max(time_diff / session.config.speed_multiplier, 0)
Comment thread
HardMax71 marked this conversation as resolved.
return 0.0

def _reschedule_dispatch(self, session: ReplaySessionState, delay: float) -> None:
"""Schedule the next _dispatch_next call if the session is still running."""
scheduler = self._schedulers.get(session.session_id)
if scheduler and scheduler.running and session.status == ReplayStatus.RUNNING:
scheduler.add_job(
self._dispatch_next,
trigger="date",
run_date=datetime.now(timezone.utc) + timedelta(seconds=delay),
args=[session],
id=f"dispatch_{session.session_id}",
replace_existing=True,
misfire_grace_time=None,
)

async def _dispatch_next(self, session: ReplaySessionState) -> None:
if session.status != ReplayStatus.RUNNING:
return

event = self._pop_next_event(session.session_id)
event = await self._load_next_event(session)
if event is None:
if not await self._load_next_batch(session.session_id):
await self._finalize_session(session, ReplayStatus.COMPLETED)
return
event = self._pop_next_event(session.session_id)
if event is None:
await self._finalize_session(session, ReplayStatus.COMPLETED)
return
await self._finalize_session(session, ReplayStatus.COMPLETED)
return

buf = self._event_buffers.get(session.session_id, [])
idx = self._buffer_indices.get(session.session_id, 0)
Expand Down Expand Up @@ -229,26 +255,10 @@ async def _dispatch_next(self, session: ReplaySessionState) -> None:
session.last_event_at = event.timestamp
await self._update_session_in_db(session)

next_event = self._peek_next_event(session.session_id)
delay = 0.0
if next_event and session.last_event_at and session.config.speed_multiplier < 100:
time_diff = (next_event.timestamp - session.last_event_at).total_seconds()
delay = max(time_diff / session.config.speed_multiplier, 0)

delay = self._calculate_replay_delay(session)
if delay > 0:
self._metrics.record_delay_applied(delay)

scheduler = self._schedulers.get(session.session_id)
if scheduler and scheduler.running and session.status == ReplayStatus.RUNNING:
scheduler.add_job(
self._dispatch_next,
trigger="date",
run_date=datetime.now(timezone.utc) + timedelta(seconds=delay),
args=[session],
id=f"dispatch_{session.session_id}",
replace_existing=True,
misfire_grace_time=None,
)
self._reschedule_dispatch(session, delay)

def _pop_next_event(self, session_id: str) -> DomainEvent | None:
idx = self._buffer_indices.get(session_id, 0)
Expand Down
71 changes: 37 additions & 34 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,38 @@ def __init__(
}
# --8<-- [end:channel_handlers]

def _validate_scheduled_time(self, scheduled_for: datetime) -> None:
"""Validate that scheduled_for is in the future and within the max schedule window."""
if scheduled_for < datetime.now(UTC):
raise NotificationValidationError("scheduled_for must be in the future")
max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS
max_schedule = datetime.now(UTC) + timedelta(days=max_days)
Comment thread
HardMax71 marked this conversation as resolved.
if scheduled_for > max_schedule:
raise NotificationValidationError(f"scheduled_for cannot exceed {max_days} days from now")

async def _check_throttle(self, user_id: str, severity: NotificationSeverity, source: str) -> None:
"""Check throttle and raise NotificationThrottledError if rate limit exceeded."""
if self.settings.ENVIRONMENT == "test":
return
throttled = await self._throttle_cache.check_throttle(
user_id,
severity,
window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
)
if throttled:
self.logger.warning(
f"Notification rate limit exceeded for user {user_id}. "
f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
)
Comment thread
HardMax71 marked this conversation as resolved.
self.metrics.record_notification_throttled(source)
raise NotificationThrottledError(
user_id,
self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
)

async def create_notification(
self,
user_id: str,
Expand All @@ -137,14 +169,7 @@ async def create_notification(
if not tags:
raise NotificationValidationError("tags must be a non-empty list")
if scheduled_for is not None:
if scheduled_for < datetime.now(UTC):
raise NotificationValidationError("scheduled_for must be in the future")
max_days = self.settings.NOTIF_MAX_SCHEDULE_DAYS
max_schedule = datetime.now(UTC) + timedelta(days=max_days)
if scheduled_for > max_schedule:
raise NotificationValidationError(
f"scheduled_for cannot exceed {max_days} days from now"
)
self._validate_scheduled_time(scheduled_for)
self.logger.info(
f"Creating notification for user {user_id}",
user_id=user_id,
Expand All @@ -154,25 +179,7 @@ async def create_notification(
scheduled=scheduled_for is not None,
)

# Check throttling
if self.settings.ENVIRONMENT != "test" and await self._throttle_cache.check_throttle(
user_id,
severity,
window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
):
error_msg = (
f"Notification rate limit exceeded for user {user_id}. "
f"Max {self.settings.NOTIF_THROTTLE_MAX_PER_HOUR} "
f"per {self.settings.NOTIF_THROTTLE_WINDOW_HOURS} hour(s)"
)
self.logger.warning(error_msg)
self.metrics.record_notification_throttled("general")
raise NotificationThrottledError(
user_id,
self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
)
await self._check_throttle(user_id, severity, "general")
Comment thread
HardMax71 marked this conversation as resolved.

# Create notification
create_data = DomainNotificationCreate(
Expand Down Expand Up @@ -290,13 +297,9 @@ async def _create_system_for_user(
) -> str:
try:
if not cfg.throttle_exempt:
throttled = await self._throttle_cache.check_throttle(
user_id,
cfg.severity,
window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
)
if throttled:
try:
await self._check_throttle(user_id, cfg.severity, "system")
except NotificationThrottledError:
return "throttled"

await self.create_notification(
Expand Down
Loading
Loading