From 4c40389850ca6e4049e86613373985e04bfc8e1e Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 12 Sep 2024 18:53:47 +0300 Subject: [PATCH 1/2] refactor(cassandra): added cassandra otel instrumentation --- src/instana/__init__.py | 1 + src/instana/instrumentation/cassandra_inst.py | 125 +++++++++++------- 2 files changed, 76 insertions(+), 50 deletions(-) diff --git a/src/instana/__init__.py b/src/instana/__init__.py index bf3b8556..6384a2af 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -188,6 +188,7 @@ def boot_agent(): client, # noqa: F401 server, # noqa: F401 ) + # from instana.instrumentation.aws import lambda_inst # noqa: F401 # from instana.instrumentation.celery import hooks # noqa: F401 from instana.instrumentation.django import middleware # noqa: F401 diff --git a/src/instana/instrumentation/cassandra_inst.py b/src/instana/instrumentation/cassandra_inst.py index da828d18..3b6e9713 100644 --- a/src/instana/instrumentation/cassandra_inst.py +++ b/src/instana/instrumentation/cassandra_inst.py @@ -6,78 +6,103 @@ https://docs.datastax.com/en/developer/python-driver/3.20/ https://github.com/datastax/python-driver """ + +from typing import Any, Callable, Dict, Tuple import wrapt -from ..log import logger -from ..util.traceutils import get_tracer_tuple, tracing_is_off +from instana.log import logger +from instana.span.span import InstanaSpan +from instana.util.traceutils import get_tracer_tuple, tracing_is_off try: import cassandra - - consistency_levels = dict({0: "ANY", - 1: "ONE", - 2: "TWO", - 3: "THREE", - 4: "QUORUM", - 5: "ALL", - 6: "LOCAL_QUORUM", - 7: "EACH_QUORUM", - 8: "SERIAL", - 9: "LOCAL_SERIAL", - 10: "LOCAL_ONE"}) - - - def collect_response(span, fn): - tried_hosts = list() + from cassandra.cluster import ResponseFuture, Session + + consistency_levels = dict( + { + 0: "ANY", + 1: "ONE", + 2: "TWO", + 3: "THREE", + 4: "QUORUM", + 5: "ALL", + 6: "LOCAL_QUORUM", + 7: "EACH_QUORUM", + 8: "SERIAL", + 9: "LOCAL_SERIAL", + 10: "LOCAL_ONE", + } + ) + + def collect_attributes( + span: InstanaSpan, + fn: ResponseFuture, + ) -> None: + tried_hosts = [] for host in fn.attempted_hosts: - tried_hosts.append("%s:%d" % (host.endpoint.address, host.endpoint.port)) + tried_hosts.append(f"{host.endpoint.address}:{host.endpoint.port}") - span.set_tag("cassandra.triedHosts", tried_hosts) - span.set_tag("cassandra.coordHost", fn.coordinator_host) + span.set_attribute("cassandra.triedHosts", tried_hosts) + span.set_attribute("cassandra.coordHost", fn.coordinator_host) cl = fn.query.consistency_level if cl and cl in consistency_levels: - span.set_tag("cassandra.achievedConsistency", consistency_levels[cl]) - - - def cb_request_finish(results, span, fn): - collect_response(span, fn) - span.finish() - - - def cb_request_error(results, span, fn): - collect_response(span, fn) + span.set_attribute("cassandra.achievedConsistency", consistency_levels[cl]) + + def cb_request_finish( + _, + span: InstanaSpan, + fn: ResponseFuture, + ) -> None: + collect_attributes(span, fn) + span.end() + + def cb_request_error( + results: Dict[str, Any], + span: InstanaSpan, + fn: ResponseFuture, + ) -> None: + collect_attributes(span, fn) span.mark_as_errored({"cassandra.error": results.summary}) - span.finish() + span.end() - - def request_init_with_instana(fn): + def request_init_with_instana( + fn: ResponseFuture, + ) -> None: tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None if tracing_is_off(): return - ctags = {} + attributes = {} if isinstance(fn.query, cassandra.query.SimpleStatement): - ctags["cassandra.query"] = fn.query.query_string + attributes["cassandra.query"] = fn.query.query_string elif isinstance(fn.query, cassandra.query.BoundStatement): - ctags["cassandra.query"] = fn.query.prepared_statement.query_string - - ctags["cassandra.keyspace"] = fn.session.keyspace - ctags["cassandra.cluster"] = fn.session.cluster.metadata.cluster_name - - with tracer.start_active_span("cassandra", child_of=parent_span, - tags=ctags, finish_on_close=False) as scope: - fn.add_callback(cb_request_finish, scope.span, fn) - fn.add_errback(cb_request_error, scope.span, fn) - - - @wrapt.patch_function_wrapper('cassandra.cluster', 'Session.__init__') - def init_with_instana(wrapped, instance, args, kwargs): + attributes["cassandra.query"] = fn.query.prepared_statement.query_string + + attributes["cassandra.keyspace"] = fn.session.keyspace + attributes["cassandra.cluster"] = fn.session.cluster.metadata.cluster_name + + with tracer.start_as_current_span( + "cassandra", + span_context=parent_context, + attributes=attributes, + end_on_exit=False, + ) as span: + fn.add_callback(cb_request_finish, span, fn) + fn.add_errback(cb_request_error, span, fn) + + @wrapt.patch_function_wrapper("cassandra.cluster", "Session.__init__") + def init_with_instana( + wrapped: Callable[..., object], + instance: Session, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: session = wrapped(*args, **kwargs) instance.add_request_init_listener(request_init_with_instana) return session - logger.debug("Instrumenting cassandra") except ImportError: From 288e85a3cfb3f194e607a680eef1de1e1602b06d Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Thu, 12 Sep 2024 18:54:01 +0300 Subject: [PATCH 2/2] unittest(cassandra): added unittests of cassandra otel instrumentation --- tests/clients/test_cassandra-driver.py | 319 +++++++++++++------------ tests/conftest.py | 5 +- 2 files changed, 163 insertions(+), 161 deletions(-) diff --git a/tests/clients/test_cassandra-driver.py b/tests/clients/test_cassandra-driver.py index 44f05a21..3493de14 100644 --- a/tests/clients/test_cassandra-driver.py +++ b/tests/clients/test_cassandra-driver.py @@ -1,268 +1,271 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 -import os -import time import random -import unittest - -from instana.singletons import agent, tracer -from ..helpers import testenv, get_first_span_by_name +import time +from typing import Generator -from cassandra.cluster import Cluster +import pytest from cassandra import ConsistencyLevel +from cassandra.cluster import Cluster from cassandra.query import SimpleStatement -cluster = Cluster([testenv['cassandra_host']], load_balancing_policy=None) +from instana.singletons import agent, tracer +from tests.helpers import get_first_span_by_name, testenv + +cluster = Cluster([testenv["cassandra_host"]], load_balancing_policy=None) session = cluster.connect() session.execute( - "CREATE KEYSPACE IF NOT EXISTS instana_tests WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};") -session.set_keyspace('instana_tests') -session.execute("CREATE TABLE IF NOT EXISTS users(" - "id int PRIMARY KEY," - "name text," - "age text," - "email varint," - "phone varint" - ");") - - -@unittest.skipUnless(os.environ.get("CASSANDRA_TEST"), reason="") -class TestCassandra(unittest.TestCase): - def setUp(self): - """ Clear all spans before a test run """ - self.recorder = tracer.recorder + "CREATE KEYSPACE IF NOT EXISTS instana_tests WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};" +) +session.set_keyspace("instana_tests") +session.execute( + "CREATE TABLE IF NOT EXISTS users(" + "id int PRIMARY KEY," + "name text," + "age text," + "email varint," + "phone varint" + ");" +) + + +class TestCassandra: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """Clear all spans before a test run""" + self.recorder = tracer.span_processor self.recorder.clear_spans() - - def tearDown(self): - """ Ensure that allow_exit_as_root has the default value """ + yield agent.options.allow_exit_as_root = False - def test_untraced_execute(self): - res = session.execute('SELECT name, age, email FROM users') + def test_untraced_execute(self) -> None: + res = session.execute("SELECT name, age, email FROM users") - self.assertIsNotNone(res) + assert res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(0, len(spans)) + assert len(spans) == 0 - def test_untraced_execute_error(self): + def test_untraced_execute_error(self) -> None: res = None try: - res = session.execute('Not a valid query') - except: + res = session.execute("Not a valid query") + except Exception: pass - self.assertIsNone(res) + assert not res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(0, len(spans)) + assert len(spans) == 0 - def test_execute(self): + def test_execute(self) -> None: res = None - with tracer.start_active_span('test'): - res = session.execute('SELECT name, age, email FROM users') + with tracer.start_as_current_span("test"): + res = session.execute("SELECT name, age, email FROM users") - self.assertIsNotNone(res) + assert res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 - test_span = get_first_span_by_name(spans, 'sdk') - self.assertIsNotNone(test_span) - self.assertEqual(test_span.data["sdk"]["name"], 'test') + test_span = get_first_span_by_name(spans, "sdk") + assert test_span + assert test_span.data["sdk"]["name"] == "test" - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan # Same traceId and parent relationship - self.assertEqual(test_span.t, cspan.t) - self.assertEqual(cspan.p, test_span.s) + assert cspan.t == test_span.t + assert cspan.p == test_span.s - self.assertIsNotNone(cspan.stack) - self.assertIsNone(cspan.ec) + assert cspan.stack + assert not cspan.ec - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'SELECT name, age, email FROM users') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertIsNone(cspan.data["cassandra"]["achievedConsistency"]) - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertIsNone(cspan.data["cassandra"]["error"]) + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert cspan.data["cassandra"]["query"] == "SELECT name, age, email FROM users" + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert not cspan.data["cassandra"]["achievedConsistency"] + assert cspan.data["cassandra"]["triedHosts"] + assert not cspan.data["cassandra"]["error"] - def test_execute_as_root_exit_span(self): + def test_execute_as_root_exit_span(self) -> None: agent.options.allow_exit_as_root = True - res = session.execute('SELECT name, age, email FROM users') + res = session.execute("SELECT name, age, email FROM users") - self.assertIsNotNone(res) + assert res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan - self.assertIsNone(cspan.p) + assert not cspan.p - self.assertIsNotNone(cspan.stack) - self.assertIsNone(cspan.ec) + assert cspan.stack + assert not cspan.ec - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'SELECT name, age, email FROM users') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertIsNone(cspan.data["cassandra"]["achievedConsistency"]) - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertIsNone(cspan.data["cassandra"]["error"]) + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert cspan.data["cassandra"]["query"] == "SELECT name, age, email FROM users" + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert not cspan.data["cassandra"]["achievedConsistency"] + assert cspan.data["cassandra"]["triedHosts"] + assert not cspan.data["cassandra"]["error"] - def test_execute_async(self): + def test_execute_async(self) -> None: res = None - with tracer.start_active_span('test'): - res = session.execute_async('SELECT name, age, email FROM users').result() + with tracer.start_as_current_span("test"): + res = session.execute_async("SELECT name, age, email FROM users").result() - self.assertIsNotNone(res) + assert res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 - test_span = get_first_span_by_name(spans, 'sdk') - self.assertIsNotNone(test_span) - self.assertEqual(test_span.data["sdk"]["name"], 'test') + test_span = get_first_span_by_name(spans, "sdk") + assert test_span + assert test_span.data["sdk"]["name"] == "test" - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan # Same traceId and parent relationship - self.assertEqual(test_span.t, cspan.t) - self.assertEqual(cspan.p, test_span.s) + assert cspan.t == test_span.t + assert cspan.p == test_span.s - self.assertIsNotNone(cspan.stack) - self.assertIsNone(cspan.ec) + assert cspan.stack + assert not cspan.ec - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'SELECT name, age, email FROM users') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertIsNone(cspan.data["cassandra"]["achievedConsistency"]) - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertIsNone(cspan.data["cassandra"]["error"]) + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert cspan.data["cassandra"]["query"] == "SELECT name, age, email FROM users" + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert not cspan.data["cassandra"]["achievedConsistency"] + assert cspan.data["cassandra"]["triedHosts"] + assert not cspan.data["cassandra"]["error"] - def test_simple_statement(self): + def test_simple_statement(self) -> None: res = None - with tracer.start_active_span('test'): + with tracer.start_as_current_span("test"): query = SimpleStatement( - 'SELECT name, age, email FROM users', - is_idempotent=True + "SELECT name, age, email FROM users", is_idempotent=True ) res = session.execute(query) - self.assertIsNotNone(res) + assert res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 - test_span = get_first_span_by_name(spans, 'sdk') - self.assertIsNotNone(test_span) - self.assertEqual(test_span.data["sdk"]["name"], 'test') + test_span = get_first_span_by_name(spans, "sdk") + assert test_span + assert test_span.data["sdk"]["name"] == "test" - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan # Same traceId and parent relationship - self.assertEqual(test_span.t, cspan.t) - self.assertEqual(cspan.p, test_span.s) + assert cspan.t == test_span.t + assert cspan.p == test_span.s - self.assertIsNotNone(cspan.stack) - self.assertIsNone(cspan.ec) + assert cspan.stack + assert not cspan.ec - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'SELECT name, age, email FROM users') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertIsNone(cspan.data["cassandra"]["achievedConsistency"]) - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertIsNone(cspan.data["cassandra"]["error"]) + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert cspan.data["cassandra"]["query"] == "SELECT name, age, email FROM users" + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert not cspan.data["cassandra"]["achievedConsistency"] + assert cspan.data["cassandra"]["triedHosts"] + assert not cspan.data["cassandra"]["error"] - def test_execute_error(self): + def test_execute_error(self) -> None: res = None try: - with tracer.start_active_span('test'): - res = session.execute('Not a real query') - except: + with tracer.start_as_current_span("test"): + res = session.execute("Not a real query") + except Exception: pass - self.assertIsNone(res) + assert not res time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 - test_span = get_first_span_by_name(spans, 'sdk') - self.assertIsNotNone(test_span) - self.assertEqual(test_span.data["sdk"]["name"], 'test') + test_span = get_first_span_by_name(spans, "sdk") + assert test_span + assert test_span.data["sdk"]["name"] == "test" - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan # Same traceId and parent relationship - self.assertEqual(test_span.t, cspan.t) - self.assertEqual(cspan.p, test_span.s) + assert cspan.t == test_span.t + assert cspan.p == test_span.s - self.assertIsNotNone(cspan.stack) - self.assertEqual(cspan.ec, 1) + assert cspan.stack + assert cspan.ec == 1 - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'Not a real query') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertIsNone(cspan.data["cassandra"]["achievedConsistency"]) - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertEqual(cspan.data["cassandra"]["error"], "Syntax error in CQL query") + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert cspan.data["cassandra"]["query"] == "Not a real query" + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert not cspan.data["cassandra"]["achievedConsistency"] + assert cspan.data["cassandra"]["triedHosts"] + assert cspan.data["cassandra"]["error"] == "Syntax error in CQL query" - def test_prepared_statement(self): + def test_prepared_statement(self) -> None: prepared = None - result = None - with tracer.start_active_span('test'): - prepared = session.prepare('INSERT INTO users (id, name, age) VALUES (?, ?, ?)') + with tracer.start_as_current_span("test"): + prepared = session.prepare( + "INSERT INTO users (id, name, age) VALUES (?, ?, ?)" + ) prepared.consistency_level = ConsistencyLevel.QUORUM - result = session.execute(prepared, (random.randint(0, 1000000), "joe", "17")) + session.execute(prepared, (random.randint(0, 1000000), "joe", "17")) - self.assertIsNotNone(prepared) - self.assertIsNotNone(result) + assert prepared time.sleep(0.5) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 - test_span = get_first_span_by_name(spans, 'sdk') - self.assertIsNotNone(test_span) - self.assertEqual(test_span.data["sdk"]["name"], 'test') + test_span = get_first_span_by_name(spans, "sdk") + assert test_span + assert test_span.data["sdk"]["name"] == "test" - cspan = get_first_span_by_name(spans, 'cassandra') - self.assertIsNotNone(cspan) + cspan = get_first_span_by_name(spans, "cassandra") + assert cspan # Same traceId and parent relationship - self.assertEqual(test_span.t, cspan.t) - self.assertEqual(cspan.p, test_span.s) - - self.assertIsNotNone(cspan.stack) - self.assertIsNone(cspan.ec) - - self.assertEqual(cspan.data["cassandra"]["cluster"], 'Test Cluster') - self.assertEqual(cspan.data["cassandra"]["query"], 'INSERT INTO users (id, name, age) VALUES (?, ?, ?)') - self.assertEqual(cspan.data["cassandra"]["keyspace"], 'instana_tests') - self.assertEqual(cspan.data["cassandra"]["achievedConsistency"], "QUORUM") - self.assertIsNotNone(cspan.data["cassandra"]["triedHosts"]) - self.assertIsNone(cspan.data["cassandra"]["error"]) + assert test_span.t == cspan.t + assert cspan.p == test_span.s + + assert cspan.stack + assert not cspan.ec + + assert cspan.data["cassandra"]["cluster"] == "Test Cluster" + assert ( + cspan.data["cassandra"]["query"] + == "INSERT INTO users (id, name, age) VALUES (?, ?, ?)" + ) + assert cspan.data["cassandra"]["keyspace"] == "instana_tests" + assert cspan.data["cassandra"]["achievedConsistency"] == "QUORUM" + assert cspan.data["cassandra"]["triedHosts"] + assert not cspan.data["cassandra"]["error"] diff --git a/tests/conftest.py b/tests/conftest.py index 0f4e5e66..425ba008 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -37,7 +37,6 @@ # TODO: remove the following entries as the migration of the instrumentation # codes are finalised. -collect_ignore_glob.append("*clients/test_cassandra*") collect_ignore_glob.append("*clients/test_google*") collect_ignore_glob.append("*clients/test_pika*") collect_ignore_glob.append("*clients/test_redis*") @@ -51,8 +50,8 @@ # # Cassandra and gevent tests are run in dedicated jobs on CircleCI and will # # be run explicitly. (So always exclude them here) -# if not os.environ.get("CASSANDRA_TEST"): -# collect_ignore_glob.append("*test_cassandra*") +if not os.environ.get("CASSANDRA_TEST"): + collect_ignore_glob.append("*test_cassandra*") if not os.environ.get("COUCHBASE_TEST"): collect_ignore_glob.append("*test_couchbase*")