Skip to content

Commit 0d3b81d

Browse files
committed
2.18.0 - Add function
1 parent dcb53ca commit 0d3b81d

5 files changed

Lines changed: 94 additions & 3 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
run\_coro\_thread\_async
2+
========================
3+
4+
.. currentmodule:: privex.helpers.asyncx
5+
6+
.. autofunction:: run_coro_thread_async

docs/source/helpers/privex.helpers.asyncx.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ AsyncIO Helpers / Wrappers
2525
is_async_context
2626
loop_run
2727
run_coro_thread
28+
run_coro_thread_async
2829
run_coro_thread_base
2930
run_sync
3031

privex/helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def _setup_logging(level=logging.WARNING):
148148
log = _setup_logging()
149149
name = 'helpers'
150150

151-
VERSION = '2.17.1'
151+
VERSION = '2.18.0'
152152

153153

154154

privex/helpers/asyncx.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
__all__ = [
4343
'awaitable', 'AWAITABLE_BLACKLIST_MODS', 'AWAITABLE_BLACKLIST', 'AWAITABLE_BLACKLIST_FUNCS', 'run_sync', 'aobject',
4444
'call_sys_async', 'async_sync', 'awaitable_class', 'AwaitableMixin', 'loop_run', 'is_async_context', 'await_if_needed',
45-
'get_async_type', 'run_coro_thread', 'run_coro_thread_base', 'coro_thread_func'
45+
'get_async_type', 'run_coro_thread', 'run_coro_thread_async', 'run_coro_thread_base', 'coro_thread_func'
4646
]
4747

4848

@@ -136,6 +136,15 @@ def run_coro_thread(func: callable, *args, **kwargs) -> Any:
136136
Run a Python AsyncIO coroutine function within a new event loop using a thread, and return the result / raise any exceptions
137137
as if it were ran normally within an AsyncIO function.
138138
139+
140+
.. Caution:: If you're wanting to run a coroutine within a thread from an AsyncIO function/method, then you should
141+
use :func:`.run_coro_thread_async` instead, which uses :func:`asyncio.sleep` while waiting for a result/exception
142+
to be transmitted via a queue.
143+
144+
This allows you to run and wait for multiple coroutine threads simultaneously, as there's no synchronous blocking
145+
wait - unlike this function.
146+
147+
139148
This will usually allow you to run coroutines from a synchronous function without running into the dreaded "Event loop is already
140149
running" error - since the coroutine will be ran inside of a thread with it's own dedicated event loop.
141150
@@ -174,7 +183,59 @@ def run_coro_thread(func: callable, *args, **kwargs) -> Any:
174183
if isinstance(res, (Exception, BaseException)):
175184
raise res
176185
return res
186+
187+
188+
async def run_coro_thread_async(func: callable, *args, _queue_timeout=30.0, _queue_sleep=0.05, **kwargs) -> Any:
189+
"""
190+
AsyncIO version of :func:`.run_coro_thread` which uses :func:`asyncio.sleep` while waiting on a result from the queue,
191+
allowing you to run multiple AsyncIO coroutines which call blocking synchronous code - simultaneously,
192+
e.g. by using :func:`asyncio.gather`
193+
194+
Below is an example of running an example coroutine ``hello`` which runs the synchronous blocking ``time.sleep``.
195+
Using :func:`.run_coro_thread_async` plus :func:`asyncio.gather` - we can run ``hello`` 4 times simultaneously,
196+
despite the use of the blocking :func:`time.sleep`.
197+
198+
**Basic usage**::
199+
200+
>>> import asyncio
201+
>>> from privex.helpers.asyncx import run_coro_thread_async
202+
>>> async def hello(world):
203+
... time.sleep(1)
204+
... return world * 10
205+
>>> await asyncio.gather(run_coro_thread_async(hello, 5), run_coro_thread_async(hello, 15),
206+
... run_coro_thread_async(hello, 90), run_coro_thread_async(hello, 25))
207+
[50, 150, 900, 250]
177208
209+
210+
:param callable func: A reference to the ``async def`` coroutine function that you want to run
211+
:param args: Positional arguments to pass-through to the coroutine function
212+
:param kwargs: Keyword arguments to pass-through to the coroutine function
213+
:param float|int _queue_timeout: (default: ``30``) Maximum amount of seconds to wait for a result or exception
214+
from ``func`` before giving up.
215+
:param _queue_sleep: (default: ``0.05``) Amount of time to AsyncIO sleep between each check of the result queue
216+
:return Any coro_res: The result returned from the coroutine ``func``
217+
"""
218+
_queue_timeout, _queue_sleep = float(_queue_timeout), float(_queue_sleep)
219+
thread_waited = 0.0
220+
q = queue.Queue()
221+
t_co = run_coro_thread_base(func, *args, **kwargs, _output_queue=q)
222+
223+
res = NO_RESULT
224+
while res == NO_RESULT:
225+
if thread_waited >= _queue_timeout:
226+
raise TimeoutError(f"No thread result after waiting {thread_waited} seconds...")
227+
try:
228+
_res = q.get_nowait()
229+
if isinstance(_res, (Exception, BaseException)):
230+
raise _res
231+
res = _res
232+
except queue.Empty:
233+
thread_waited += _queue_sleep
234+
await asyncio.sleep(_queue_sleep)
235+
t_co.join(5)
236+
237+
return res
238+
178239

179240
def run_sync(func, *args, **kwargs):
180241
"""

tests/asyncx/test_async_common.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
22
import inspect
3+
import time
4+
from datetime import datetime
35
from time import sleep
46
from typing import Union, Coroutine, Type, Callable
57

@@ -275,4 +277,25 @@ async def another_func(lorem: int, ipsum: int):
275277
with self.assertRaisesRegex(AttributeError, 'ipsum over 100'):
276278
helpers.run_coro_thread(another_func, 5, 900)
277279

278-
280+
def test_run_coro_thread_async(self):
281+
"""
282+
Test :func:`.run_coro_thread_async` using :func:`asyncio.gather` to confirm coroutines with blocking synchronous code
283+
are being ran simultaneously despite their blocking synchronous code.
284+
"""
285+
rcta = helpers.run_coro_thread_async
286+
287+
async def _hello(world):
288+
# time.sleep is used in-place of a synchronous blocking function
289+
time.sleep(1)
290+
return world * 10
291+
292+
async def hello():
293+
return await asyncio.gather(rcta(_hello, 5), rcta(_hello, 20), rcta(_hello, 2), rcta(_hello, 80))
294+
295+
start = datetime.utcnow()
296+
res = helpers.loop_run(hello())
297+
end = datetime.utcnow()
298+
# After running 4 instances of _hello using run_coro_thread_async, check that no more than
299+
# 2 seconds have passed - if they were ran synchronously, they would've taken 4 or more seconds.
300+
self.assertLessEqual((end - start).total_seconds(), 2)
301+
self.assertListEqual(res, [50, 200, 20, 800])

0 commit comments

Comments
 (0)