-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path_featuremanager.py
More file actions
186 lines (160 loc) · 8 KB
/
_featuremanager.py
File metadata and controls
186 lines (160 loc) · 8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import inspect
import logging
from typing import cast, overload, Any, Optional, Dict, Mapping, List, Tuple
from ._defaultfilters import TimeWindowFilter, TargetingFilter
from ._featurefilters import FeatureFilter
from .._models import EvaluationEvent, Variant, TargetingContext
from .._featuremanagerbase import (
FeatureManagerBase,
PROVIDED_FEATURE_FILTERS,
REQUIREMENT_TYPE_ALL,
FEATURE_FILTER_NAME,
)
logger = logging.getLogger(__name__)
class FeatureManager(FeatureManagerBase):
"""
Feature Manager that determines if a feature flag is enabled for the given context.
:param Mapping configuration: Configuration object.
:keyword list[FeatureFilter] feature_filters: Custom filters to be used for evaluating feature flags.
:keyword Callable[EvaluationEvent] on_feature_evaluated: Callback function to be called when a feature flag is
evaluated.
:keyword Callable[[], TargetingContext] targeting_context_accessor: Callback function to get the current targeting
context if one isn't provided.
"""
def __init__(self, configuration: Mapping[str, Any], **kwargs: Any):
super().__init__(configuration, **kwargs)
self._filters: Dict[str, FeatureFilter] = {}
filters = [TimeWindowFilter(), TargetingFilter()] + cast(
List[FeatureFilter], kwargs.pop(PROVIDED_FEATURE_FILTERS, [])
)
for feature_filter in filters:
if not isinstance(feature_filter, FeatureFilter):
raise ValueError("Custom filter must be a subclass of FeatureFilter")
self._filters[feature_filter.name] = feature_filter
@overload # type: ignore
async def is_enabled(self, feature_flag_id: str, user_id: str, **kwargs: Any) -> bool:
"""
Determine if the feature flag is enabled for the given context.
:param str feature_flag_id: Name of the feature flag.
:param str user_id: User identifier.
:return: True if the feature flag is enabled for the given context.
:rtype: bool
"""
async def is_enabled(self, feature_flag_id: str, *args: Any, **kwargs: Any) -> bool:
"""
Determine if the feature flag is enabled for the given context.
:param str feature_flag_id: Name of the feature flag.
:return: True if the feature flag is enabled for the given context.
:rtype: bool
"""
targeting_context: TargetingContext = await self._build_targeting_context_async(args)
result = await self._check_feature(feature_flag_id, targeting_context, **kwargs)
if (
self._on_feature_evaluated
and result.feature
and result.feature.telemetry.enabled
and callable(self._on_feature_evaluated)
):
result.user = targeting_context.user_id
if inspect.iscoroutinefunction(self._on_feature_evaluated):
await self._on_feature_evaluated(result)
else:
self._on_feature_evaluated(result)
return result.enabled
@overload # type: ignore
async def get_variant(self, feature_flag_id: str, user_id: str, **kwargs: Any) -> Optional[Variant]:
"""
Determine the variant for the given context.
:param str feature_flag_id: Name of the feature flag.
:param str user_id: User identifier.
:return: return: Variant instance.
:rtype: Variant
"""
async def get_variant(self, feature_flag_id: str, *args: Any, **kwargs: Any) -> Optional[Variant]:
"""
Determine the variant for the given context.
:param str feature_flag_id: Name of the feature flag
:keyword TargetingContext targeting_context: Targeting context.
:return: Variant instance.
:rtype: Variant
"""
targeting_context: TargetingContext = await self._build_targeting_context_async(args)
result = await self._check_feature(feature_flag_id, targeting_context, **kwargs)
if (
self._on_feature_evaluated
and result.feature
and result.feature.telemetry.enabled
and callable(self._on_feature_evaluated)
):
result.user = targeting_context.user_id
if inspect.iscoroutinefunction(self._on_feature_evaluated):
await self._on_feature_evaluated(result)
else:
self._on_feature_evaluated(result)
return result.variant
async def _build_targeting_context_async(self, args: Tuple[Any]) -> TargetingContext:
targeting_context = super()._build_targeting_context(args)
if targeting_context:
return targeting_context
if not targeting_context and self._targeting_context_accessor and callable(self._targeting_context_accessor):
if inspect.iscoroutinefunction(self._targeting_context_accessor):
# If a targeting_context_accessor is provided, return the TargetingContext from it
targeting_context = await self._targeting_context_accessor()
else:
targeting_context = self._targeting_context_accessor()
if targeting_context and isinstance(targeting_context, TargetingContext):
return targeting_context
logger.warning(
"targeting_context_accessor did not return a TargetingContext. Received type %s.",
type(targeting_context),
)
return TargetingContext()
async def _check_feature_filters(
self, evaluation_event: EvaluationEvent, targeting_context: TargetingContext, **kwargs: Any
) -> None:
feature_flag = evaluation_event.feature
if not feature_flag:
return
feature_conditions = feature_flag.conditions
feature_filters = feature_conditions.client_filters
if len(feature_filters) == 0:
# Feature flags without any filters return evaluate
evaluation_event.enabled = True
else:
# The assumed value is no filters is based on the requirement type.
# Requirement type Any assumes false until proven true, All assumes true until proven false
evaluation_event.enabled = feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL
for feature_filter in feature_filters:
filter_name = feature_filter[FEATURE_FILTER_NAME]
kwargs["user"] = targeting_context.user_id
kwargs["groups"] = targeting_context.groups
if filter_name not in self._filters:
raise ValueError(f"Feature flag {feature_flag.name} has unknown filter {filter_name}")
if feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL:
if not await self._filters[filter_name].evaluate(feature_filter, **kwargs):
evaluation_event.enabled = False
break
elif await self._filters[filter_name].evaluate(feature_filter, **kwargs):
evaluation_event.enabled = True
break
async def _check_feature(
self, feature_flag_id: str, targeting_context: TargetingContext, **kwargs: Any
) -> EvaluationEvent:
"""
Determine if the feature flag is enabled for the given context.
:param str feature_flag_id: Name of the feature flag.
:param TargetingContext targeting_context: Targeting context.
:return: EvaluationEvent for the given context.
:rtype: EvaluationEvent
"""
evaluation_event, done = super()._check_feature_base(feature_flag_id)
if done:
return evaluation_event
await self._check_feature_filters(evaluation_event, targeting_context, **kwargs)
self._assign_allocation(evaluation_event, targeting_context)
return evaluation_event