Skip to content

Commit ef85eeb

Browse files
authored
Feat: better backend (#241)
* feat: misc issues fixed, also update saga/exec status * feat: better msg parsing from pods: using NAME+BYTESIZE instead of JSON. Also simplified logging in event mapper * feat: issues fixing (passing exit code instead of 0, previous issues, ..)
1 parent 921666b commit ef85eeb

56 files changed

Lines changed: 1058 additions & 445 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/app/api/routes/admin/events.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sse_starlette.sse import EventSourceResponse
1010

1111
from app.api.dependencies import admin_user
12+
from app.api.routes.common import SSEResponse
1213
from app.domain.enums import EventType, ExportFormat
1314
from app.domain.events import EventFilter as DomainEventFilter
1415
from app.domain.replay import ReplayFilter
@@ -27,15 +28,6 @@
2728
from app.services.admin import AdminEventsService
2829
from app.services.sse import SSEService
2930

30-
31-
class _SSEResponse(EventSourceResponse):
32-
"""Workaround: sse-starlette sets media_type only in __init__, not as a
33-
class attribute. FastAPI reads the class attribute for OpenAPI generation,
34-
so without this subclass every SSE endpoint shows application/json."""
35-
36-
media_type = "text/event-stream"
37-
38-
3931
router = APIRouter(
4032
prefix="/admin/events", tags=["admin-events"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]
4133
)
@@ -144,7 +136,7 @@ async def replay_events(
144136

145137
@router.get(
146138
"/replay/{session_id}/status",
147-
response_class=_SSEResponse,
139+
response_class=SSEResponse,
148140
responses={
149141
200: {"model": EventReplayStatusResponse},
150142
404: {"model": ErrorResponse, "description": "Replay session not found"},

backend/app/api/routes/admin/executions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
prefix="/admin/executions",
2121
tags=["admin-executions"],
2222
route_class=DishkaRoute,
23+
dependencies=[Depends(admin_user)],
2324
)
2425

2526

backend/app/api/routes/common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from sse_starlette.sse import EventSourceResponse
2+
3+
4+
class SSEResponse(EventSourceResponse):
5+
"""Workaround: sse-starlette sets media_type only in __init__, not as a
6+
class attribute. FastAPI reads the class attribute for OpenAPI generation,
7+
so without this subclass every SSE endpoint shows application/json."""
8+
9+
media_type = "text/event-stream"

backend/app/api/routes/sse.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,18 @@
66
from sse_starlette.sse import EventSourceResponse
77

88
from app.api.dependencies import current_user
9+
from app.api.routes.common import SSEResponse
910
from app.domain.user import User
1011
from app.schemas_pydantic.notification import NotificationResponse
1112
from app.schemas_pydantic.sse import SSEExecutionEventSchema
1213
from app.services.sse import SSEService
1314

14-
15-
class _SSEResponse(EventSourceResponse):
16-
"""Workaround: sse-starlette sets media_type only in __init__, not as a
17-
class attribute. FastAPI reads the class attribute for OpenAPI generation,
18-
so without this subclass every SSE endpoint shows application/json."""
19-
20-
media_type = "text/event-stream"
21-
22-
2315
router = APIRouter(prefix="/events", tags=["sse"], route_class=DishkaRoute)
2416

2517

2618
@router.get(
2719
"/notifications/stream",
28-
response_class=_SSEResponse,
20+
response_class=SSEResponse,
2921
responses={200: {"model": NotificationResponse}},
3022
)
3123
async def notification_stream(
@@ -41,7 +33,7 @@ async def notification_stream(
4133

4234
@router.get(
4335
"/executions/{execution_id}",
44-
response_class=_SSEResponse,
36+
response_class=SSEResponse,
4537
responses={200: {"model": SSEExecutionEventSchema}},
4638
)
4739
async def execution_events(

backend/app/core/container.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def create_app_container(settings: Settings) -> AsyncContainer:
3535
Args:
3636
settings: Application settings (injected via from_context).
3737
38-
Note: init_beanie() must be called BEFORE this container is created.
38+
Note: init_beanie() is called in the async lifespan AFTER this container is created.
3939
KafkaBroker is created by BrokerProvider and can be retrieved
4040
via container.get(KafkaBroker) after container creation.
4141
"""

backend/app/core/metrics/queue.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ def _create_instruments(self) -> None:
3030
description="Time spent waiting in queue before scheduling",
3131
unit="s",
3232
)
33+
self._event_data_lost = self._meter.create_counter(
34+
name="queue.event_data_lost",
35+
description="Executions lost due to expired event data in Redis",
36+
unit="1",
37+
)
3338

3439
def record_enqueue(self) -> None:
3540
self._enqueue_total.add(1)
@@ -45,3 +50,6 @@ def record_release(self) -> None:
4550

4651
def record_wait_time(self, wait_seconds: float, priority: str) -> None:
4752
self._wait_time.record(wait_seconds, attributes={"priority": priority})
53+
54+
def record_event_data_lost(self) -> None:
55+
self._event_data_lost.add(1)

backend/app/core/middlewares/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .metrics import MetricsMiddleware, create_system_metrics, setup_metrics
44
from .rate_limit import RateLimitMiddleware
55
from .request_size_limit import RequestSizeLimitMiddleware
6+
from .security_headers import SecurityHeadersMiddleware
67

78
__all__ = [
89
"CacheControlMiddleware",
@@ -12,4 +13,5 @@
1213
"create_system_metrics",
1314
"RequestSizeLimitMiddleware",
1415
"RateLimitMiddleware",
16+
"SecurityHeadersMiddleware",
1517
]

backend/app/core/middlewares/csrf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class CSRFMiddleware:
1919
2020
Requests are skipped if:
2121
- Method is safe (GET, HEAD, OPTIONS)
22-
- Path is an auth endpoint (login, register, logout)
22+
- Path is an auth endpoint (login, register)
2323
- Path is not under /api/
2424
- User is not authenticated (no access_token cookie)
2525
"""

backend/app/core/middlewares/rate_limit.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from app.core.utils import get_client_ip
99
from app.domain.rate_limit import RateLimitStatus
10-
from app.domain.user import User
1110
from app.services.rate_limit_service import RateLimitService
1211
from app.settings import Settings
1312

@@ -102,10 +101,14 @@ async def send_wrapper(message: Message) -> None:
102101
await self.app(scope, receive, send_wrapper)
103102

104103
# --8<-- [start:extract_user_id]
105-
def _extract_user_id(self, request: Request) -> str:
106-
user: User | None = request.state.__dict__.get("user")
107-
if user:
108-
return str(user.user_id)
104+
@staticmethod
105+
def _extract_user_id(request: Request) -> str:
106+
"""Extract rate-limit bucket key from client IP.
107+
108+
Middleware runs before route-level auth, so no verified identity is
109+
available here. Using unverified JWT claims would let an attacker
110+
craft arbitrary bucket keys to bypass IP-based limits.
111+
"""
109112
return f"ip:{get_client_ip(request)}"
110113
# --8<-- [end:extract_user_id]
111114

backend/app/core/middlewares/request_size_limit.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
from starlette.responses import JSONResponse
2-
from starlette.types import ASGIApp, Receive, Scope, Send
2+
from starlette.types import ASGIApp, Message, Receive, Scope, Send
33

44

55
# --8<-- [start:RequestSizeLimitMiddleware]
66
class RequestSizeLimitMiddleware:
7-
"""Middleware to limit request size, default 10MB"""
7+
"""Middleware to limit request size, default 10MB.
8+
9+
Checks Content-Length header when present for an early reject, and wraps
10+
the ASGI ``receive`` callable to count bytes as they stream — this
11+
catches chunked-transfer requests that omit Content-Length.
12+
"""
813

914
def __init__(self, app: ASGIApp, max_size_mb: int = 10) -> None:
1015
self.app = app
@@ -29,4 +34,30 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
2934
await response(scope, receive, send)
3035
return
3136

32-
await self.app(scope, receive, send)
37+
bytes_received = 0
38+
max_size = self.max_size_bytes
39+
exceeded = False
40+
41+
async def receive_wrapper() -> Message:
42+
nonlocal bytes_received, exceeded
43+
message = await receive()
44+
if message["type"] == "http.request":
45+
body = message.get("body", b"")
46+
bytes_received += len(body)
47+
if bytes_received > max_size:
48+
exceeded = True
49+
raise _RequestTooLarge()
50+
return message
51+
52+
try:
53+
await self.app(scope, receive_wrapper, send)
54+
except _RequestTooLarge:
55+
response = JSONResponse(
56+
status_code=413,
57+
content={"detail": f"Request too large. Maximum size is {max_size / 1024 / 1024}MB"},
58+
)
59+
await response(scope, receive, send)
60+
61+
62+
class _RequestTooLarge(Exception):
63+
pass

0 commit comments

Comments
 (0)