-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path_send_telemetry.py
More file actions
154 lines (123 loc) · 5.91 KB
/
_send_telemetry.py
File metadata and controls
154 lines (123 loc) · 5.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
import inspect
from typing import Any, Callable, Dict, Optional
from .._models import VariantAssignmentReason, EvaluationEvent, TargetingContext
logger = logging.getLogger(__name__)
try:
from azure.monitor.events.extension import track_event as azure_monitor_track_event # type: ignore
from opentelemetry.context.context import Context
from opentelemetry.sdk.trace import Span, SpanProcessor
HAS_AZURE_MONITOR_EVENTS_EXTENSION = True
except ImportError:
HAS_AZURE_MONITOR_EVENTS_EXTENSION = False
logger.warning(
"azure-monitor-events-extension is not installed. Telemetry will not be sent to Application Insights."
)
SpanProcessor = object # type: ignore
Span = object # type: ignore
Context = object # type: ignore
FEATURE_NAME = "FeatureName"
ENABLED = "Enabled"
TARGETING_ID = "TargetingId"
VARIANT = "Variant"
REASON = "VariantAssignmentReason"
DEFAULT_WHEN_ENABLED = "DefaultWhenEnabled"
VERSION = "Version"
VARIANT_ASSIGNMENT_PERCENTAGE = "VariantAssignmentPercentage"
MICROSOFT_TARGETING_ID = "Microsoft.TargetingId"
EVENT_NAME = "FeatureEvaluation"
EVALUATION_EVENT_VERSION = "1.0.0"
def track_event(event_name: str, user: str, event_properties: Optional[Dict[str, Optional[str]]] = None) -> None:
"""
Tracks an event with the specified name and properties.
:param str event_name: The name of the event.
:param str user: The user ID to associate with the event.
:param dict[str, str] event_properties: A dictionary of named string properties.
"""
if not HAS_AZURE_MONITOR_EVENTS_EXTENSION:
return
event_properties = event_properties or {}
if user:
event_properties[TARGETING_ID] = user
azure_monitor_track_event(event_name, event_properties)
def publish_telemetry(evaluation_event: EvaluationEvent) -> None:
"""
Publishes the telemetry for a feature's evaluation event.
:param EvaluationEvent evaluation_event: The evaluation event to publish telemetry for.
"""
if not HAS_AZURE_MONITOR_EVENTS_EXTENSION:
return
feature = evaluation_event.feature
if not feature:
return
event: Dict[str, Optional[str]] = {
FEATURE_NAME: feature.name,
ENABLED: str(evaluation_event.enabled),
VERSION: EVALUATION_EVENT_VERSION,
}
reason = evaluation_event.reason
variant = evaluation_event.variant
event[REASON] = reason.value
if variant:
event[VARIANT] = variant.name
# VariantAllocationPercentage
allocation_percentage = 0
if reason == VariantAssignmentReason.DEFAULT_WHEN_ENABLED:
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(100)
if feature.allocation:
for allocation in feature.allocation.percentile:
allocation_percentage += allocation.percentile_to - allocation.percentile_from
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(100 - allocation_percentage)
elif reason == VariantAssignmentReason.PERCENTILE:
if feature.allocation and feature.allocation.percentile:
for allocation in feature.allocation.percentile:
if variant and allocation.variant == variant.name:
allocation_percentage += allocation.percentile_to - allocation.percentile_from
event[VARIANT_ASSIGNMENT_PERCENTAGE] = str(allocation_percentage)
# DefaultWhenEnabled
if feature.allocation and feature.allocation.default_when_enabled:
event[DEFAULT_WHEN_ENABLED] = feature.allocation.default_when_enabled
if feature.telemetry:
for metadata_key, metadata_value in feature.telemetry.metadata.items():
if metadata_key not in event:
event[metadata_key] = metadata_value
track_event(EVENT_NAME, evaluation_event.user, event_properties=event)
class TargetingSpanProcessor(SpanProcessor):
"""
A custom SpanProcessor that attaches the targeting ID to the span and baggage when a new span is started.
:keyword Callable[[], TargetingContext] targeting_context_accessor: Callback function to get the current targeting
context if one isn't provided.
"""
def __init__(self, **kwargs: Any) -> None:
self._targeting_context_accessor: Optional[Callable[[], TargetingContext]] = kwargs.pop(
"targeting_context_accessor", None
)
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: # pylint: disable=unused-argument
"""
Attaches the targeting ID to the span and baggage when a new span is started.
:param Span span: The span that was started.
:param parent_context: The parent context of the span.
"""
if not HAS_AZURE_MONITOR_EVENTS_EXTENSION:
logger.warning("Azure Monitor Events Extension is not installed.")
return
if self._targeting_context_accessor and callable(self._targeting_context_accessor):
if inspect.iscoroutinefunction(self._targeting_context_accessor):
logger.warning("Async targeting_context_accessor is not supported.")
return
targeting_context = self._targeting_context_accessor()
if not targeting_context or not isinstance(targeting_context, TargetingContext):
logger.warning(
"targeting_context_accessor did not return a TargetingContext. Received type %s.",
type(targeting_context),
)
return
if not targeting_context.user_id:
logger.debug("TargetingContext does not have a user ID.")
return
span.set_attribute(TARGETING_ID, targeting_context.user_id)