From 8aeac9fef6901da2e8e73a37a05e3d4dfdd44d96 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Tue, 3 Sep 2024 14:41:24 +0300 Subject: [PATCH 1/2] refactor(pymongo): added otel instrumentation of pymongo Signed-off-by: Cagri Yonca --- src/instana/__init__.py | 2 +- src/instana/instrumentation/pymongo.py | 65 +++++++++++++++----------- src/instana/span/registered_span.py | 10 +--- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/src/instana/__init__.py b/src/instana/__init__.py index f7db3304..2173a3d4 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -176,7 +176,7 @@ def boot_agent(): # pika, # noqa: F401 pep0249, # noqa: F401 psycopg2, # noqa: F401 - # pymongo, # noqa: F401 + pymongo, # noqa: F401 pymysql, # noqa: F401 # redis, # noqa: F401 # sqlalchemy, # noqa: F401 diff --git a/src/instana/instrumentation/pymongo.py b/src/instana/instrumentation/pymongo.py index 264fd658..2c0bc203 100644 --- a/src/instana/instrumentation/pymongo.py +++ b/src/instana/instrumentation/pymongo.py @@ -2,43 +2,49 @@ # (c) Copyright Instana Inc. 2020 -from ..log import logger -from ..util.traceutils import get_tracer_tuple, tracing_is_off +from instana.span.span import InstanaSpan +from instana.log import logger +from instana.util.traceutils import get_tracer_tuple, tracing_is_off try: import pymongo - from pymongo import monitoring from bson import json_util + from opentelemetry.semconv.trace import SpanAttributes - - class MongoCommandTracer(monitoring.CommandListener): - def __init__(self): + class MongoCommandTracer(pymongo.monitoring.CommandListener): + def __init__(self) -> None: self.__active_commands = {} - def started(self, event): + def started(self, event: pymongo.monitoring.CommandStartedEvent) -> None: tracer, parent_span, _ = get_tracer_tuple() # return early if we're not tracing if tracing_is_off(): return + parent_context = parent_span.get_span_context() if parent_span else None - with tracer.start_active_span("mongo", child_of=parent_span) as scope: - self._collect_connection_tags(scope.span, event) - self._collect_command_tags(scope.span, event) + with tracer.start_as_current_span( + "mongo", span_context=parent_context + ) as span: + self._collect_connection_tags(span, event) + self._collect_command_tags(span, event) # include collection name into the namespace if provided if event.command_name in event.command: - scope.span.set_tag("collection", event.command.get(event.command_name)) + span.set_attribute( + SpanAttributes.DB_MONGODB_COLLECTION, + event.command.get(event.command_name), + ) - self.__active_commands[event.request_id] = scope + self.__active_commands[event.request_id] = span - def succeeded(self, event): + def succeeded(self, event: pymongo.monitoring.CommandStartedEvent) -> None: active_span = self.__active_commands.pop(event.request_id, None) # return early if we're not tracing if active_span is None: return - def failed(self, event): + def failed(self, event: pymongo.monitoring.CommandStartedEvent) -> None: active_span = self.__active_commands.pop(event.request_id, None) # return early if we're not tracing @@ -47,23 +53,27 @@ def failed(self, event): active_span.log_exception(event.failure) - def _collect_connection_tags(self, span, event): + def _collect_connection_tags( + self, span: InstanaSpan, event: pymongo.monitoring.CommandStartedEvent + ) -> None: (host, port) = event.connection_id - span.set_tag("host", host) - span.set_tag("port", str(port)) - span.set_tag("db", event.database_name) + span.set_attribute(SpanAttributes.SERVER_ADDRESS, host) + span.set_attribute(SpanAttributes.SERVER_PORT, str(port)) + span.set_attribute(SpanAttributes.DB_NAME, event.database_name) - def _collect_command_tags(self, span, event): + def _collect_command_tags(self, span, event) -> None: """ Extract MongoDB command name and arguments and attach it to the span """ cmd = event.command_name - span.set_tag("command", cmd) + span.set_attribute("command", cmd) for key in ["filter", "query"]: if key in event.command: - span.set_tag("filter", json_util.dumps(event.command.get(key))) + span.set_attribute( + "filter", json_util.dumps(event.command.get(key)) + ) break # The location of command documents within the command object depends on the name @@ -72,24 +82,25 @@ def _collect_command_tags(self, span, event): "insert": "documents", "update": "updates", "delete": "deletes", - "aggregate": "pipeline" + "aggregate": "pipeline", } cmd_doc = None if cmd in cmd_doc_locations: cmd_doc = event.command.get(cmd_doc_locations[cmd]) - elif cmd.lower() == "mapreduce": # mapreduce command was renamed to mapReduce in pymongo 3.9.0 + elif ( + cmd.lower() == "mapreduce" + ): # mapreduce command was renamed to mapReduce in pymongo 3.9.0 # mapreduce command consists of two mandatory parts: map and reduce cmd_doc = { "map": event.command.get("map"), - "reduce": event.command.get("reduce") + "reduce": event.command.get("reduce"), } if cmd_doc is not None: - span.set_tag("json", json_util.dumps(cmd_doc)) - + span.set_attribute("json", json_util.dumps(cmd_doc)) - monitoring.register(MongoCommandTracer()) + pymongo.monitoring.register(MongoCommandTracer()) logger.debug("Instrumenting pymongo") diff --git a/src/instana/span/registered_span.py b/src/instana/span/registered_span.py index afb38a97..6164ca86 100644 --- a/src/instana/span/registered_span.py +++ b/src/instana/span/registered_span.py @@ -252,14 +252,8 @@ def _populate_exit_span_data(self, span) -> None: self.data["pg"]["error"] = span.attributes.pop("pg.error", None) elif span.name == "mongo": - service = "%s:%s" % ( - span.attributes.pop("host", None), - span.attributes.pop("port", None), - ) - namespace = "%s.%s" % ( - span.attributes.pop("db", "?"), - span.attributes.pop("collection", "?"), - ) + service = f"{span.attributes.pop(SpanAttributes.SERVER_ADDRESS, None)}:{span.attributes.pop(SpanAttributes.SERVER_PORT, None)}" + namespace = f"{span.attributes.pop(SpanAttributes.DB_NAME, '?')}.{span.attributes.pop(SpanAttributes.DB_MONGODB_COLLECTION, '?')}" self.data["mongo"]["service"] = service self.data["mongo"]["namespace"] = namespace From 8879047961de00233d0ee7ece0aad46affe851db Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Tue, 3 Sep 2024 15:03:06 +0300 Subject: [PATCH 2/2] unittests(pymongo): added unittests for instrumentation Signed-off-by: Cagri Yonca --- tests/clients/test_pymongo.py | 319 +++++++++++++++++++--------------- tests/conftest.py | 1 - tests/test_tracer_provider.py | 4 +- 3 files changed, 178 insertions(+), 146 deletions(-) diff --git a/tests/clients/test_pymongo.py b/tests/clients/test_pymongo.py index b54b0525..251f0b40 100644 --- a/tests/clients/test_pymongo.py +++ b/tests/clients/test_pymongo.py @@ -2,256 +2,289 @@ # (c) Copyright Instana Inc. 2020 import json -import unittest import logging +from typing import Generator -from ..helpers import testenv -from instana.singletons import agent, tracer - -import pymongo import bson +import pymongo +import pytest -logger = logging.getLogger(__name__) +from instana.singletons import agent, tracer +from instana.span.span import get_current_span +from tests.helpers import testenv -pymongoversion = unittest.skipIf( - pymongo.version_tuple >= (4, 0), reason="map reduce is removed in pymongo 4.0" -) +logger = logging.getLogger(__name__) -class TestPyMongoTracer(unittest.TestCase): - def setUp(self): - self.client = pymongo.MongoClient(host=testenv['mongodb_host'], port=int(testenv['mongodb_port']), - username=testenv['mongodb_user'], password=testenv['mongodb_pw']) +class TestPyMongoTracer: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.client = pymongo.MongoClient( + host=testenv["mongodb_host"], + port=int(testenv["mongodb_port"]), + username=testenv["mongodb_user"], + password=testenv["mongodb_pw"], + ) self.client.test.records.delete_many(filter={}) - - self.recorder = tracer.recorder + self.recorder = tracer.span_processor self.recorder.clear_spans() - - def tearDown(self): + yield self.client.close() agent.options.allow_exit_as_root = False - def test_successful_find_query(self): - with tracer.start_active_span("test"): + def test_successful_find_query(self) -> None: + with tracer.start_as_current_span("test"): self.client.test.records.find_one({"type": "string"}) - - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "find") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "find" - self.assertEqual(db_span.data["mongo"]["filter"], '{"type": "string"}') - self.assertIsNone(db_span.data["mongo"]["json"]) + assert db_span.data["mongo"]["filter"] == '{"type": "string"}' + assert not db_span.data["mongo"]["json"] - def test_successful_find_query_as_root_span(self): + def test_successful_find_query_as_root_span(self) -> None: agent.options.allow_exit_as_root = True self.client.test.records.find_one({"type": "string"}) - - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 1) + assert len(spans) == 1 db_span = spans[0] - self.assertEqual(db_span.p, None) - - self.assertIsNone(db_span.ec) + assert not db_span.p + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "find") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "find" - self.assertEqual(db_span.data["mongo"]["filter"], '{"type": "string"}') - self.assertIsNone(db_span.data["mongo"]["json"]) + assert db_span.data["mongo"]["filter"] == '{"type": "string"}' + assert not db_span.data["mongo"]["json"] - def test_successful_insert_query(self): - with tracer.start_active_span("test"): + def test_successful_insert_query(self) -> None: + with tracer.start_as_current_span("test"): self.client.test.records.insert_one({"type": "string"}) - - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "insert") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "insert" - self.assertIsNone(db_span.data["mongo"]["filter"]) + assert not db_span.data["mongo"]["filter"] - def test_successful_update_query(self): - with tracer.start_active_span("test"): - self.client.test.records.update_one({"type": "string"}, {"$set": {"type": "int"}}) - - self.assertIsNone(tracer.active_span) + def test_successful_update_query(self) -> None: + with tracer.start_as_current_span("test"): + self.client.test.records.update_one( + {"type": "string"}, {"$set": {"type": "int"}} + ) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "update") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "update" - self.assertIsNone(db_span.data["mongo"]["filter"]) - self.assertIsNotNone(db_span.data["mongo"]["json"]) + assert not db_span.data["mongo"]["filter"] + assert db_span.data["mongo"]["json"] payload = json.loads(db_span.data["mongo"]["json"]) - self.assertIn({ - "q": {"type": "string"}, - "u": {"$set": {"type": "int"}}, - "multi": False, - "upsert": False - }, payload) - - def test_successful_delete_query(self): - with tracer.start_active_span("test"): + assert { + "q": {"type": "string"}, + "u": {"$set": {"type": "int"}}, + "multi": False, + "upsert": False, + } in payload + + def test_successful_delete_query(self) -> None: + with tracer.start_as_current_span("test"): self.client.test.records.delete_one(filter={"type": "string"}) - - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "delete") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "delete" - self.assertIsNone(db_span.data["mongo"]["filter"]) - self.assertIsNotNone(db_span.data["mongo"]["json"]) + assert not db_span.data["mongo"]["filter"] + assert db_span.data["mongo"]["json"] payload = json.loads(db_span.data["mongo"]["json"]) - self.assertIn({"q": {"type": "string"}, "limit": 1}, payload) + assert {"q": {"type": "string"}, "limit": 1} in payload - def test_successful_aggregate_query(self): - with tracer.start_active_span("test"): + def test_successful_aggregate_query(self) -> None: + with tracer.start_as_current_span("test"): self.client.test.records.count_documents({"type": "string"}) - - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"], "aggregate") + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert db_span.data["mongo"]["command"] == "aggregate" - self.assertIsNone(db_span.data["mongo"]["filter"]) - self.assertIsNotNone(db_span.data["mongo"]["json"]) + assert not db_span.data["mongo"]["filter"] + assert db_span.data["mongo"]["json"] payload = json.loads(db_span.data["mongo"]["json"]) - self.assertIn({"$match": {"type": "string"}}, payload) + assert {"$match": {"type": "string"}} in payload - @pymongoversion - def test_successful_map_reduce_query(self): + @pytest.mark.skipif( + pymongo.version_tuple >= (4, 0), reason="map reduce is removed in pymongo 4.0" + ) + def test_successful_map_reduce_query(self) -> None: mapper = "function () { this.tags.forEach(function(z) { emit(z, 1); }); }" reducer = "function (key, values) { return len(values); }" - with tracer.start_active_span("test"): - self.client.test.records.map_reduce(bson.code.Code(mapper), bson.code.Code(reducer), "results", - query={"x": {"$lt": 2}}) - - self.assertIsNone(tracer.active_span) + with tracer.start_as_current_span("test"): + self.client.test.records.map_reduce( + bson.code.Code(mapper), + bson.code.Code(reducer), + "results", + query={"x": {"$lt": 2}}, + ) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 2) + assert len(spans) == 2 db_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, db_span.t) - self.assertEqual(db_span.p, test_span.s) + assert test_span.t == db_span.t + assert db_span.p == test_span.s - self.assertIsNone(db_span.ec) + assert not db_span.ec - self.assertEqual(db_span.n, "mongo") - self.assertEqual(db_span.data["mongo"]["service"], "%s:%s" % (testenv['mongodb_host'], testenv['mongodb_port'])) - self.assertEqual(db_span.data["mongo"]["namespace"], "test.records") - self.assertEqual(db_span.data["mongo"]["command"].lower(), - "mapreduce") # mapreduce command was renamed to mapReduce in pymongo 3.9.0 + assert db_span.n == "mongo" + assert ( + db_span.data["mongo"]["service"] + == f"{testenv['mongodb_host']}:{testenv['mongodb_port']}" + ) + assert db_span.data["mongo"]["namespace"] == "test.records" + assert ( + db_span.data["mongo"]["command"].lower() == "mapreduce" + ) # mapreduce command was renamed to mapReduce in pymongo 3.9.0 - self.assertEqual(db_span.data["mongo"]["filter"], '{"x": {"$lt": 2}}') - self.assertIsNotNone(db_span.data["mongo"]["json"]) + assert db_span.data["mongo"]["filter"] == '{"x": {"$lt": 2}}' + assert db_span.data["mongo"]["json"] payload = json.loads(db_span.data["mongo"]["json"]) - self.assertEqual(payload["map"], {"$code": mapper}, db_span.data["mongo"]["json"]) - self.assertEqual(payload["reduce"], {"$code": reducer}, db_span.data["mongo"]["json"]) - - def test_successful_mutiple_queries(self): - with tracer.start_active_span("test"): - self.client.test.records.bulk_write([pymongo.InsertOne({"type": "string"}), - pymongo.UpdateOne({"type": "string"}, {"$set": {"type": "int"}}), - pymongo.DeleteOne({"type": "string"})]) - - self.assertIsNone(tracer.active_span) + assert payload["map"], {"$code": mapper} == db_span.data["mongo"]["json"] + assert payload["reduce"], {"$code": reducer} == db_span.data["mongo"]["json"] + + def test_successful_mutiple_queries(self) -> None: + with tracer.start_as_current_span("test"): + self.client.test.records.bulk_write( + [ + pymongo.InsertOne({"type": "string"}), + pymongo.UpdateOne({"type": "string"}, {"$set": {"type": "int"}}), + pymongo.DeleteOne({"type": "string"}), + ] + ) + current_span = get_current_span() + assert not current_span.is_recording() spans = self.recorder.queued_spans() - self.assertEqual(len(spans), 4) + assert len(spans) == 4 test_span = spans.pop() seen_span_ids = set() commands = [] for span in spans: - self.assertEqual(test_span.t, span.t) - self.assertEqual(span.p, test_span.s) + assert test_span.t == span.t + assert span.p == test_span.s # check if all spans got a unique id - self.assertNotIn(span.s, seen_span_ids) + assert span.s not in seen_span_ids seen_span_ids.add(span.s) commands.append(span.data["mongo"]["command"]) # ensure spans are ordered the same way as commands - self.assertListEqual(commands, ["insert", "update", "delete"]) - + assert commands == ["insert", "update", "delete"] diff --git a/tests/conftest.py b/tests/conftest.py index e2161a6c..e5e49643 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,7 +41,6 @@ collect_ignore_glob.append("*clients/test_couchbase*") collect_ignore_glob.append("*clients/test_google*") collect_ignore_glob.append("*clients/test_pika*") -collect_ignore_glob.append("*clients/test_pymongo*") collect_ignore_glob.append("*clients/test_redis*") collect_ignore_glob.append("*clients/test_sql*") diff --git a/tests/test_tracer_provider.py b/tests/test_tracer_provider.py index f2933485..2a55203b 100644 --- a/tests/test_tracer_provider.py +++ b/tests/test_tracer_provider.py @@ -1,5 +1,7 @@ # (c) Copyright IBM Corp. 2024 +from pytest import LogCaptureFixture + from instana.agent.host import HostAgent from instana.agent.test import TestAgent from instana.propagators.binary_propagator import BinaryPropagator @@ -9,8 +11,6 @@ from instana.recorder import StanRecorder from instana.sampling import InstanaSampler from instana.tracer import InstanaTracer, InstanaTracerProvider -from opentelemetry.trace.span import _SPAN_ID_MAX_VALUE, INVALID_SPAN_ID -from pytest import LogCaptureFixture def test_tracer_provider_defaults() -> None: