Skip to content

Commit 5dc9ef7

Browse files
committed
PYCBC-1721: Wrapper SDK Observability - Tracing Baseline
Changes ------- * Add tracing "hooks" to all KV and streaming operations * Add tracing "hooks" to all bucket, collection and query index management operations * Add appropriate observability (RequestSpan, RequestTracer, tags, etc.) logic to align with the Extended Observability RFC * Add tests to confirm various tracer (legacy, modern, no-op and threshold) behavior across baseline operations Change-Id: I4b1eda52324d0314f95eb6631b4ec3015cff23c2 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/241023 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Brett Lawson <brett19@gmail.com>
1 parent 47fd1f4 commit 5dc9ef7

159 files changed

Lines changed: 15892 additions & 2079 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

acouchbase/analytics.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,29 +81,42 @@ def _get_next_row(self):
8181
if self.done_streaming is True:
8282
return
8383

84+
# this is a blocking operation
8485
row = next(self._streaming_result)
8586
if isinstance(row, PycbcCoreException):
8687
raise ErrorMapper.build_exception(row)
87-
# should only be None one query request is complete and _no_ errors found
88+
89+
# should only be None onc query request is complete and _no_ errors found
8890
if row is None:
8991
raise StopAsyncIteration
92+
9093
# this should allow the event loop to pick up something else
9194
return self.serializer.deserialize(row)
9295

9396
async def __anext__(self):
9497
try:
95-
return await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
98+
row = await self._loop.run_in_executor(self._tp_executor, self._get_next_row)
99+
# We want to end the streaming op span once we have a response from the C++ core.
100+
# Unfortunately right now, that means we need to wait until we have the first row (or we have an error).
101+
# As this method is idempotent, it is safe to call for each row (it will only do work for the first call).
102+
self._process_core_span()
103+
return row
96104
except asyncio.QueueEmpty:
105+
self._process_core_span(with_error=True)
97106
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
98107
excptn = exc_cls('Unexpected QueueEmpty exception caught when doing Analytics query.')
99108
raise excptn
100109
except StopAsyncIteration:
101110
self._done_streaming = True
111+
if self._processed_core_span is False:
112+
self._process_core_span()
102113
self._get_metadata()
103114
raise
104115
except CouchbaseException as ex:
116+
self._process_core_span(with_error=True)
105117
raise ex
106118
except Exception as ex:
119+
self._process_core_span(with_error=True)
107120
exc_cls = PYCBC_ERROR_MAP.get(ExceptionMap.InternalSDKException.value, CouchbaseException)
108121
excptn = exc_cls(str(ex))
109122
raise excptn

acouchbase/binary_collection.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
Any,
2121
Union)
2222

23+
from couchbase.logic.observability import ObservableRequestHandler
24+
from couchbase.logic.operation_types import KeyValueOperationType
2325
from couchbase.result import CounterResult, MutationResult
2426

