-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathfaststream.py
More file actions
136 lines (114 loc) · 5.35 KB
/
faststream.py
File metadata and controls
136 lines (114 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import typing
from importlib.metadata import version
from types import TracebackType
from typing import Any, Final, Optional
from packaging.version import Version
from typing_extensions import deprecated, override
from that_depends import container_context
from that_depends.providers.context_resources import ContextScope, SupportsContext
from that_depends.utils import UNSET, Unset, is_set
# Distribution name passed to importlib.metadata.version to look up the installed faststream package.
_FASTSTREAM_MODULE_NAME: Final[str] = "faststream"
# Installed faststream version string, resolved once at import time; compared against 0.6.0 below.
_FASTSTREAM_VERSION: Final[str] = version(_FASTSTREAM_MODULE_NAME)
if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover
from faststream import BaseMiddleware, ContextRepo
from faststream._internal.types import AnyMsg
class DIContextMiddleware(BaseMiddleware):
    """Faststream middleware that runs message handling inside a ``container_context``.

    Variant selected when the installed faststream version is 0.6.0 or newer.
    A configured instance acts as a factory: calling it returns a fresh
    middleware carrying the same context items, scope and global context.
    """

    def __init__(
        self,
        *context_items: SupportsContext[Any],
        msg: AnyMsg | None = None,
        context: Optional["ContextRepo"] = None,
        global_context: dict[str, Any] | Unset = UNSET,
        scope: ContextScope | Unset = UNSET,
    ) -> None:
        """Store the container-context settings and initialize the base middleware.

        Args:
            *context_items (SupportsContext[Any]): Context items to initialize; kept as a set.
            msg (AnyMsg | None): Message object forwarded to ``BaseMiddleware.__init__``.
            context (ContextRepo | None): Context repository forwarded to ``BaseMiddleware.__init__``.
            global_context (dict[str, Any] | Unset): Global context for the container context.
            scope (ContextScope | Unset): Scope for the container context.

        """
        super().__init__(msg, context=context)  # type: ignore[arg-type]
        self._scope = scope
        self._global_context = global_context
        self._context_items = set(context_items)
        # Populated by on_receive; stays None until a container context has been created.
        self._context: container_context | None = None

    @override
    async def on_receive(self) -> None:
        """Create a ``container_context`` from the stored settings and enter it."""
        # Settings left as UNSET are handed to container_context as None.
        scope = self._scope if is_set(self._scope) else None
        global_context = self._global_context if is_set(self._global_context) else None
        self._context = container_context(*self._context_items, scope=scope, global_context=global_context)
        await self._context.__aenter__()

    @override
    async def after_processed(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool | None:
        """Exit the container context created by ``on_receive``, if there is one.

        Args:
            exc_type (type[BaseException] | None): Exception type passed on to ``__aexit__``.
            exc_val (BaseException | None): Exception instance passed on to ``__aexit__``.
            exc_tb (TracebackType | None): Traceback passed on to ``__aexit__``.

        Returns:
            None: Always; the result of ``__aexit__`` is discarded.

        """
        active_context = self._context
        if active_context is None:
            # on_receive has not created a context, so there is nothing to exit.
            return None
        await active_context.__aexit__(exc_type, exc_val, exc_tb)
        return None

    def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware":  # noqa: ANN401
        """Build a new middleware instance that reuses this instance's settings.

        Args:
            msg (Any): Message object handed to the new instance.
            **kwargs: Additional keyword arguments; only the ``context`` key is read.

        Returns:
            DIContextMiddleware: A new instance with the same context items, scope and global context.

        """
        return DIContextMiddleware(
            *self._context_items,
            msg=msg,
            context=kwargs.get("context"),
            scope=self._scope,
            global_context=self._global_context,
        )
else: # pragma: no cover
from faststream import BaseMiddleware
@deprecated("Will be removed with faststream v1")
class DIContextMiddleware(BaseMiddleware):  # type: ignore[no-redef]
    """Faststream middleware that runs message handling inside a ``container_context``.

    Variant selected when the installed faststream version is older than 0.6.0.
    A configured instance acts as a factory: calling it returns a fresh
    middleware carrying the same context items, scope and global context.
    """

    def __init__(
        self,
        *context_items: SupportsContext[Any],
        global_context: dict[str, Any] | Unset = UNSET,
        scope: ContextScope | Unset = UNSET,
    ) -> None:
        """Store the container-context settings and initialize the base middleware.

        Args:
            *context_items (SupportsContext[Any]): Context items to initialize; kept as a set.
            global_context (dict[str, Any] | Unset): Global context for the container context.
            scope (ContextScope | Unset): Scope for the container context.

        """
        super().__init__()  # type: ignore[call-arg]
        self._scope = scope
        self._global_context = global_context
        self._context_items = set(context_items)
        # Populated by on_receive; stays None until a container context has been created.
        self._context: container_context | None = None

    @override
    async def on_receive(self) -> None:
        """Create a ``container_context`` from the stored settings and enter it."""
        # Settings left as UNSET are handed to container_context as None.
        scope = self._scope if is_set(self._scope) else None
        global_context = self._global_context if is_set(self._global_context) else None
        self._context = container_context(*self._context_items, scope=scope, global_context=global_context)
        await self._context.__aenter__()

    @override
    async def after_processed(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool | None:
        """Exit the container context created by ``on_receive``, if there is one.

        Args:
            exc_type (type[BaseException] | None): Exception type passed on to ``__aexit__``.
            exc_val (BaseException | None): Exception instance passed on to ``__aexit__``.
            exc_tb (TracebackType | None): Traceback passed on to ``__aexit__``.

        Returns:
            None: Always; the result of ``__aexit__`` is discarded.

        """
        active_context = self._context
        if active_context is None:
            # on_receive has not created a context, so there is nothing to exit.
            return None
        await active_context.__aexit__(exc_type, exc_val, exc_tb)
        return None

    def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware":  # noqa: ARG002, ANN401
        """Build a new middleware instance that reuses this instance's settings.

        Positional and keyword arguments are accepted but not used.

        Returns:
            DIContextMiddleware: A new instance with the same context items, scope and global context.

        """
        return DIContextMiddleware(
            *self._context_items,
            scope=self._scope,
            global_context=self._global_context,
        )