forked from taskiq-python/taskiq
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinstrumentation.py
More file actions
145 lines (116 loc) · 4.52 KB
/
instrumentation.py
File metadata and controls
145 lines (116 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
Instrument `taskiq`_ to trace Taskiq applications.
.. _taskiq: https://pypi.org/project/taskiq/
Usage
-----
* Run instrumented task
.. code:: python
import asyncio
from taskiq import InMemoryBroker
from taskiq.instrumentation import TaskiqInstrumentor
broker = InMemoryBroker()
@broker.task
async def add(x, y):
return x + y
async def main():
TaskiqInstrumentor().instrument()
await broker.startup()
await add.kiq(1, 2)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
API
---
"""
import logging
from collections.abc import Callable, Collection
from typing import Any
from weakref import WeakSet as _WeakSet
from taskiq.cli.worker.args import WorkerArgs
try:
import opentelemetry # noqa: F401
except ImportError as exc:
raise ImportError(
"Cannot instrument. Please install 'taskiq[opentelemetry]'.",
) from exc
from opentelemetry.instrumentation.auto_instrumentation import initialize
from opentelemetry.instrumentation.instrumentor import ( # type: ignore[attr-defined]
BaseInstrumentor,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import MeterProvider
from opentelemetry.trace import TracerProvider
from wrapt import wrap_function_wrapper
import taskiq.cli.worker.run
from taskiq import AsyncBroker
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
# Module-level logger, registered under the name "taskiq.opentelemetry".
logger = logging.getLogger("taskiq.opentelemetry")
class TaskiqInstrumentor(BaseInstrumentor):
    """OpenTelemetry instrumentor for Taskiq.

    Brokers can be instrumented one at a time with :meth:`instrument_broker`,
    or globally through ``instrument()``, which wraps ``AsyncBroker.__init__``
    so every broker created afterwards gets an ``OpenTelemetryMiddleware``,
    and replaces ``taskiq.cli.worker.run.start_listen`` with a version that
    calls OpenTelemetry's ``initialize()`` first.
    """

    # Brokers that currently carry the OpenTelemetry middleware. Held weakly,
    # so tracking a broker here does not keep it alive.
    _instrumented_brokers: _WeakSet[AsyncBroker] = _WeakSet()

    # The unpatched worker entry point, captured when this class is defined.
    # NOTE: this is a plain function stored on a class, so reading it through
    # an *instance* produces a bound method. Always read it through the class.
    _original_start_listen: Callable[
        [WorkerArgs],
        None,
    ] = taskiq.cli.worker.run.start_listen

    def __init__(self) -> None:
        super().__init__()
        self._middleware = None

    def instrument_broker(
        self,
        broker: AsyncBroker,
        tracer_provider: TracerProvider | None = None,
        meter_provider: MeterProvider | None = None,
    ) -> None:
        """Add the OpenTelemetry middleware to a single broker.

        The middleware is inserted at the front of ``broker.middlewares``.
        Calling this again on a broker that is already instrumented only
        logs a warning; no second middleware is added.

        :param broker: broker to instrument.
        :param tracer_provider: passed through to ``OpenTelemetryMiddleware``.
        :param meter_provider: passed through to ``OpenTelemetryMiddleware``.
        """
        # The flag lives on the broker itself under the underscore-prefixed
        # name; the check must use that exact name or it never matches.
        if getattr(broker, "_is_instrumented_by_opentelemetry", False):
            logger.warning(
                "Attempting to instrument taskiq broker while already instrumented",
            )
            return

        broker.middlewares.insert(
            0,
            OpenTelemetryMiddleware(
                tracer_provider=tracer_provider,
                meter_provider=meter_provider,
            ),
        )
        broker._is_instrumented_by_opentelemetry = True  # type: ignore[attr-defined] # noqa: SLF001
        self._instrumented_brokers.add(broker)

    def uninstrument_broker(self, broker: AsyncBroker) -> None:
        """Remove every ``OpenTelemetryMiddleware`` from a single broker.

        :param broker: broker to uninstrument.
        """
        broker.middlewares = [
            middleware
            for middleware in broker.middlewares
            if not isinstance(middleware, OpenTelemetryMiddleware)
        ]
        broker._is_instrumented_by_opentelemetry = False  # type: ignore[attr-defined] # noqa: SLF001
        self._instrumented_brokers.discard(broker)

    def instrumentation_dependencies(self) -> Collection[str]:
        """This function tells which library this instrumentor instruments."""
        return ("taskiq >= 0.0.0",)

    @classmethod
    def _start_listen_with_initialize(cls, args: WorkerArgs) -> None:
        """Call OpenTelemetry's ``initialize()``, then the original ``start_listen``.

        :param args: worker arguments forwarded unchanged.
        """
        initialize()
        # Accessed through the class, so this is the plain function.
        cls._original_start_listen(args)

    def _instrument(self, **kwargs: Any) -> None:
        """Patch Taskiq so that newly created brokers are instrumented.

        :param kwargs: optional ``tracer_provider`` and ``meter_provider``
            entries are forwarded to :meth:`instrument_broker`; other keys
            are ignored.
        """
        tracer_provider = kwargs.get("tracer_provider")
        meter_provider = kwargs.get("meter_provider")

        def broker_init(
            init: Callable[..., Any],
            broker: AsyncBroker,
            init_args: Any,
            init_kwargs: Any,
        ) -> Any:
            # Run the real constructor first, then attach the middleware.
            result = init(*init_args, **init_kwargs)
            self.instrument_broker(
                broker,
                tracer_provider=tracer_provider,
                meter_provider=meter_provider,
            )
            return result

        wrap_function_wrapper("taskiq.abc.broker", "AsyncBroker.__init__", broker_init)
        taskiq.cli.worker.run.start_listen = self._start_listen_with_initialize

    def _uninstrument(self, **kwargs: Any) -> None:
        """Undo everything :meth:`_instrument` and :meth:`instrument_broker` did."""
        # Iterate over a snapshot: uninstrument_broker mutates the set.
        for broker in list(self._instrumented_brokers):
            self.uninstrument_broker(broker)
        self._instrumented_brokers.clear()
        unwrap(AsyncBroker, "__init__")
        # Read through the class, not ``self``: instance access would bind the
        # stored function as a method, and the restored ``start_listen`` would
        # then receive the instrumentor as an unexpected extra first argument.
        taskiq.cli.worker.run.start_listen = type(self)._original_start_listen