-
Notifications
You must be signed in to change notification settings - Fork 348
Expand file tree
/
Copy pathasync_guard.py
More file actions
43 lines (33 loc) · 1.33 KB
/
async_guard.py
File metadata and controls
43 lines (33 loc) · 1.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
""" Utils for async """
import functools
import logging
import threading
import traceback
# NOTE: Global, but needed to be able to use ensure_not_async() in
# decorator context.
_ASYNC_SENTINELS: dict[int, bool] = {}
logger = logging.getLogger(__name__)
def set_async_sentinel(enable: bool):
""" Register a function to validate if async is running """
_ASYNC_SENTINELS[threading.get_ident()] = enable
def ensure_not_async(fn):
""" Decorator that will ensure that the function is not called if async
is running.
"""
@functools.wraps(fn)
def async_guard_wrap(*args, **kwargs):
if _ASYNC_SENTINELS.get(threading.get_ident(), False):
st = "".join(traceback.format_stack())
logger.debug("Traceback:\n%s", st.rstrip())
raise RuntimeError(f"Calling a blocking function, {fn.__qualname__}() in {fn.__code__.co_filename}:{fn.__code__.co_firstlineno}, while running async")
return fn(*args, **kwargs)
return async_guard_wrap
class AllowBlocking:
""" Context manager to pause async guard """
def __init__(self):
self._enabled = _ASYNC_SENTINELS.get(threading.get_ident(), False)
def __enter__(self):
set_async_sentinel(False)
return self
def __exit__(self, exc_type, exc_value, traceback):
set_async_sentinel(self._enabled)