forked from taskiq-python/taskiq-fastapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinitializator.py
More file actions
127 lines (99 loc) · 3.58 KB
/
initializator.py
File metadata and controls
127 lines (99 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import copy
from collections.abc import Awaitable, Callable, Mapping
from typing import Any
from fastapi import FastAPI, Request
from starlette.requests import HTTPConnection
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
from taskiq.cli.utils import import_object
PathOrAppOrFactory = str | FastAPI | Callable[[], FastAPI]
def startup_event_generator(
broker: AsyncBroker,
app_or_path: PathOrAppOrFactory,
) -> Callable[[TaskiqState], Awaitable[None]]:
"""
Generate startup event handler.
This function takes FastAPI application path
and runs startup event on broker's startup.
:param broker: current broker.
:param app_or_path: application path or fastapi instance
or callable that creates fastapi app instance.
:returns: startup handler.
"""
async def startup(state: TaskiqState) -> None:
if not broker.is_worker_process:
return
if isinstance(app_or_path, str):
app = import_object(app_or_path)
else:
app = app_or_path
if not isinstance(app, FastAPI):
app = app()
if not isinstance(app, FastAPI):
raise ValueError(f"'{app_or_path}' is not a FastAPI application.")
state.fastapi_app = app
await app.router.startup()
state.lf_ctx = app.router.lifespan_context(app)
asgi_state = await state.lf_ctx.__aenter__()
populate_dependency_context(broker, app, asgi_state)
return startup
def shutdown_event_generator(
broker: AsyncBroker,
) -> Callable[[TaskiqState], Awaitable[None]]:
"""
Generate shutdown event handler.
This function takes FastAPI application
and runs shutdown event on broker's shutdown.
:param broker: current broker.
:return: shutdown event handler.
"""
async def shutdown(state: TaskiqState) -> None:
if not broker.is_worker_process:
return
await state.fastapi_app.router.shutdown()
await state.lf_ctx.__aexit__(None, None, None)
return shutdown
def init(broker: AsyncBroker, app_or_path: PathOrAppOrFactory) -> None:
"""
Add taskiq startup events.
This is the main function to integrate FastAPI
with taskiq.
It creates startup events for broker. So
in worker processes all fastapi
startup events will run.
:param broker: current broker to use.
:param app_or_path: application path or fastapi instance
or callable that creates fastapi app instance.
"""
broker.add_event_handler(
TaskiqEvents.WORKER_STARTUP,
startup_event_generator(broker, app_or_path),
)
broker.add_event_handler(
TaskiqEvents.WORKER_SHUTDOWN,
shutdown_event_generator(broker),
)
def populate_dependency_context(
broker: AsyncBroker,
app: FastAPI,
asgi_state: Mapping[str, Any] | None = None,
) -> None:
"""
Populate dependency context.
This function injects the Request and HTTPConnection
into the broker's dependency context.
It may be needed to be called manually if you are using InMemoryBroker.
:param broker: current broker to use.
:param app: current application.
:param asgi_state: state that will be injected in request.
"""
asgi_state = asgi_state or {}
broker.dependency_overrides.update(
{
Request: lambda: Request(
scope={"app": app, "type": "http", "state": copy.copy(asgi_state)},
),
HTTPConnection: lambda: HTTPConnection(
scope={"app": app, "type": "http", "state": copy.copy(asgi_state)},
),
},
)