-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathspan_utils.py
More file actions
132 lines (102 loc) · 4.24 KB
/
span_utils.py
File metadata and controls
132 lines (102 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# (c) Copyright IBM Corp. 2025
from typing import Any, Dict, List, Optional
from instana.util.config import SPAN_TYPE_TO_CATEGORY
def matches_rule(rule_attributes: List[Any], span_attributes: List[Any]) -> bool:
"""Check if the span attributes match the rule attributes."""
for attr_rule in rule_attributes:
key = attr_rule.get("key")
target_values = attr_rule.get("values", [])
match_type = attr_rule.get("match_type", "strict")
rule_matched = False
if key == "category":
if (
"type" in span_attributes
and span_attributes["type"] is not None
and span_attributes["type"] in SPAN_TYPE_TO_CATEGORY
):
actual = SPAN_TYPE_TO_CATEGORY[span_attributes["type"]]
if actual in target_values:
rule_matched = True
elif key == "kind":
if "kind" in span_attributes:
actual_kind = get_span_kind(span_attributes["kind"])
if actual_kind in target_values:
rule_matched = True
elif key == "type":
if "type" in span_attributes:
if span_attributes["type"] in target_values:
rule_matched = True
else:
span_value = None
if key in span_attributes:
span_value = span_attributes[key]
elif "." in key:
# Support dot-notation paths for nested attributes
# e.g. "sdk.custom.tags.http.host" -> span["sdk.custom"]["tags"]["http.host"]
span_value = resolve_nested_key(span_attributes, key.split("."))
if span_value is not None:
for rule_value in target_values:
if match_key_filter(span_value, rule_value, match_type):
rule_matched = True
break
if not rule_matched:
return False
return True
def resolve_nested_key(data: Dict[str, Any], key_parts: List[str]) -> Optional[Any]:
"""Resolve a dotted key path against a potentially nested dict.
Tries all possible prefix lengths so that keys which themselves contain
dots (e.g. ``sdk.custom`` or ``http.host``) are handled correctly.
Example::
# span_attributes = {"sdk.custom": {"tags": {"http.host": "example.com"}}}
resolve_nested_key(span_attributes, ["sdk", "custom", "tags", "http", "host"])
# -> "example.com"
"""
if not key_parts or not isinstance(data, dict):
return None
# Try the longest prefix first so that keys with embedded dots are matched
# before shorter splits (e.g. prefer "sdk.custom" over "sdk").
for i in range(len(key_parts), 0, -1):
candidate = ".".join(key_parts[:i])
if candidate in data:
remaining = key_parts[i:]
if not remaining:
return data[candidate]
result = resolve_nested_key(data[candidate], remaining)
if result is not None:
return result
return None
def match_key_filter(span_value: str, rule_value: str, match_type: str) -> bool:
"""Check if the first value matches the second value based on the match type."""
# Guard against None values
if span_value is None:
return False
if rule_value == "*":
return True
elif match_type == "strict" and span_value == rule_value:
return True
elif match_type == "contains" and rule_value in span_value:
return True
elif match_type == "startswith" and span_value.startswith(rule_value):
return True
elif match_type == "endswith" and span_value.endswith(rule_value):
return True
return False
def get_span_kind(span_kind: Any) -> str:
res = "intermediate"
val = span_kind
if hasattr(span_kind, "value"):
val = span_kind.value
try:
k = int(val)
if k == 1:
res = "entry"
elif k == 2:
res = "exit"
except (ValueError, TypeError):
pass
if res == "intermediate" and isinstance(span_kind, str):
if span_kind.lower() in ["entry", "server"]:
res = "entry"
if span_kind.lower() in ["exit", "client"]:
res = "exit"
return res