-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfaststream_bootstrapper.py
More file actions
179 lines (137 loc) · 6.97 KB
/
faststream_bootstrapper.py
File metadata and controls
179 lines (137 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import dataclasses
import json
import typing
from lite_bootstrap import import_checker
from lite_bootstrap.bootstrappers.base import BaseBootstrapper
from lite_bootstrap.instruments.healthchecks_instrument import HealthChecksConfig, HealthChecksInstrument
from lite_bootstrap.instruments.logging_instrument import LoggingConfig, LoggingInstrument
from lite_bootstrap.instruments.opentelemetry_instrument import OpentelemetryConfig, OpenTelemetryInstrument
from lite_bootstrap.instruments.prometheus_instrument import PrometheusConfig, PrometheusInstrument
from lite_bootstrap.instruments.pyroscope_instrument import PyroscopeConfig, PyroscopeInstrument
from lite_bootstrap.instruments.sentry_instrument import SentryConfig, SentryInstrument
if import_checker.is_faststream_installed:
from faststream.asgi import AsgiFastStream, AsgiResponse
from faststream.asgi import get as handle_get
if import_checker.is_prometheus_client_installed:
import prometheus_client
if import_checker.is_opentelemetry_installed:
from opentelemetry import trace
from opentelemetry.metrics import Meter, MeterProvider
from opentelemetry.trace import TracerProvider, get_tracer_provider
tracer: typing.Final = trace.get_tracer(__name__)
@typing.runtime_checkable
class FastStreamTelemetryMiddlewareProtocol(typing.Protocol):
def __init__(
self,
*,
tracer_provider: typing.Optional["TracerProvider"] = None,
meter_provider: typing.Optional["MeterProvider"] = None,
meter: typing.Optional["Meter"] = None,
include_messages_counters: bool = True,
) -> None: ...
@typing.runtime_checkable
class FastStreamPrometheusMiddlewareProtocol(typing.Protocol):
def __init__(
self,
*,
registry: "prometheus_client.CollectorRegistry",
app_name: str = ...,
metrics_prefix: str = "faststream",
received_messages_size_buckets: typing.Sequence[float] | None = None,
) -> None: ...
def _make_asgi_faststream() -> "AsgiFastStream":
return AsgiFastStream()
@dataclasses.dataclass(kw_only=True, slots=True, frozen=True)
class FastStreamConfig(
HealthChecksConfig, LoggingConfig, OpentelemetryConfig, PrometheusConfig, PyroscopeConfig, SentryConfig
):
application: "AsgiFastStream" = dataclasses.field(default_factory=_make_asgi_faststream)
opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = None
prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = None
@dataclasses.dataclass(kw_only=True, slots=True, frozen=True)
class FastStreamHealthChecksInstrument(HealthChecksInstrument):
bootstrap_config: FastStreamConfig
def bootstrap(self) -> None:
@handle_get
async def check_health(_: object) -> "AsgiResponse":
return (
AsgiResponse(
json.dumps(self.render_health_check_data()).encode(),
200,
headers={"content-type": "application/json"},
)
if await self._define_health_status()
else AsgiResponse(b"Service is unhealthy", 500, headers={"content-type": "text/plain"})
)
if self.bootstrap_config.opentelemetry_generate_health_check_spans:
check_health = tracer.start_as_current_span(f"GET {self.bootstrap_config.health_checks_path}")(
check_health,
)
self.bootstrap_config.application.mount(self.bootstrap_config.health_checks_path, check_health)
async def _define_health_status(self) -> bool:
if not self.bootstrap_config.application or not self.bootstrap_config.application.broker:
return False
return await self.bootstrap_config.application.broker.ping(timeout=5)
@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamLoggingInstrument(LoggingInstrument):
bootstrap_config: FastStreamConfig
@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamOpenTelemetryInstrument(OpenTelemetryInstrument):
bootstrap_config: FastStreamConfig
not_ready_message = OpenTelemetryInstrument.not_ready_message + " or opentelemetry_middleware_cls is empty"
def is_ready(self) -> bool:
return super().is_ready() and bool(self.bootstrap_config.opentelemetry_middleware_cls)
def bootstrap(self) -> None:
if self.bootstrap_config.opentelemetry_middleware_cls and self.bootstrap_config.application.broker:
self.bootstrap_config.application.broker.add_middleware(
self.bootstrap_config.opentelemetry_middleware_cls(tracer_provider=get_tracer_provider())
)
@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamSentryInstrument(SentryInstrument):
bootstrap_config: FastStreamConfig
def _make_collector_registry() -> "prometheus_client.CollectorRegistry":
return prometheus_client.CollectorRegistry()
@dataclasses.dataclass(kw_only=True, frozen=True)
class FastStreamPrometheusInstrument(PrometheusInstrument):
bootstrap_config: FastStreamConfig
collector_registry: "prometheus_client.CollectorRegistry" = dataclasses.field(
default_factory=_make_collector_registry, init=False
)
not_ready_message = PrometheusInstrument.not_ready_message + " or prometheus_middleware_cls is missing"
missing_dependency_message = "prometheus_client is not installed"
def is_ready(self) -> bool:
return (
super().is_ready()
and import_checker.is_prometheus_client_installed
and bool(self.bootstrap_config.prometheus_middleware_cls)
)
@staticmethod
def check_dependencies() -> bool:
return import_checker.is_prometheus_client_installed
def bootstrap(self) -> None:
self.bootstrap_config.application.mount(
self.bootstrap_config.prometheus_metrics_path, prometheus_client.make_asgi_app(self.collector_registry)
)
if self.bootstrap_config.prometheus_middleware_cls and self.bootstrap_config.application.broker:
self.bootstrap_config.application.broker.add_middleware(
self.bootstrap_config.prometheus_middleware_cls(registry=self.collector_registry)
)
class FastStreamBootstrapper(BaseBootstrapper["AsgiFastStream"]):
__slots__ = "bootstrap_config", "instruments"
instruments_types: typing.ClassVar = [
FastStreamOpenTelemetryInstrument,
PyroscopeInstrument,
FastStreamSentryInstrument,
FastStreamHealthChecksInstrument,
FastStreamLoggingInstrument,
FastStreamPrometheusInstrument,
]
bootstrap_config: FastStreamConfig
not_ready_message = "faststream is not installed"
def is_ready(self) -> bool:
return import_checker.is_faststream_installed
def __init__(self, bootstrap_config: FastStreamConfig) -> None:
super().__init__(bootstrap_config)
self.bootstrap_config.application.on_shutdown(self.teardown)
def _prepare_application(self) -> "AsgiFastStream":
return self.bootstrap_config.application