11import asyncio
22import inspect
3- from typing import Any , Awaitable , Callable
3+ from typing import Awaitable , Callable
44
55import yarl
66from aiohttp import web
@@ -71,10 +71,9 @@ def populate_context(
7171 )
7272
7373
74- def startup_event_generator (
74+ def startup_event_generator ( # noqa: C901
7575 broker : AsyncBroker ,
7676 app_path : str ,
77- app : Any ,
7877) -> Callable [[TaskiqState ], Awaitable [None ]]:
7978 """
8079 Creates an event to run on broker's startup.
@@ -87,24 +86,26 @@ def startup_event_generator(
8786
8887 :param broker: current broker.
8988 :param app_path: path to the application.
90- :param app: current application or a fractory.
9189
9290 :returns: a function that is called on startup.
9391 """
9492
9593 async def startup (state : TaskiqState ) -> None :
96- local_app = app
94+ if not broker .is_worker_process :
95+ return
9796
98- if not isinstance (local_app , web .Application ):
99- local_app = local_app ()
97+ app = import_object (app_path )
10098
101- if inspect .iscoroutine (local_app ):
102- local_app = await local_app
103- if not isinstance (local_app , web .Application ):
99+ if not isinstance (app , web .Application ):
100+ app = app ()
101+
102+ if inspect .iscoroutine (app ):
103+ app = await app
104+ if not isinstance (app , web .Application ):
104105 raise ValueError (f"{ app_path } is not an AioHTTP application." )
105106
106107 # Starting the application.
107- app_runner = web .AppRunner (local_app )
108+ app_runner = web .AppRunner (app )
108109 await app_runner .setup ()
109110
110111 if app_runner .server is None :
@@ -115,28 +116,35 @@ async def startup(state: TaskiqState) -> None:
115116 populate_context (
116117 broker = broker ,
117118 server = app_runner .server ,
118- app = local_app ,
119+ app = app ,
119120 loop = loop ,
120121 )
121122
122123 # Creating mocked request
123124 state .aiohttp_runner = app_runner
124- local_app .router ._resources = []
125+ app .router ._resources = []
125126
126127 return startup
127128
128129
129- async def shutdown ( state : TaskiqState ) -> None :
130+ def shutdown_generator ( broker : AsyncBroker ) -> Callable [[ TaskiqState ], Awaitable [ None ]] :
130131 """
131- Shuts down the app .
132+ Shuts down event generator .
132133
133- It just gets the application
134- we created in startup and shuts it down.
134+ This function generates a shutdown broker
135135
136- :param state: current state.
136+ :param broker: current broker.
137+ :returns: shutdown event handler.
137138 """
138- await state .aiohttp_runner .shutdown ()
139- await state .aiohttp_runner .cleanup ()
139+
140+ async def shutdown (state : TaskiqState ) -> None :
141+ if not broker .is_worker_process :
142+ return
143+
144+ await state .aiohttp_runner .shutdown ()
145+ await state .aiohttp_runner .cleanup ()
146+
147+ return shutdown
140148
141149
142150def init (broker : AsyncBroker , app_path : str ) -> None :
@@ -151,16 +159,11 @@ def init(broker: AsyncBroker, app_path: str) -> None:
151159 :param broker: current broker.
152160 :param app_path: string with a path to an application or a factory.
153161 """
154- if not broker .is_worker_process :
155- return
156-
157- app = import_object (app_path )
158-
159162 broker .add_event_handler (
160163 TaskiqEvents .WORKER_STARTUP ,
161- startup_event_generator (broker , app_path , app ),
164+ startup_event_generator (broker , app_path ),
162165 )
163166 broker .add_event_handler (
164167 TaskiqEvents .WORKER_SHUTDOWN ,
165- shutdown ,
168+ shutdown_generator ( broker ) ,
166169 )
0 commit comments