11import inspect
22from abc import ABC , abstractmethod
33from functools import lru_cache , wraps
4- from typing import Any , Callable , Iterable , List , Optional , Sequence , Set , Type , Union
4+ from typing import Any , Callable , Iterable , Sequence , Type
55
66from rodi import ContainerProtocol
77
@@ -46,7 +46,7 @@ class RolesRequirement(Requirement):
4646
4747 __slots__ = ("_roles" ,)
4848
49- def __init__ (self , roles : Optional [ Sequence [str ]] = None ):
49+ def __init__ (self , roles : Sequence [str ] | None = None ):
5050 self ._roles = list (roles ) if roles else None
5151
5252 def handle (self , context : "AuthorizationContext" ):
@@ -61,7 +61,7 @@ def handle(self, context: "AuthorizationContext"):
6161 context .succeed (self )
6262
6363
64- RequirementConfType = Union [ Requirement , Type [Requirement ] ]
64+ RequirementConfType = Requirement | Type [Requirement ]
6565
6666
6767@lru_cache (maxsize = None )
@@ -79,11 +79,11 @@ class UnauthorizedError(AuthorizationError):
7979
8080 def __init__ (
8181 self ,
82- forced_failure : Optional [ str ] ,
82+ forced_failure : str | None ,
8383 failed_requirements : Sequence [Requirement ],
84- scheme : Optional [ str ] = None ,
85- error : Optional [ str ] = None ,
86- error_description : Optional [ str ] = None ,
84+ scheme : str | None = None ,
85+ error : str | None = None ,
86+ error_description : str | None = None ,
8787 ):
8888 """
8989 Creates a new instance of UnauthorizedError, with details.
@@ -132,11 +132,11 @@ class AuthorizationContext:
132132 def __init__ (self , identity : Identity , requirements : Sequence [Requirement ]):
133133 self .identity = identity
134134 self .requirements = requirements
135- self ._succeeded : Set [Requirement ] = set ()
136- self ._failed_forced : Optional [ str ] = None
135+ self ._succeeded : set [Requirement ] = set ()
136+ self ._failed_forced : str | None = None
137137
138138 @property
139- def pending_requirements (self ) -> List [Requirement ]:
139+ def pending_requirements (self ) -> list [Requirement ]:
140140 return [item for item in self .requirements if item not in self ._succeeded ]
141141
142142 @property
@@ -146,7 +146,7 @@ def has_succeeded(self) -> bool:
146146 return all (requirement in self ._succeeded for requirement in self .requirements )
147147
148148 @property
149- def forced_failure (self ) -> Optional [ str ] :
149+ def forced_failure (self ) -> str | None :
150150 return None if self ._failed_forced is None else str (self ._failed_forced )
151151
152152 def fail (self , reason : str ):
@@ -208,16 +208,16 @@ class AuthorizationStrategy(BaseStrategy):
208208 def __init__ (
209209 self ,
210210 * policies : Policy ,
211- container : Optional [ ContainerProtocol ] = None ,
212- default_policy : Optional [ Policy ] = None ,
213- identity_getter : Optional [ Callable [..., Identity ]] = None ,
211+ container : ContainerProtocol | None = None ,
212+ default_policy : Policy | None = None ,
213+ identity_getter : Callable [..., Identity ] | None = None ,
214214 ):
215215 super ().__init__ (container )
216216 self .policies = list (policies )
217217 self .default_policy = default_policy
218218 self .identity_getter = identity_getter
219219
220- def get_policy (self , name : str ) -> Optional [ Policy ] :
220+ def get_policy (self , name : str ) -> Policy | None :
221221 for policy in self .policies :
222222 if policy .name == name :
223223 return policy
@@ -237,10 +237,10 @@ def with_default_policy(self, policy: Policy) -> "AuthorizationStrategy":
237237
238238 async def authorize (
239239 self ,
240- policy_name : Optional [ str ] ,
240+ policy_name : str | None ,
241241 identity : Identity ,
242242 scope : Any = None ,
243- roles : Optional [ Sequence [str ]] = None ,
243+ roles : Sequence [str ] | None = None ,
244244 ):
245245 if policy_name :
246246 policy = self .get_policy (policy_name )
@@ -268,7 +268,7 @@ async def authorize(
268268 raise UnauthorizedError ("The resource requires authentication" , [])
269269
270270 def _get_requirements (
271- self , policy : Policy , scope : Any , roles : Optional [ Sequence [str ]] = None
271+ self , policy : Policy , scope : Any , roles : Sequence [str ] | None = None
272272 ) -> Iterable [Requirement ]:
273273 if roles :
274274 yield RolesRequirement (roles = roles )
@@ -279,15 +279,15 @@ async def _handle_with_policy(
279279 policy : Policy ,
280280 identity : Identity ,
281281 scope : Any ,
282- roles : Optional [ Sequence [str ]] = None ,
282+ roles : Sequence [str ] | None = None ,
283283 ):
284284 with AuthorizationContext (
285285 identity , list (self ._get_requirements (policy , scope , roles ))
286286 ) as context :
287287 await self ._handle_context (identity , context )
288288
289289 async def _handle_with_roles (
290- self , identity : Identity , roles : Optional [ Sequence [str ]] = None
290+ self , identity : Identity , roles : Sequence [str ] | None = None
291291 ):
292292 # This method is to be used only when the user specified roles without a policy
293293 with AuthorizationContext (identity , [RolesRequirement (roles = roles )]) as context :
@@ -310,13 +310,13 @@ async def _handle_context(self, identity: Identity, context: AuthorizationContex
310310 )
311311
312312 async def _handle_with_identity_getter (
313- self , policy_name : Optional [ str ] , * args , ** kwargs
313+ self , policy_name : str | None , * args , ** kwargs
314314 ):
315315 if self .identity_getter is None :
316316 raise TypeError ("Missing identity getter function." )
317317 await self .authorize (policy_name , self .identity_getter (* args , ** kwargs ))
318318
319- def __call__ (self , policy : Optional [ str ] = None ):
319+ def __call__ (self , policy : str | None = None ):
320320 """
321321 Decorates a function to apply authorization logic on each call.
322322 """
0 commit comments