2527
if TYPE_CHECKING:
@@ -80,8 +82,10 @@ async def increment(self,
8082
print(f'Counter value: {res.content}')
8183
8284
"""
83-
req = self._impl.request_builder.build_increment_request(key, *opts, **kwargs)
84-
return await self._impl.increment(req)
85+
op_type = KeyValueOperationType.Increment
86+
async with ObservableRequestHandler(op_type, self._impl.observability_instruments) as obs_handler:
87+
req = self._impl.request_builder.build_increment_request(key, obs_handler, *opts, **kwargs)
88+
return await self._impl.increment(req, obs_handler)
8589

8690
async def decrement(self,
8791
key, # type: str
@@ -128,8 +132,10 @@ async def decrement(self,
128132
print(f'Counter value: {res.content}')
129133
130134
"""
131-
req = self._impl.request_builder.build_decrement_request(key, *opts, **kwargs)
132-
return await self._impl.decrement(req)
135+
op_type = KeyValueOperationType.Decrement
136+
async with ObservableRequestHandler(op_type, self._impl.observability_instruments) as obs_handler:
137+
req = self._impl.request_builder.build_decrement_request(key, obs_handler, *opts, **kwargs)
138+
return await self._impl.decrement(req, obs_handler)
133139

134140
async def append(self,
135141
key, # type: str
@@ -184,8 +190,10 @@ async def append(self,
184190
AppendOptions(timeout=timedelta(seconds=2)))
185191
186192
"""
187-
req = self._impl.request_builder.build_append_request(key, value, *opts, **kwargs)
188-
return await self._impl.append(req)
193+
op_type = KeyValueOperationType.Append
194+
async with ObservableRequestHandler(op_type, self._impl.observability_instruments) as obs_handler:
195+
req = self._impl.request_builder.build_append_request(key, value, obs_handler, *opts, **kwargs)
196+
return await self._impl.append(req, obs_handler)
189197

190198
async def prepend(self,
191199
key, # type: str
@@ -240,5 +248,7 @@ async def prepend(self,
240248
PrependOptions(timeout=timedelta(seconds=2)))
241249
242250
"""
243-
req = self._impl.request_builder.build_prepend_request(key, value, *opts, **kwargs)
244-
return await self._impl.prepend(req)
251+
op_type = KeyValueOperationType.Prepend
252+
async with ObservableRequestHandler(op_type, self._impl.observability_instruments) as obs_handler:
253+
req = self._impl.request_builder.build_prepend_request(key, value, obs_handler, *opts, **kwargs)
254+
return await self._impl.prepend(req, obs_handler)

acouchbase/bucket.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from acouchbase.management.collections import CollectionManager
2525
from acouchbase.management.views import ViewIndexManager
2626
from acouchbase.scope import Scope
27+
from couchbase.logic.observability import ObservableRequestHandler
28+
from couchbase.logic.operation_types import StreamingOperationType
2729
from couchbase.result import PingResult, ViewResult
2830

2931
if TYPE_CHECKING:
@@ -189,7 +191,13 @@ def view_query(self,
189191
print(f'Found row: {row}')
190192
191193
"""
192-
req = self._impl.request_builder.build_view_query_request(design_doc, view_name, *view_options, **kwargs)
194+
op_type = StreamingOperationType.ViewQuery
195+
obs_handler = ObservableRequestHandler(op_type, self._impl.observability_instruments)
196+
req = self._impl.request_builder.build_view_query_request(design_doc,
197+
view_name,
198+
obs_handler,
199+
*view_options,
200+
**kwargs)
193201
return self._impl.view_query(req)
194202

195203
def collections(self) -> CollectionManager:
@@ -200,7 +208,7 @@ def collections(self) -> CollectionManager:
200208
Returns:
201209
:class:`~acouchbase.management.collections.CollectionManager`: A :class:`~couchbase.management.collections.CollectionManager` instance.
202210
""" # noqa: E501
203-
return CollectionManager(self._impl._client_adapter, self.name)
211+
return CollectionManager(self._impl._client_adapter, self.name, self._impl.observability_instruments)
204212

205213
def view_indexes(self) -> ViewIndexManager:
206214
"""

acouchbase/cluster.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from couchbase.auth import (CertificateAuthenticator,
3636
JwtAuthenticator,
3737
PasswordAuthenticator)
38+
from couchbase.logic.observability import ObservableRequestHandler
39+
from couchbase.logic.operation_types import StreamingOperationType
3840
from couchbase.result import (AnalyticsResult,
3941
ClusterInfoResult,
4042
DiagnosticsResult,
@@ -352,7 +354,9 @@ def query(self,
352354
print(f'Query metrics: {q_res.metadata().metrics()}')
353355
354356
"""
355-
req = self._impl.request_builder.build_query_request(statement, *options, **kwargs)
357+
op_type = StreamingOperationType.Query
358+
obs_handler = ObservableRequestHandler(op_type, self._impl.observability_instruments)
359+
req = self._impl.request_builder.build_query_request(statement, obs_handler, *options, **kwargs)
356360
return self._impl.query(req)
357361

358362
def analytics_query(self, # type: Cluster
@@ -428,7 +432,9 @@ def analytics_query(self, # type: Cluster
428432
print(f'Analytics query metrics: {q_res.metadata().metrics()}')
429433
430434
""" # noqa: E501
431-
req = self._impl.request_builder.build_analytics_query_request(statement, *options, **kwargs)
435+
op_type = StreamingOperationType.AnalyticsQuery
436+
obs_handler = ObservableRequestHandler(op_type, self._impl.observability_instruments)
437+
req = self._impl.request_builder.build_analytics_query_request(statement, obs_handler, *options, **kwargs)
432438
return self._impl.analytics_query(req)
433439

434440
def search_query(self,
@@ -524,7 +530,9 @@ def search_query(self,
524530
print(f'Locations: {row.locations}')
525531
526532
"""
527-
req = self._impl.request_builder.build_search_request(index, query, *options, **kwargs)
533+
op_type = StreamingOperationType.SearchQuery
534+
obs_handler = ObservableRequestHandler(op_type, self._impl.observability_instruments)
535+
req = self._impl.request_builder.build_search_request(index, query, obs_handler, *options, **kwargs)
528536
return self._impl.search(req)
529537

530538
def search(self,
@@ -616,7 +624,9 @@ def search(self,
616624
async for row in q_res.rows():
617625
print(f'Found row: {row}')
618626
""" # noqa: E501
619-
req = self._impl.request_builder.build_search_request(index, request, *options, **kwargs)
627+
op_type = StreamingOperationType.SearchQuery
628+
obs_handler = ObservableRequestHandler(op_type, self._impl.observability_instruments)
629+
req = self._impl.request_builder.build_search_request(index, request, obs_handler, *options, **kwargs)
620630
return self._impl.search(req)
621631

622632
def buckets(self) -> BucketManager:
@@ -627,7 +637,7 @@ def buckets(self) -> BucketManager:
627637
Returns:
628638
:class:`~acouchbase.management.buckets.BucketManager`: A :class:`~acouchbase.management.buckets.BucketManager` instance.
629639
""" # noqa: E501
630-
return BucketManager(self._impl._client_adapter)
640+
return BucketManager(self._impl._client_adapter, self._impl.observability_instruments)
631641

632642
def users(self) -> UserManager:
633643
"""
@@ -647,7 +657,7 @@ def query_indexes(self) -> QueryIndexManager:
647657
Returns:
648658
:class:`~acouchbase.management.queries.QueryIndexManager`: A :class:`~acouchbase.management.queries.QueryIndexManager` instance.
649659
""" # noqa: E501
650-
return QueryIndexManager(self._impl._client_adapter)
660+
return QueryIndexManager(self._impl._client_adapter, self._impl.observability_instruments)
651661

652662
def analytics_indexes(self) -> AnalyticsIndexManager:
653663
"""

0 commit comments

Comments
 (0)