-
Notifications
You must be signed in to change notification settings - Fork 606
Expand file tree
/
Copy pathasyncio.py
More file actions
209 lines (162 loc) · 7.08 KB
/
asyncio.py
File metadata and controls
209 lines (162 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import sys
import functools
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations._wsgi_common import nullcontext
from sentry_sdk.utils import event_from_exception, logger, reraise
try:
import asyncio
from asyncio.tasks import Task
except ImportError:
raise DidNotEnable("asyncio not available")
from typing import cast, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable, TypeVar
from collections.abc import Coroutine
from sentry_sdk._types import ExcInfo
T = TypeVar("T", bound=Callable[..., Any])
def get_name(coro: "Any") -> str:
return (
getattr(coro, "__qualname__", None)
or getattr(coro, "__name__", None)
or "coroutine without __name__"
)
def _wrap_coroutine(wrapped: "Coroutine[Any, Any, Any]") -> "Callable[[T], T]":
# Only __name__ and __qualname__ are copied from function to coroutine in CPython
return functools.partial(
functools.update_wrapper,
wrapped=wrapped, # type: ignore
assigned=("__name__", "__qualname__"),
updated=(),
)
def patch_asyncio() -> None:
orig_task_factory = None
try:
loop = asyncio.get_running_loop()
orig_task_factory = loop.get_task_factory()
# Check if already patched
if getattr(orig_task_factory, "_is_sentry_task_factory", False):
return
def _sentry_task_factory(
loop: "asyncio.AbstractEventLoop",
coro: "Coroutine[Any, Any, Any]",
**kwargs: "Any",
) -> "asyncio.Future[Any]":
# we get the current span here, as later it may already be closed
in_span = sentry_sdk.get_current_span() is not None
headers = dict(
sentry_sdk.get_current_scope().iter_trace_propagation_headers()
)
@_wrap_coroutine(coro)
async def _task_with_sentry_span_creation() -> "Any":
result = None
integration = sentry_sdk.get_client().get_integration(
AsyncioIntegration
)
task_spans = integration.task_spans if integration else False
if task_spans and in_span:
transaction = sentry_sdk.continue_trace(
headers, op="task", name="downstream"
)
ctx = sentry_sdk.start_transaction(transaction)
else:
ctx = nullcontext()
with sentry_sdk.isolation_scope():
with ctx:
try:
result = await coro
except StopAsyncIteration as e:
raise e from None
except Exception:
reraise(*_capture_exception())
return result
task = None
# Trying to use user set task factory (if there is one)
if orig_task_factory:
task = orig_task_factory(
loop, _task_with_sentry_span_creation(), **kwargs
)
if task is None:
# The default task factory in `asyncio` does not have its own function
# but is just a couple of lines in `asyncio.base_events.create_task()`
# Those lines are copied here.
# WARNING:
# If the default behavior of the task creation in asyncio changes,
# this will break!
task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs)
if task._source_traceback: # type: ignore
del task._source_traceback[-1] # type: ignore
# Set the task name to include the original coroutine's name
try:
cast("asyncio.Task[Any]", task).set_name(
f"{get_name(coro)} (Sentry-wrapped)"
)
except AttributeError:
# set_name might not be available in all Python versions
pass
return task
_sentry_task_factory._is_sentry_task_factory = True # type: ignore
loop.set_task_factory(_sentry_task_factory) # type: ignore
except RuntimeError:
# When there is no running loop, we have nothing to patch.
logger.warning(
"There is no running asyncio loop so there is nothing Sentry can patch. "
"Please make sure you call sentry_sdk.init() within a running "
"asyncio loop for the AsyncioIntegration to work. "
"See https://docs.sentry.io/platforms/python/integrations/asyncio/"
)
def _capture_exception() -> "ExcInfo":
exc_info = sys.exc_info()
client = sentry_sdk.get_client()
integration = client.get_integration(AsyncioIntegration)
if integration is not None:
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "asyncio", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)
return exc_info
class AsyncioIntegration(Integration):
identifier = "asyncio"
origin = f"auto.function.{identifier}"
def __init__(self, task_spans: bool = True) -> None:
self.task_spans = task_spans
@staticmethod
def setup_once() -> None:
patch_asyncio()
def enable_asyncio_integration(*args: "Any", **kwargs: "Any") -> None:
"""
Enable AsyncioIntegration with the provided options.
This is useful in scenarios where Sentry needs to be initialized before
an event loop is set up, but you still want to instrument asyncio once there
is an event loop. In that case, you can sentry_sdk.init() early on without
the AsyncioIntegration and then, once the event loop has been set up,
execute:
```python
from sentry_sdk.integrations.asyncio import enable_asyncio_integration
async def async_entrypoint():
enable_asyncio_integration()
```
Any arguments provided will be passed to AsyncioIntegration() as is.
If AsyncioIntegration has already patched the current event loop, this
function won't have any effect.
If AsyncioIntegration was provided in
sentry_sdk.init(disabled_integrations=[...]), this function will ignore that
and the integration will be enabled.
"""
client = sentry_sdk.get_client()
if not client.is_active():
return
# This function purposefully bypasses the integration machinery in
# integrations/__init__.py. _installed_integrations/_processed_integrations
# is used to prevent double patching the same module, but in the case of
# the AsyncioIntegration, we don't monkeypatch the standard library directly,
# we patch the currently running event loop, and we keep the record of doing
# that on the loop itself.
logger.debug("Setting up integration asyncio")
integration = AsyncioIntegration(*args, **kwargs)
integration.setup_once()
if "asyncio" not in client.integrations:
client.integrations["asyncio"] = integration