|
6 | 6 | from dishka.integrations.fastapi import DishkaRoute |
7 | 7 | from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query |
8 | 8 | from fastapi.responses import StreamingResponse |
| 9 | +from sse_starlette.sse import EventSourceResponse |
9 | 10 |
|
10 | 11 | from app.api.dependencies import admin_user |
11 | 12 | from app.domain.enums import EventType, ExportFormat |
|
24 | 25 | ) |
25 | 26 | from app.schemas_pydantic.common import ErrorResponse |
26 | 27 | from app.services.admin import AdminEventsService |
| 28 | +from app.services.sse import SSEService |
| 29 | + |
| 30 | + |
| 31 | +class _SSEResponse(EventSourceResponse): |
| 32 | + """Workaround: sse-starlette sets media_type only in __init__, not as a |
| 33 | + class attribute. FastAPI reads the class attribute for OpenAPI generation, |
| 34 | + so without this subclass every SSE endpoint shows application/json.""" |
| 35 | + |
| 36 | + media_type = "text/event-stream" |
| 37 | + |
27 | 38 |
|
28 | 39 | router = APIRouter( |
29 | 40 | prefix="/admin/events", tags=["admin-events"], route_class=DishkaRoute, dependencies=[Depends(admin_user)] |
@@ -133,16 +144,25 @@ async def replay_events( |
133 | 144 |
|
134 | 145 | @router.get( |
135 | 146 | "/replay/{session_id}/status", |
136 | | - responses={404: {"model": ErrorResponse, "description": "Replay session not found"}}, |
| 147 | + response_class=_SSEResponse, |
| 148 | + responses={ |
| 149 | + 200: {"model": EventReplayStatusResponse}, |
| 150 | + 404: {"model": ErrorResponse, "description": "Replay session not found"}, |
| 151 | + }, |
137 | 152 | ) |
138 | | -async def get_replay_status(session_id: str, service: FromDishka[AdminEventsService]) -> EventReplayStatusResponse: |
139 | | - """Get the status and progress of a replay session.""" |
140 | | - status = await service.get_replay_status(session_id) |
| 153 | +async def stream_replay_status( |
| 154 | + session_id: str, |
| 155 | + service: FromDishka[AdminEventsService], |
| 156 | + sse_service: FromDishka[SSEService], |
| 157 | +) -> EventSourceResponse: |
| 158 | + """Stream the status and progress of a replay session via SSE.""" |
| 159 | + status = await service.get_replay_sse_status(session_id) |
141 | 160 |
|
142 | 161 | if not status: |
143 | 162 | raise HTTPException(status_code=404, detail="Replay session not found") |
144 | 163 |
|
145 | | - return EventReplayStatusResponse.model_validate(status) |
| 164 | + stream = await sse_service.create_replay_stream(status) |
| 165 | + return EventSourceResponse(stream, ping=15) |
146 | 166 |
|
147 | 167 |
|
148 | 168 | @router.delete("/{event_id}", responses={404: {"model": ErrorResponse, "description": "Event not found"}}) |
|
0 commit comments