Skip to content

Commit 2ed173b

Browse files
committed
Fixed some event loop issues.
Signed-off-by: Pavel Kirilin <win10@list.ru>
1 parent b04c63d commit 2ed173b

3 files changed

Lines changed: 102 additions & 74 deletions

File tree

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ ignore =
9090
N802,
9191
; Do not perform function calls in argument defaults.
9292
B008,
93+
; Found protected attribute
94+
WPS437,
9395

9496
; all init files
9597
__init__.py:

taskiq_aiohttp/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""Project was generated using taskiq."""
22
from taskiq_aiohttp.initializer import init
33

4-
__all__ = ['init']
4+
__all__ = ["init"]

taskiq_aiohttp/initializer.py

Lines changed: 99 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,131 @@
11
import asyncio
22
import inspect
33
from typing import Awaitable, Callable
4-
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
5-
from taskiq.cli.utils import import_object
4+
5+
import yarl
66
from aiohttp import web
7-
from aiohttp.web_request import RawRequestMessage
8-
from aiohttp.web_urldispatcher import UrlMappingMatchInfo, ResourceRoute
97
from aiohttp.http_writer import HttpVersion10
108
from aiohttp.web_protocol import RequestHandler
11-
import yarl
9+
from aiohttp.web_request import RawRequestMessage
10+
from aiohttp.web_urldispatcher import SystemRoute, UrlMappingMatchInfo
11+
from multidict import CIMultiDict, CIMultiDictProxy
12+
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
13+
from taskiq.cli.utils import import_object
1214

13-
# def _startup(state: Context):
14-
# state.broker.add_dependency_context()
1515

16+
def startup_event_generator(
17+
broker: AsyncBroker,
18+
app_path: str,
19+
) -> Callable[[TaskiqState], Awaitable[None]]:
20+
"""
21+
Creates an event to run on broker's startup.
1622
17-
async def _mocked_handler(request: web.Request) -> web.Response:
18-
return web.Response()
23+
This function create a mock application for
24+
later use and updates broker's dependency context.
1925
26+
Also we run application's startup event so it would
27+
act the same as the real application.
28+
29+
:param broker: current broker.
30+
:param app_path: string with a path to an application or a factory.
31+
32+
:returns: a function that is called on startup.
33+
"""
2034

21-
def startup_event_generator(
22-
app: web.Application,
23-
) -> Callable[[TaskiqState], Awaitable[None]]:
2435
async def startup(state: TaskiqState) -> None:
36+
loop = asyncio.get_event_loop()
37+
38+
app = import_object(app_path)
39+
40+
if not isinstance(app, web.Application):
41+
app = app()
42+
43+
if inspect.iscoroutine(app):
44+
app = await app
45+
46+
if not isinstance(app, web.Application):
47+
raise ValueError(f"'{app_path}' is not an AioHTTP application.")
48+
49+
handler = RequestHandler(app._make_handler(), loop=loop)
50+
handler.transport = asyncio.Transport()
51+
request = web.Request(
52+
RawRequestMessage(
53+
"GET",
54+
"/",
55+
HttpVersion10,
56+
headers=CIMultiDictProxy(CIMultiDict()),
57+
raw_headers=(),
58+
should_close=False,
59+
upgrade=False,
60+
chunked=False,
61+
compression=None,
62+
url=yarl.URL.build(
63+
scheme="https",
64+
host="test.com",
65+
path="/",
66+
),
67+
),
68+
None,
69+
handler,
70+
None,
71+
None,
72+
None,
73+
)
74+
75+
request._match_info = UrlMappingMatchInfo(
76+
match_dict={},
77+
route=SystemRoute(web.HTTPBadRequest()),
78+
)
79+
request._match_info._apps = app._subapps
80+
request._match_info._current_app = app
81+
82+
broker.add_dependency_context(
83+
{
84+
web.Application: app,
85+
web.Request: request,
86+
},
87+
)
88+
2589
state.aiohttp_app = app
2690
app.router._resources = []
2791
await app.startup()
2892

2993
return startup
3094

3195

32-
def shutdown_event_generator(
33-
app: web.Application,
34-
) -> Callable[[TaskiqState], Awaitable[None]]:
35-
async def shutdown(_: TaskiqState) -> None:
36-
await app.shutdown()
37-
await app.cleanup()
96+
async def shutdown(state: TaskiqState) -> None:
97+
"""
98+
Shuts down the app.
3899
39-
return shutdown
100+
It just gets the application
101+
we created in startup and shuts it down.
102+
103+
:param state: current state.
104+
"""
105+
await state.aiohttp_app.shutdown()
106+
await state.aiohttp_app.cleanup()
40107

41108

42109
def init(broker: AsyncBroker, app_path: str) -> None:
43-
loop = asyncio.get_event_loop()
44-
if not broker.is_worker_process:
45-
return
110+
"""
111+
Initialize dependencies for your taskiq.
46112
47-
app = import_object(app_path)
48-
49-
if not isinstance(app, web.Application):
50-
app = app()
51-
52-
if inspect.isawaitable(app):
53-
app = asyncio.run(app)
54-
55-
if not isinstance(app, web.Application):
56-
raise ValueError(f"'{app_path}' is not an AioHTTP application.")
57-
58-
handler = RequestHandler(app._make_handler(), loop=loop)
59-
handler.transport = asyncio.Transport()
60-
request = web.Request(
61-
RawRequestMessage(
62-
"GET",
63-
"/",
64-
HttpVersion10,
65-
headers={},
66-
raw_headers=tuple(),
67-
should_close=False,
68-
upgrade=False,
69-
chunked=False,
70-
compression=None,
71-
url=yarl.URL.build(
72-
scheme="https",
73-
host="test.com",
74-
path="/",
75-
),
76-
),
77-
None,
78-
handler,
79-
None,
80-
None,
81-
None,
82-
)
113+
This function imports your application and tries to
114+
update broker's dependency context, so request and
115+
applicaiton will be available in all your taskiq
116+
dependencies.
83117
84-
request._match_info = UrlMappingMatchInfo(
85-
match_dict={},
86-
route=ResourceRoute("GET", _mocked_handler, None),
87-
)
88-
request._match_info._apps = app._subapps
89-
request._match_info._current_app = app
90-
91-
broker.add_dependency_context(
92-
{
93-
web.Application: app,
94-
web.Request: request,
95-
}
96-
)
118+
:param broker: current broker.
119+
:param app_path: string with a path to an application or a factory.
120+
"""
121+
if not broker.is_worker_process:
122+
return
97123

98124
broker.add_event_handler(
99125
TaskiqEvents.WORKER_STARTUP,
100-
startup_event_generator(app),
126+
startup_event_generator(broker, app_path),
101127
)
102128
broker.add_event_handler(
103129
TaskiqEvents.WORKER_SHUTDOWN,
104-
shutdown_event_generator(app),
130+
shutdown,
105131
)

0 commit comments

Comments
 (0)