Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def boot_agent():
client, # noqa: F401
server, # noqa: F401
)

# from instana.instrumentation.aws import lambda_inst # noqa: F401
# from instana.instrumentation.celery import hooks # noqa: F401
from instana.instrumentation.django import middleware # noqa: F401
Expand Down
125 changes: 75 additions & 50 deletions src/instana/instrumentation/cassandra_inst.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,78 +6,103 @@
https://docs.datastax.com/en/developer/python-driver/3.20/
https://github.com/datastax/python-driver
"""

from typing import Any, Callable, Dict, Tuple
import wrapt
from ..log import logger
from ..util.traceutils import get_tracer_tuple, tracing_is_off
from instana.log import logger
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import cassandra

consistency_levels = dict({0: "ANY",
1: "ONE",
2: "TWO",
3: "THREE",
4: "QUORUM",
5: "ALL",
6: "LOCAL_QUORUM",
7: "EACH_QUORUM",
8: "SERIAL",
9: "LOCAL_SERIAL",
10: "LOCAL_ONE"})


def collect_response(span, fn):
tried_hosts = list()
from cassandra.cluster import ResponseFuture, Session

consistency_levels = dict(
{
0: "ANY",
1: "ONE",
2: "TWO",
3: "THREE",
4: "QUORUM",
5: "ALL",
6: "LOCAL_QUORUM",
7: "EACH_QUORUM",
8: "SERIAL",
9: "LOCAL_SERIAL",
10: "LOCAL_ONE",
}
)

def collect_attributes(
span: InstanaSpan,
fn: ResponseFuture,
) -> None:
tried_hosts = []
for host in fn.attempted_hosts:
tried_hosts.append("%s:%d" % (host.endpoint.address, host.endpoint.port))
tried_hosts.append(f"{host.endpoint.address}:{host.endpoint.port}")

span.set_tag("cassandra.triedHosts", tried_hosts)
span.set_tag("cassandra.coordHost", fn.coordinator_host)
span.set_attribute("cassandra.triedHosts", tried_hosts)
span.set_attribute("cassandra.coordHost", fn.coordinator_host)

cl = fn.query.consistency_level
if cl and cl in consistency_levels:
span.set_tag("cassandra.achievedConsistency", consistency_levels[cl])


def cb_request_finish(results, span, fn):
collect_response(span, fn)
span.finish()


def cb_request_error(results, span, fn):
collect_response(span, fn)
span.set_attribute("cassandra.achievedConsistency", consistency_levels[cl])

def cb_request_finish(
_,
span: InstanaSpan,
fn: ResponseFuture,
) -> None:
collect_attributes(span, fn)
span.end()

def cb_request_error(
results: Dict[str, Any],
span: InstanaSpan,
fn: ResponseFuture,
) -> None:
collect_attributes(span, fn)
span.mark_as_errored({"cassandra.error": results.summary})
span.finish()
span.end()


def request_init_with_instana(fn):
def request_init_with_instana(
fn: ResponseFuture,
) -> None:
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

if tracing_is_off():
return

ctags = {}
attributes = {}
if isinstance(fn.query, cassandra.query.SimpleStatement):
ctags["cassandra.query"] = fn.query.query_string
attributes["cassandra.query"] = fn.query.query_string
elif isinstance(fn.query, cassandra.query.BoundStatement):
ctags["cassandra.query"] = fn.query.prepared_statement.query_string

ctags["cassandra.keyspace"] = fn.session.keyspace
ctags["cassandra.cluster"] = fn.session.cluster.metadata.cluster_name

with tracer.start_active_span("cassandra", child_of=parent_span,
tags=ctags, finish_on_close=False) as scope:
fn.add_callback(cb_request_finish, scope.span, fn)
fn.add_errback(cb_request_error, scope.span, fn)


@wrapt.patch_function_wrapper('cassandra.cluster', 'Session.__init__')
def init_with_instana(wrapped, instance, args, kwargs):
attributes["cassandra.query"] = fn.query.prepared_statement.query_string

attributes["cassandra.keyspace"] = fn.session.keyspace
attributes["cassandra.cluster"] = fn.session.cluster.metadata.cluster_name

with tracer.start_as_current_span(
"cassandra",
span_context=parent_context,
attributes=attributes,
end_on_exit=False,
) as span:
fn.add_callback(cb_request_finish, span, fn)
fn.add_errback(cb_request_error, span, fn)

@wrapt.patch_function_wrapper("cassandra.cluster", "Session.__init__")
def init_with_instana(
wrapped: Callable[..., object],
instance: Session,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
session = wrapped(*args, **kwargs)
instance.add_request_init_listener(request_init_with_instana)
return session


logger.debug("Instrumenting cassandra")

except ImportError:
Expand Down
Loading