-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_faststream_bootstrap.py
More file actions
110 lines (87 loc) · 4.05 KB
/
test_faststream_bootstrap.py
File metadata and controls
110 lines (87 loc) · 4.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import typing
import faststream.asgi
import pytest
import structlog
from faststream._internal.broker import BrokerUsecase
from faststream.redis import RedisBroker, TestRedisBroker
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
from faststream.redis.prometheus import RedisPrometheusMiddleware
from starlette import status
from starlette.testclient import TestClient
from lite_bootstrap import FastStreamBootstrapper, FastStreamConfig
from tests.conftest import CustomInstrumentor, SentryTestTransport, emulate_package_missing
# Module-level structlog logger, named after this module; the tests log through it.
logger = structlog.getLogger(__name__)
@pytest.fixture
def broker() -> RedisBroker:
    """Provide a ``RedisBroker`` constructed with its default arguments."""
    redis_broker = RedisBroker()
    return redis_broker
def build_faststream_config(
    broker: BrokerUsecase[typing.Any, typing.Any] | None = None,
) -> FastStreamConfig:
    """Assemble the ``FastStreamConfig`` shared by the tests in this module.

    Args:
        broker: Broker passed to the ``AsgiFastStream`` application. When
            omitted, ``None`` is passed and the application has no broker.

    Returns:
        A config with fixed service metadata, custom health-check and metrics
        paths, Redis telemetry/Prometheus middleware classes, a test Sentry
        transport, and an ASGI application serving AsyncAPI docs at ``/docs/``.
    """
    # Helper objects are created in the same order the config consumes them.
    instrumentors = [CustomInstrumentor()]
    sentry_params = {"transport": SentryTestTransport()}
    asgi_application = faststream.asgi.AsgiFastStream(
        broker,
        asyncapi_path=faststream.asgi.AsyncAPIRoute("/docs/"),
        specification=faststream.AsyncAPI(),
    )

    return FastStreamConfig(
        # Service identity.
        service_name="microservice",
        service_version="2.0.0",
        service_environment="test",
        service_debug=False,
        # OpenTelemetry.
        opentelemetry_instrumentors=instrumentors,
        opentelemetry_log_traces=True,
        opentelemetry_middleware_cls=RedisTelemetryMiddleware,
        # Prometheus.
        prometheus_metrics_path="/custom-metrics/",
        prometheus_middleware_cls=RedisPrometheusMiddleware,
        # Sentry.
        sentry_dsn="https://testdsn@localhost/1",
        sentry_additional_params=sentry_params,
        # Health checks and logging.
        health_checks_path="/custom-health/",
        logging_buffer_capacity=0,
        application=asgi_application,
    )
async def test_faststream_bootstrap(broker: RedisBroker) -> None:
    """Bootstrap a broker-backed app and probe its health, metrics and docs routes.

    Each of the three endpoints must answer with HTTP 200, and the health
    endpoint must report the configured service name and version.
    """
    config = build_faststream_config(broker=broker)
    bootstrapper = FastStreamBootstrapper(bootstrap_config=config)
    app = bootstrapper.bootstrap()
    assert bootstrapper.is_bootstrapped
    logger.info("testing logging", key="value")

    expected_health_payload = {
        "health_status": True,
        "service_name": "microservice",
        "service_version": "2.0.0",
    }

    with TestClient(app=app) as http_client:
        async with TestRedisBroker(broker):
            health_response = http_client.get(config.health_checks_path)
            assert health_response.status_code == status.HTTP_200_OK
            assert health_response.json() == expected_health_payload

            metrics_response = http_client.get(config.prometheus_metrics_path)
            assert metrics_response.status_code == status.HTTP_200_OK

            docs_response = http_client.get("/docs/")
            assert docs_response.status_code == status.HTTP_200_OK

    # NOTE(review): nesting was reconstructed from a copy without indentation;
    # presumably closing the client shuts the app down and un-bootstraps it -
    # confirm this assertion belongs after the `with` block.
    assert not bootstrapper.is_bootstrapped
async def test_faststream_bootstrap_health_check_wo_broker() -> None:
    """Check the health endpoint reports failure when the app has no broker.

    The config is built without a broker, and the health route is expected to
    answer HTTP 500 with the body ``Service is unhealthy``.
    """
    bootstrap_config = build_faststream_config()
    bootstrapper = FastStreamBootstrapper(bootstrap_config=bootstrap_config)
    application = bootstrapper.bootstrap()
    try:
        # The client is used without a `with` block, as in the request below.
        test_client = TestClient(app=application)
        response = test_client.get(bootstrap_config.health_checks_path)
        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
        assert response.text == "Service is unhealthy"
    finally:
        # Always undo bootstrap(), even when an assertion above fails, so a
        # failing run does not leave bootstrapped state behind for later tests.
        bootstrapper.teardown()
def test_faststream_config_default_application() -> None:
    """A config created without arguments exposes an ``AsgiFastStream`` application."""
    default_application = FastStreamConfig().application
    assert isinstance(default_application, faststream.asgi.AsgiFastStream)
def test_faststream_bootstrapper_not_ready() -> None:
    """Constructing the bootstrapper must raise when ``faststream`` is reported missing."""
    with emulate_package_missing("faststream"):
        with pytest.raises(RuntimeError, match="faststream is not installed"):
            # The config is built inside both contexts, so an error raised
            # while creating it is captured the same way.
            config = FastStreamConfig(application=faststream.asgi.AsgiFastStream())
            FastStreamBootstrapper(bootstrap_config=config)
@pytest.mark.parametrize(
    "package_name",
    ["opentelemetry", "sentry_sdk", "structlog", "prometheus_client"],
)
def test_faststream_bootstrapper_with_missing_instrument_dependency(broker: RedisBroker, package_name: str) -> None:
    """Constructing the bootstrapper warns when an instrument's package is reported missing.

    For each parametrized package, a ``UserWarning`` whose message matches the
    package name is expected.
    """
    config = build_faststream_config(broker=broker)
    with emulate_package_missing(package_name):
        with pytest.warns(UserWarning, match=package_name):
            FastStreamBootstrapper(bootstrap_config=config)