Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def boot_agent():
asyncio, # noqa: F401
boto3_inst, # noqa: F401
# cassandra_inst, # noqa: F401
# couchbase_inst, # noqa: F401
couchbase_inst, # noqa: F401
fastapi_inst, # noqa: F401
flask, # noqa: F401
# gevent_inst, # noqa: F401
Expand Down
129 changes: 91 additions & 38 deletions src/instana/instrumentation/couchbase_inst.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,90 +6,143 @@
https://docs.couchbase.com/python-sdk/2.5/start-using-sdk.html
"""

from typing import Any, Callable, Dict, Tuple, Union

import wrapt

from ..log import logger
from ..util.traceutils import get_tracer_tuple, tracing_is_off
from instana.log import logger
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import couchbase
from couchbase.bucket import Bucket

if not (hasattr(couchbase, '__version__') and couchbase.__version__[0] == '2'
and (couchbase.__version__[2] > '3'
or (couchbase.__version__[2] == '3' and couchbase.__version__[4] >= '4'))
):
if not hasattr(couchbase, "__version__") and (
couchbase.__version__ < "2.3.4" or couchbase.__version__ >= "3.0.0"
):
logger.debug("Instana supports 2.3.4 <= couchbase_versions < 3.0.0. Skipping.")
raise ImportError

from couchbase.n1ql import N1QLQuery

# List of operations to instrument
# incr, incr_multi, decr, decr_multi, retrieve_in are wrappers around operations above
operations = ['upsert', 'insert', 'replace', 'append', 'prepend', 'get', 'rget',
'touch', 'lock', 'unlock', 'remove', 'counter', 'mutate_in', 'lookup_in',
'stats', 'ping', 'diagnostics', 'observe',

'upsert_multi', 'insert_multi', 'replace_multi', 'append_multi',
'prepend_multi', 'get_multi', 'touch_multi', 'lock_multi', 'unlock_multi',
'observe_multi', 'endure_multi', 'remove_multi', 'counter_multi']

def capture_kvs(scope, instance, query_arg, op):
operations = [
"upsert",
"insert",
"replace",
"append",
"prepend",
"get",
"rget",
"touch",
"lock",
"unlock",
"remove",
"counter",
"mutate_in",
"lookup_in",
"stats",
"ping",
"diagnostics",
"observe",
"upsert_multi",
"insert_multi",
"replace_multi",
"append_multi",
"prepend_multi",
"get_multi",
"touch_multi",
"lock_multi",
"unlock_multi",
"observe_multi",
"endure_multi",
"remove_multi",
"counter_multi",
]

def collect_attributes(
span: InstanaSpan,
instance: Bucket,
query_arg: Union[N1QLQuery, object],
op: str,
) -> None:
try:
scope.span.set_tag('couchbase.hostname', instance.server_nodes[0])
scope.span.set_tag('couchbase.bucket', instance.bucket)
scope.span.set_tag('couchbase.type', op)
span.set_attribute("couchbase.hostname", instance.server_nodes[0])
span.set_attribute("couchbase.bucket", instance.bucket)
span.set_attribute("couchbase.type", op)

if query_arg is not None:
if query_arg:
query = None
if type(query_arg) is N1QLQuery:
Comment thread
CagriYonca marked this conversation as resolved.
Outdated
query = query_arg.statement
else:
query = query_arg

scope.span.set_tag('couchbase.sql', query)
except:
span.set_attribute("couchbase.sql", query)
except Exception:
# No fail on key capture - best effort
pass

def make_wrapper(op):
def wrapper(wrapped, instance, args, kwargs):
def make_wrapper(op: str) -> Callable:
def wrapper(
wrapped: Callable[..., object],
instance: couchbase.bucket.Bucket,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

# If we're not tracing, just return
if tracing_is_off():
return wrapped(*args, **kwargs)

with tracer.start_active_span("couchbase", child_of=parent_span) as scope:
capture_kvs(scope, instance, None, op)
with tracer.start_as_current_span(
"couchbase", span_context=parent_context
) as span:
collect_attributes(span, instance, None, op)
try:
return wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_exception(e)
scope.span.set_tag('couchbase.error', repr(e))
raise
except Exception as exc:
span.record_exception(exc)
span.set_attribute("couchbase.error", repr(exc))
logger.debug("Instana couchbase @ wrapper", exc_info=True)

return wrapper

def query_with_instana(wrapped, instance, args, kwargs):
def query_with_instana(
wrapped: Callable[..., object],
instance: couchbase.bucket.Bucket,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

# If we're not tracing, just return
if tracing_is_off():
return wrapped(*args, **kwargs)

with tracer.start_active_span("couchbase", child_of=parent_span) as scope:
capture_kvs(scope, instance, args[0], 'n1ql_query')
with tracer.start_as_current_span(
"couchbase", span_context=parent_context
) as span:
try:
collect_attributes(span, instance, args[0], "n1ql_query")
return wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_exception(e)
scope.span.set_tag('couchbase.error', repr(e))
raise
except Exception as exc:
span.record_exception(exc)
span.set_attribute("couchbase.error", repr(exc))
logger.debug("Instana couchbase @ query_with_instana", exc_info=True)

logger.debug("Instrumenting couchbase")
wrapt.wrap_function_wrapper('couchbase.bucket', 'Bucket.n1ql_query', query_with_instana)
wrapt.wrap_function_wrapper(
"couchbase.bucket", "Bucket.n1ql_query", query_with_instana
)
for op in operations:
f = make_wrapper(op)
wrapt.wrap_function_wrapper('couchbase.bucket', 'Bucket.%s' % op, f)
wrapt.wrap_function_wrapper("couchbase.bucket", f"Bucket.{op}", f)

except ImportError:
pass
Loading