-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathsanic.py
More file actions
101 lines (82 loc) · 3.38 KB
/
sanic.py
File metadata and controls
101 lines (82 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from __future__ import annotations
from collections.abc import Awaitable, Collection, Mapping
from contextlib import aclosing, closing
from functools import wraps
from inspect import isasyncgen, isgenerator
from inspect import isawaitable
from typing import Any, Callable, ParamSpec, Union
from sanic import HTTPResponse, Request
from . import _read_signals
from .sse import SSE_HEADERS, DatastarEvent, DatastarEvents, ServerSentEventGenerator
__all__ = [
"SSE_HEADERS",
"DatastarResponse",
"ServerSentEventGenerator",
"datastar_respond",
"read_signals",
]
class DatastarResponse(HTTPResponse):
default_headers: dict[str, str] = SSE_HEADERS.copy()
def __init__(
self,
content: DatastarEvent | Collection[DatastarEvent] | None = None,
status: int | None = None,
headers: Mapping[str, str] | None = None,
) -> None:
if not content:
status = status or 204
elif not isinstance(content, str):
# Collections of events just get concatenated
content = "".join(content)
super().__init__(
content, status=status or 200, headers={**self.default_headers, **(headers or {})}
)
async def send(
self,
event: DatastarEvent | None = None,
end_stream: bool | None = None,
) -> None:
if event and self.status == 204:
# When the response is created with no content, it's set to a 204 by default
# if we end up streaming to it, change the status code to 200 before sending.
self.status = 200
await super().send(event, end_stream=end_stream)
async def datastar_respond(
request: Request, *, status: int = 200, headers: Mapping[str, str] | None = None
) -> DatastarResponse:
return await request.respond(DatastarResponse(status=status, headers=headers))
P = ParamSpec("P")
def datastar_response(
func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents],
) -> Callable[P, Awaitable[DatastarResponse | None]]:
"""A decorator which wraps a function result in DatastarResponse.
Can be used on a sync or async function or generator function.
"""
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse | None:
r = func(*args, **kwargs)
if isawaitable(r):
return DatastarResponse(await r)
if isasyncgen(r):
request = args[0]
response = await request.respond(response=DatastarResponse())
# Make sure when the client cancels the stream clean up the generator now
# Without the aclosing manager it would happen at garbage collection
async with aclosing(r) as ait:
async for event in ait:
await response.send(event)
await response.eof()
return None
if isgenerator(r):
request = args[0]
response = await request.respond(response=DatastarResponse())
with closing(r) as it:
for event in it:
await response.send(event)
await response.eof()
return None
return DatastarResponse(r)
wrapper.__annotations__["return"] = Union[DatastarResponse, None]
return wrapper
async def read_signals(request: Request) -> dict[str, Any] | None:
return _read_signals(request.method, request.headers, request.args, request.body)