Skip to content

Commit 558dd20

Browse files
committed
PYCBC-1720: Wrapper SDK Observability Improvments - OTel Integration
Changes ------- * Add entry point for OTel tracing and metrics * Update SDK to set start and end time for spans/metrics when it is in control of the start/end time * Update ObservabilityHandler to take into account clock skew between Node and underlying C++ core when handling start/end of core spans and handling end of request span * Add Wrapper classes for OTel integration with the Couchbase observability APIs * Add optional dependency group "otel" to enable `python -m pip install couchbase[otel]` which will also install: * opentelemetry-api~=1.22 * opentelemetry-sdk~=1.22 * Add observability examples Change-Id: I10e5a93d5a21285207f5b5de50d1080588e80a14 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/241406 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Brett Lawson <brett19@gmail.com>
1 parent d17409c commit 558dd20

19 files changed

Lines changed: 1248 additions & 17 deletions

couchbase/logic/n1ql.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,9 @@ def metrics(self) -> Optional[QueryMetrics]:
295295
Returns:
296296
Optional[:class:`.QueryMetrics`]: A :class:`.QueryMetrics` instance.
297297
"""
298-
if "metrics" in self._raw:
299-
return QueryMetrics(self._raw.get("metrics", {}))
298+
raw_metrics = self._raw.get('metrics', None)
299+
if raw_metrics:
300+
return QueryMetrics(raw_metrics)
300301
return None
301302

302303
def profile(self) -> Optional[JSONType]:

couchbase/logic/observability/handler.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ def get_attributes_for_http_op(op_name: str,
124124
return attrs
125125

126126

127+
def get_latest_time(time_a_ns: int, time_b_ns: int) -> int:
128+
return max(time_a_ns, time_b_ns)
129+
130+
127131
class ObservableRequestHandler:
128132

129133
def __init__(self,
@@ -486,7 +490,7 @@ def _process_end(self,
486490
if exc_val is not None:
487491
self._handle_error(attrs, exc_val)
488492

489-
self._meter.value_recorder(OpAttributeName.MeterOperationDuration,
493+
self._meter.value_recorder(OpAttributeName.MeterOperationDuration.value,
490494
tags=attrs).record_value(self._to_microseconds(req_duration_ns))
491495

492496
def _to_microseconds(self, duration_ns: int) -> int:
@@ -642,6 +646,7 @@ def __init__(self,
642646
self._op_name = op_name
643647
self._wrapped_tracer = wrapped_tracer
644648
self._collection_details = options.get('collection_details', None)
649+
self._start_time = start_time or time.time_ns()
645650
# The request_span should _only_ take a SpanProtocol for the parent
646651
self._parent_span = options.get('parent_span', None)
647652
if isinstance(self._parent_span, WrappedSpan):
@@ -650,13 +655,14 @@ def __init__(self,
650655
p_span = self._parent_span
651656
self._request_span = self._create_request_span(self._op_name.value,
652657
parent_span=p_span,
653-
start_time=start_time)
658+
start_time=self._start_time)
654659
self._has_multiple_encoding_spans = (self._op_name.is_multi_op() or self._op_name is OpName.MutateIn)
655660
self._encoding_spans_ended = False
656661
self._encoding_spans: Optional[Union[WrappedEncodingSpan, List[WrappedEncodingSpan]]] = None
657662
self._set_span_attrs(**options)
658663
self._cluster_name: Optional[str] = None
659664
self._cluster_uuid: Optional[str] = None
665+
self._end_time_watermark = time.time_ns()
660666

661667
@property
662668
def cluster_name(self) -> Optional[str]:
@@ -717,6 +723,9 @@ def maybe_create_encoding_span(self, encoding_fn: Callable[..., Tuple[bytes, int
717723
# core span so that we can add the cluster_[name|uuid] attributes
718724
self._encoding_spans = WrappedEncodingSpan(encoding_span, time.time_ns())
719725

726+
def maybe_update_end_time_watermark(self, end_time: int) -> None:
727+
self._end_time_watermark = get_latest_time(self._end_time_watermark, end_time)
728+
720729
def process_core_span(self, core_span: CppWrapperSdkSpan) -> None:
721730
self._maybe_set_attribute_from_core_span(core_span, CppOpAttributeName.ClusterName)
722731
self._maybe_set_attribute_from_core_span(core_span, CppOpAttributeName.ClusterUUID)
@@ -749,12 +758,16 @@ def set_status(self, status: SpanStatusCode) -> None:
749758
self._status = status
750759
self._request_span.set_status(status)
751760

752-
def end(self, end_time: Optional[int]) -> None:
761+
def end(self, end_time: int) -> None:
753762
self._end_encoding_spans()
754763
if self._wrapped_tracer.is_legacy:
755764
self._request_span.finish()
756-
else:
757-
self._request_span.end(end_time)
765+
return
766+
767+
self._end_time_watermark = get_latest_time(self._end_time_watermark, end_time)
768+
if self._parent_span and isinstance(self._parent_span, WrappedSpan):
769+
self._parent_span.maybe_update_end_time_watermark(self._end_time_watermark)
770+
self._request_span.end(self._end_time_watermark)
758771

759772
def _build_core_spans(self,
760773
core_spans: List[CppWrapperSdkChildSpan],
@@ -768,44 +781,48 @@ def _build_core_spans(self,
768781
def _build_dispatch_core_span(self,
769782
core_span: CppWrapperSdkChildSpan,
770783
parent_span: Optional[Union[SpanProtocol, WrappedSpan]] = None) -> None:
771-
# TODO: handle parent span as WrappedSpan
772784
if isinstance(parent_span, WrappedSpan):
773785
p_span = parent_span.request_span
774786
else:
775787
p_span = parent_span
776-
new_span = self._create_request_span(core_span['name'], parent_span=p_span, start_time=core_span['start'])
788+
latest_start_time = get_latest_time(self._start_time, core_span['start'])
789+
new_span = self._create_request_span(core_span['name'], parent_span=p_span, start_time=latest_start_time)
777790
children = core_span.get('children', None)
778791
if children:
779792
self._build_core_spans(children, parent_span=new_span)
780793

781794
for attr_name, attr_val in core_span.get('attributes', {}).items():
782795
new_span.set_attribute(attr_name, attr_val)
783796

784-
new_span.end(core_span['end'])
797+
core_span_end_time = core_span['end']
798+
self._end_time_watermark = get_latest_time(self._end_time_watermark, core_span_end_time)
799+
new_span.end(core_span_end_time)
785800

786801
def _build_non_dispatch_core_span(self,
787802
core_span: CppWrapperSdkChildSpan,
788803
parent_span: Optional[Union[SpanProtocol, WrappedSpan]] = None) -> None:
789-
# TODO: handle parent span as WrappedSpan
790804
if isinstance(parent_span, WrappedSpan):
791805
p_span = parent_span.request_span
792806
else:
793807
p_span = parent_span
794808

809+
latest_start_time = get_latest_time(self._start_time, core_span['start'])
795810
new_span = WrappedSpan(self._service_type,
796811
OpName(core_span['name']),
797812
self._wrapped_tracer,
798813
collection_details=self._collection_details,
799814
parent_span=p_span,
800-
start_time=core_span['start'])
815+
start_time=latest_start_time)
801816
children = core_span.get('children', None)
802817
if children:
803818
self._build_core_spans(children, parent_span=new_span)
804819

805820
for attr_name, attr_val in core_span.get('attributes', {}).items():
806821
new_span.set_attribute(attr_name, attr_val)
807822

808-
new_span.end(core_span['end'])
823+
core_span_end_time = core_span['end']
824+
self._end_time_watermark = get_latest_time(self._end_time_watermark, core_span_end_time)
825+
new_span.end(core_span_end_time)
809826

810827
def _create_request_span(self,
811828
name: str,

couchbase/observability/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,26 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15+
16+
"""
17+
Observability integrations for the Couchbase Python SDK.
18+
19+
This package provides integrations with observability platforms:
20+
- OpenTelemetry tracing (otel_tracing)
21+
- OpenTelemetry metrics (otel_metrics)
22+
"""
23+
24+
__all__ = []
25+
26+
# OpenTelemetry integrations (optional - only available if otel installed)
27+
try:
28+
from couchbase.observability.otel_tracing import get_otel_tracer # noqa: F401
29+
__all__.append('get_otel_tracer')
30+
except ImportError:
31+
pass # OTel not installed
32+
33+
try:
34+
from couchbase.observability.otel_metrics import get_otel_meter # noqa: F401
35+
__all__.append('get_otel_meter')
36+
except ImportError:
37+
pass # OTel not installed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright 2016-2026. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License")
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
import typing
19+
from typing import (Any,
20+
Dict,
21+
Mapping,
22+
Optional)
23+
24+
from couchbase._version import __version__
25+
from couchbase.observability.metrics import Meter, ValueRecorder
26+
27+
if typing.TYPE_CHECKING:
28+
from opentelemetry import metrics as otel_metrics
29+
from opentelemetry.metrics import MeterProvider
30+
31+
try:
32+
from opentelemetry import metrics as otel_metrics # noqa: F811
33+
HAS_OTEL = True
34+
except ImportError:
35+
HAS_OTEL = False
36+
37+
38+
if HAS_OTEL:
39+
40+
class OtelWrapperValueRecorder(ValueRecorder):
41+
"""
42+
Wraps an OpenTelemetry Histogram to implement SDK's ValueRecorder interface.
43+
"""
44+
45+
def __init__(self, histogram: otel_metrics.Histogram, tags: Dict[str, Any], unit: str) -> None:
46+
self._histogram = histogram
47+
self._attributes = tags if tags else {}
48+
self._unit = unit
49+
50+
def record_value(self, value: int) -> None:
51+
# The SDK records value in micros and will place a special __unit tag in the tags set to 's'
52+
# So self._unit == 's' is indication that we should convert from micros to seconds
53+
value = value / 1_000_000 if self._unit == 's' else value
54+
self._histogram.record(float(value), attributes=self._attributes)
55+
56+
class OtelWrapperMeter(Meter):
57+
"""
58+
Wraps an OpenTelemetry Meter to implement SDK's Meter interface.
59+
"""
60+
61+
def __init__(self, otel_meter: otel_metrics.Meter) -> None:
62+
self._otel_meter = otel_meter
63+
self._histograms: Dict[str, otel_metrics.Histogram] = {}
64+
65+
def value_recorder(self, name: str, tags: Mapping[str, Any]) -> ValueRecorder:
66+
local_tags = dict(tags) if tags else {}
67+
unit = local_tags.pop('__unit', '')
68+
if name not in self._histograms:
69+
self._histograms[name] = self._otel_meter.create_histogram(name=name, unit=unit)
70+
71+
histogram = self._histograms[name]
72+
return OtelWrapperValueRecorder(histogram, local_tags, unit=unit)
73+
74+
75+
def get_otel_meter(provider: Optional[MeterProvider] = None) -> Meter:
76+
"""
77+
Creates an OpenTelemetry wrapper meter.
78+
79+
Args:
80+
provider: Optional OpenTelemetry MeterProvider. If not provided,
81+
falls back to the global OTel meter provider.
82+
83+
Returns:
84+
OtelWrapperMeter instance implementing SDK's Meter interface
85+
86+
Raises:
87+
ImportError: If OpenTelemetry is not installed
88+
"""
89+
if not HAS_OTEL:
90+
raise ImportError(
91+
"OpenTelemetry is not installed. Please install with: "
92+
"pip install couchbase[otel]"
93+
)
94+
95+
pkg_name = "com.couchbase.client/python"
96+
pkg_version = __version__
97+
98+
if provider:
99+
otel_meter = provider.get_meter(pkg_name, pkg_version)
100+
else:
101+
otel_meter = otel_metrics.get_meter(pkg_name, pkg_version)
102+
103+
return OtelWrapperMeter(otel_meter)

0 commit comments

Comments
 (0)