diff --git a/src/instana/__init__.py b/src/instana/__init__.py index 647b6253..0801367a 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -179,7 +179,7 @@ def boot_agent(): pymongo, # noqa: F401 pymysql, # noqa: F401 redis, # noqa: F401 - # sqlalchemy, # noqa: F401 + sqlalchemy, # noqa: F401 starlette_inst, # noqa: F401 sanic_inst, # noqa: F401 urllib3, # noqa: F401 diff --git a/src/instana/instrumentation/sqlalchemy.py b/src/instana/instrumentation/sqlalchemy.py index 2adf705a..3f44b526 100644 --- a/src/instana/instrumentation/sqlalchemy.py +++ b/src/instana/instrumentation/sqlalchemy.py @@ -3,90 +3,119 @@ import re -from operator import attrgetter +from typing import Any, Dict -from ..log import logger -from ..util.traceutils import get_tracer_tuple, tracing_is_off +from opentelemetry import context, trace + +from instana.log import logger +from instana.span.span import InstanaSpan, get_current_span +from instana.span_context import SpanContext +from instana.util.traceutils import get_tracer_tuple, tracing_is_off try: - import sqlalchemy + from sqlalchemy import __version__ as sqlalchemy_version from sqlalchemy import event from sqlalchemy.engine import Engine url_regexp = re.compile(r"\/\/(\S+@)") - - @event.listens_for(Engine, 'before_cursor_execute', named=True) - def receive_before_cursor_execute(**kw): + @event.listens_for(Engine, "before_cursor_execute", named=True) + def receive_before_cursor_execute( + **kw: Dict[str, Any], + ) -> None: try: # If we're not tracing, just return if tracing_is_off(): return tracer, parent_span, _ = get_tracer_tuple() - scope = tracer.start_active_span("sqlalchemy", child_of=parent_span) - context = kw['context'] - if context: - context._stan_scope = scope - - conn = kw['conn'] - url = str(conn.engine.url) - scope.span.set_tag('sqlalchemy.sql', kw['statement']) - scope.span.set_tag('sqlalchemy.eng', conn.engine.name) - scope.span.set_tag('sqlalchemy.url', url_regexp.sub('//', url)) - except Exception as e: - logger.debug(e) - return - - - @event.listens_for(Engine, 'after_cursor_execute', named=True) - def receive_after_cursor_execute(**kw): - context = kw['context'] - - if context is not None and hasattr(context, '_stan_scope'): - scope = context._stan_scope - if scope is not None: - scope.close() + parent_context = parent_span.get_span_context() if parent_span else None + + span = tracer.start_span("sqlalchemy", span_context=parent_context) + conn = kw["conn"] + conn.span = span + span.set_attribute("sqlalchemy.sql", kw["statement"]) + span.set_attribute("sqlalchemy.eng", conn.engine.name) + span.set_attribute( + "sqlalchemy.url", url_regexp.sub("//", str(conn.engine.url)) + ) + + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + conn.token = token + except Exception: + logger.debug( + "Instrumenting sqlalchemy @ receive_before_cursor_execute", + exc_info=True, + ) + + @event.listens_for(Engine, "after_cursor_execute", named=True) + def receive_after_cursor_execute( + **kw: Dict[str, Any], + ) -> None: + try: + # If we're not tracing, just return + if tracing_is_off(): + return + current_span = get_current_span() + conn = kw["conn"] + if current_span.is_recording(): + current_span.end() + if hasattr(conn, "token"): + context.detach(conn.token) + conn.token = None + except Exception: + logger.debug( + "Instrumenting sqlalchemy @ receive_after_cursor_execute", + exc_info=True, + ) error_event = "handle_error" # Handle dbapi_error event; deprecated since version 0.9 - if sqlalchemy.__version__[0] == "0": + if sqlalchemy_version[0] == "0": error_event = "dbapi_error" - - def _set_error_tags(context, exception_string, scope_string): - scope, context_exception = None, None - if attrgetter(scope_string)(context) and attrgetter(exception_string)(context): - scope = attrgetter(scope_string)(context) - context_exception = attrgetter(exception_string)(context) - if scope and context_exception: - scope.span.log_exception(context_exception) - scope.close() + def _set_error_attributes( + context: SpanContext, + exception_string: str, + span: InstanaSpan, + ) -> None: + context_exception = None, None + if hasattr(context, exception_string): + context_exception = getattr(context, exception_string) + if span and context_exception: + span.record_exception(context_exception) else: - scope.span.log_exception("No %s specified." % error_event) - scope.close() - + span.record_exception(f"No {error_event} specified.") + if span.is_recording(): + span.end() @event.listens_for(Engine, error_event, named=True) - def receive_handle_db_error(**kw): - - if tracing_is_off(): - return + def receive_handle_db_error( + **kw: Dict[str, Any], + ) -> None: + try: + if tracing_is_off(): + return - # support older db error event - if error_event == "dbapi_error": - context = kw.get('context') - exception_string = 'exception' - scope_string = '_stan_scope' - else: - context = kw.get('exception_context') - exception_string = 'sqlalchemy_exception' - scope_string = 'execution_context._stan_scope' + current_span = get_current_span() - if context: - _set_error_tags(context, exception_string, scope_string) + # support older db error event + if error_event == "dbapi_error": + context = kw.get("context") + exception_string = "exception" + else: + context = kw.get("exception_context") + exception_string = "sqlalchemy_exception" + if context: + _set_error_attributes(context, exception_string, current_span) + except Exception: + logger.debug( + "Instrumenting sqlalchemy @ receive_handle_db_error", + exc_info=True, + ) logger.debug("Instrumenting sqlalchemy") diff --git a/tests/clients/test_sqlalchemy.py b/tests/clients/test_sqlalchemy.py index 68afa1ec..6aef6fc9 100644 --- a/tests/clients/test_sqlalchemy.py +++ b/tests/clients/test_sqlalchemy.py @@ -1,244 +1,297 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 -import unittest +from typing import Generator -from ..helpers import testenv -from instana.singletons import agent, tracer - -from sqlalchemy.orm import sessionmaker -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import declarative_base +import pytest from sqlalchemy import Column, Integer, String, create_engine, text +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import declarative_base, sessionmaker +from instana.singletons import agent, tracer +from instana.span.span import get_current_span +from tests.helpers import testenv + +engine = create_engine( + f"postgresql://{testenv['postgresql_user']}:{testenv['postgresql_pw']}@{testenv['postgresql_host']}:{testenv['postgresql_port']}/{testenv['postgresql_db']}" +) -engine = create_engine("postgresql://%s:%s@%s/%s" % (testenv['postgresql_user'], testenv['postgresql_pw'], - testenv['postgresql_host'], testenv['postgresql_db'])) +Session = sessionmaker(bind=engine) Base = declarative_base() + class StanUser(Base): - __tablename__ = 'churchofstan' + __tablename__ = "churchofstan" - id = Column(Integer, primary_key=True) - name = Column(String) - fullname = Column(String) - password = Column(String) + id = Column(Integer, primary_key=True) + name = Column(String) + fullname = Column(String) + password = Column(String) - def __repr__(self): + def __repr__(self) -> None: return "" % ( - self.name, self.fullname, self.password) - -Base.metadata.create_all(engine) - -stan_user = StanUser(name='IAmStan', fullname='Stan Robot', password='3X}vP66ADoCFT2g?HPvoem2eJh,zWXgd36Rb/{aRq/>7EYy6@EEH4BP(oeXac@mR') -stan_user2 = StanUser(name='IAmStanToo', fullname='Stan Robot 2', password='3X}vP66ADoCFT2g?HPvoem2eJh,zWXgd36Rb/{aRq/>7EYy6@EEH4BP(oeXac@mR') - -Session = sessionmaker(bind=engine) -Session.configure(bind=engine) - -sqlalchemy_url = 'postgresql://%s/%s' % (testenv['postgresql_host'], testenv['postgresql_db']) - - -class TestSQLAlchemy(unittest.TestCase): - def setUp(self): - """ Clear all spans before a test run """ - self.recorder = tracer.recorder + self.name, + self.fullname, + self.password, + ) + + +@pytest.fixture(scope="class") +def db_setup() -> None: + with tracer.start_as_current_span("metadata") as span: + Base.metadata.create_all(engine) + span.end() + + +stan_user = StanUser( + name="IAmStan", + fullname="Stan Robot", + password="3X}vP66ADoCFT2g?HPvoem2eJh,zWXgd36Rb/{aRq/>7EYy6@EEH4BP(oeXac@mR", +) +stan_user2 = StanUser( + name="IAmStanToo", + fullname="Stan Robot 2", + password="3X}vP66ADoCFT2g?HPvoem2eJh,zWXgd36Rb/{aRq/>7EYy6@EEH4BP(oeXac@mR", +) + +sqlalchemy_url = f"postgresql://{testenv['postgresql_host']}:{testenv['postgresql_port']}/{testenv['postgresql_db']}" + + +@pytest.mark.usefixtures("db_setup") +class TestSQLAlchemy: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """Clear all spans before a test run""" + self.recorder = tracer.span_processor self.recorder.clear_spans() self.session = Session() - - def tearDown(self): - """ Ensure that allow_exit_as_root has the default value """ + yield + """Ensure that allow_exit_as_root has the default value""" + self.session.close() agent.options.allow_exit_as_root = False - def test_session_add(self): - with tracer.start_active_span('test'): + def test_session_add(self) -> None: + with tracer.start_as_current_span("test"): self.session.add(stan_user) self.session.commit() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) sql_span = spans[0] test_span = spans[1] - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() # Same traceId - self.assertEqual(test_span.t, sql_span.t) + assert sql_span.t == test_span.t # Parent relationships - self.assertEqual(sql_span.p, test_span.s) + assert sql_span.p == test_span.s # Error logging - self.assertIsNone(test_span.ec) - self.assertIsNone(sql_span.ec) + assert not test_span.ec + assert not sql_span.ec # SQLAlchemy span - self.assertEqual('sqlalchemy', sql_span.n) - self.assertFalse('custom' in sql_span.data) - self.assertTrue('sqlalchemy' in sql_span.data) - - self.assertEqual('postgresql', sql_span.data["sqlalchemy"]["eng"]) - self.assertEqual(sqlalchemy_url, sql_span.data["sqlalchemy"]["url"]) - self.assertEqual('INSERT INTO churchofstan (name, fullname, password) VALUES (%(name)s, %(fullname)s, %(password)s) RETURNING churchofstan.id', sql_span.data["sqlalchemy"]["sql"]) - self.assertIsNone(sql_span.data["sqlalchemy"]["err"]) - - self.assertIsNotNone(sql_span.stack) - self.assertTrue(type(sql_span.stack) is list) - self.assertGreater(len(sql_span.stack), 0) - - def test_session_add_as_root_exit_span(self): + assert sql_span.n == "sqlalchemy" + assert "custom" not in sql_span.data + assert "sqlalchemy" in sql_span.data + + assert sql_span.data["sqlalchemy"]["eng"] == "postgresql" + assert sqlalchemy_url == sql_span.data["sqlalchemy"]["url"] + assert ( + "INSERT INTO churchofstan (name, fullname, password) VALUES (%(name)s, %(fullname)s, %(password)s) RETURNING churchofstan.id" + == sql_span.data["sqlalchemy"]["sql"] + ) + assert not sql_span.data["sqlalchemy"]["err"] + + assert sql_span.stack + assert isinstance(sql_span.stack, list) + assert len(sql_span.stack) > 0 + + def test_session_add_as_root_exit_span(self) -> None: agent.options.allow_exit_as_root = True self.session.add(stan_user2) self.session.commit() spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 sql_span = spans[0] - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() # Parent relationships - self.assertEqual(sql_span.p, None) + assert not sql_span.p # Error logging - self.assertIsNone(sql_span.ec) + assert not sql_span.ec # SQLAlchemy span - self.assertEqual('sqlalchemy', sql_span.n) - self.assertFalse('custom' in sql_span.data) - self.assertTrue('sqlalchemy' in sql_span.data) - - self.assertEqual('postgresql', sql_span.data["sqlalchemy"]["eng"]) - self.assertEqual(sqlalchemy_url, sql_span.data["sqlalchemy"]["url"]) - self.assertEqual('INSERT INTO churchofstan (name, fullname, password) VALUES (%(name)s, %(fullname)s, %(password)s) RETURNING churchofstan.id', sql_span.data["sqlalchemy"]["sql"]) - self.assertIsNone(sql_span.data["sqlalchemy"]["err"]) - - self.assertIsNotNone(sql_span.stack) - self.assertTrue(type(sql_span.stack) is list) - self.assertGreater(len(sql_span.stack), 0) - - def test_transaction(self): - result = None - with tracer.start_active_span('test'): + assert sql_span.n == "sqlalchemy" + assert "custom" not in sql_span.data + assert "sqlalchemy" in sql_span.data + + assert sql_span.data["sqlalchemy"]["eng"] == "postgresql" + assert sqlalchemy_url == sql_span.data["sqlalchemy"]["url"] + assert ( + "INSERT INTO churchofstan (name, fullname, password) VALUES (%(name)s, %(fullname)s, %(password)s) RETURNING churchofstan.id" + == sql_span.data["sqlalchemy"]["sql"] + ) + assert not sql_span.data["sqlalchemy"]["err"] + + assert sql_span.stack + assert isinstance(sql_span.stack, list) + assert len(sql_span.stack) > 0 + + def test_transaction(self) -> None: + with tracer.start_as_current_span("test"): with engine.begin() as connection: - result = connection.execute(text("select 1")) - result = connection.execute(text("select (name, fullname, password) from churchofstan where name='doesntexist'")) + connection.execute(text("select 1")) + connection.execute( + text( + "select (name, fullname, password) from churchofstan where name='doesntexist'" + ) + ) spans = self.recorder.queued_spans() - self.assertEqual(3, len(spans)) + assert len(spans) == 3 sql_span0 = spans[0] sql_span1 = spans[1] test_span = spans[2] - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() # Same traceId - self.assertEqual(test_span.t, sql_span0.t) - self.assertEqual(test_span.t, sql_span1.t) + assert sql_span0.t == test_span.t + assert sql_span1.t == test_span.t # Parent relationships - self.assertEqual(sql_span0.p, test_span.s) - self.assertEqual(sql_span1.p, test_span.s) + assert sql_span0.p == test_span.s + assert sql_span1.p == test_span.s # Error logging - self.assertIsNone(test_span.ec) - self.assertIsNone(sql_span0.ec) - self.assertIsNone(sql_span1.ec) + assert not test_span.ec + assert not sql_span0.ec + assert not sql_span1.ec # SQLAlchemy span0 - self.assertEqual('sqlalchemy', sql_span0.n) - self.assertFalse('custom' in sql_span0.data) - self.assertTrue('sqlalchemy' in sql_span0.data) + assert sql_span0.n == "sqlalchemy" + assert "custom" not in sql_span0.data + assert "sqlalchemy" in sql_span0.data - self.assertEqual('postgresql', sql_span0.data["sqlalchemy"]["eng"]) - self.assertEqual(sqlalchemy_url, sql_span0.data["sqlalchemy"]["url"]) - self.assertEqual('select 1', sql_span0.data["sqlalchemy"]["sql"]) - self.assertIsNone(sql_span0.data["sqlalchemy"]["err"]) + assert sql_span0.data["sqlalchemy"]["eng"] == "postgresql" + assert sqlalchemy_url == sql_span0.data["sqlalchemy"]["url"] + assert sql_span0.data["sqlalchemy"]["sql"] == "select 1" + assert not sql_span0.data["sqlalchemy"]["err"] - self.assertIsNotNone(sql_span0.stack) - self.assertTrue(type(sql_span0.stack) is list) - self.assertGreater(len(sql_span0.stack), 0) + assert sql_span0.stack + assert isinstance(sql_span0.stack, list) + assert len(sql_span0.stack) > 0 # SQLAlchemy span1 - self.assertEqual('sqlalchemy', sql_span1.n) - self.assertFalse('custom' in sql_span1.data) - self.assertTrue('sqlalchemy' in sql_span1.data) - - self.assertEqual('postgresql', sql_span1.data["sqlalchemy"]["eng"]) - self.assertEqual(sqlalchemy_url, sql_span1.data["sqlalchemy"]["url"]) - self.assertEqual("select (name, fullname, password) from churchofstan where name='doesntexist'", sql_span1.data["sqlalchemy"]["sql"]) - self.assertIsNone(sql_span1.data["sqlalchemy"]["err"]) - - self.assertIsNotNone(sql_span1.stack) - self.assertTrue(type(sql_span1.stack) is list) - self.assertGreater(len(sql_span1.stack), 0) - - def test_error_logging(self): - with tracer.start_active_span('test'): + assert sql_span1.n == "sqlalchemy" + assert "custom" not in sql_span1.data + assert "sqlalchemy" in sql_span1.data + + assert sql_span1.data["sqlalchemy"]["eng"] == "postgresql" + assert sqlalchemy_url == sql_span1.data["sqlalchemy"]["url"] + assert ( + "select (name, fullname, password) from churchofstan where name='doesntexist'" + == sql_span1.data["sqlalchemy"]["sql"] + ) + assert not sql_span1.data["sqlalchemy"]["err"] + + assert sql_span1.stack + assert isinstance(sql_span1.stack, list) + assert len(sql_span1.stack) > 0 + + def test_error_logging(self) -> None: + with tracer.start_as_current_span("test"): try: self.session.execute(text("htVwGrCwVThisIsInvalidSQLaw4ijXd88")) - self.session.commit() - except: + # self.session.commit() + except Exception: pass spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 sql_span = spans[0] test_span = spans[1] - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() # Same traceId - self.assertEqual(test_span.t, sql_span.t) + assert sql_span.t == test_span.t # Parent relationships - self.assertEqual(sql_span.p, test_span.s) + assert sql_span.p == test_span.s # Error logging - self.assertIsNone(test_span.ec) - self.assertIs(sql_span.ec, 1) + assert not test_span.ec + assert sql_span.ec == 1 # SQLAlchemy span - self.assertEqual('sqlalchemy', sql_span.n) - - self.assertFalse('custom' in sql_span.data) - self.assertTrue('sqlalchemy' in sql_span.data) - - self.assertEqual('postgresql', sql_span.data["sqlalchemy"]["eng"]) - self.assertEqual(sqlalchemy_url, sql_span.data["sqlalchemy"]["url"]) - self.assertEqual('htVwGrCwVThisIsInvalidSQLaw4ijXd88', sql_span.data["sqlalchemy"]["sql"]) - self.assertIn('syntax error at or near "htVwGrCwVThisIsInvalidSQLaw4ijXd88', sql_span.data["sqlalchemy"]["err"]) - self.assertIsNotNone(sql_span.stack) - self.assertTrue(type(sql_span.stack) is list) - self.assertGreater(len(sql_span.stack), 0) - - def test_error_before_tracing(self): + assert sql_span.n == "sqlalchemy" + + assert "custom" not in sql_span.data + assert "sqlalchemy" in sql_span.data + + assert sql_span.data["sqlalchemy"]["eng"] == "postgresql" + assert sqlalchemy_url == sql_span.data["sqlalchemy"]["url"] + assert ( + "htVwGrCwVThisIsInvalidSQLaw4ijXd88" == sql_span.data["sqlalchemy"]["sql"] + ) + assert ( + 'syntax error at or near "htVwGrCwVThisIsInvalidSQLaw4ijXd88' + in sql_span.data["sqlalchemy"]["err"] + ) + assert sql_span.stack + assert isinstance(sql_span.stack, list) + assert len(sql_span.stack) > 0 + + def test_error_before_tracing(self) -> None: """Test the scenario, in which instana is loaded, - but connection fails before tracing begins. - This is typical in test container scenario, - where it is "normal" to just start hammering a database container - which is still starting and not ready to handle requests yet. - In this scenario it is important that we get - an sqlalachemy exception, and not something else - like an AttributeError. Because testcontainer has a logic - to retry in case of certain sqlalchemy exceptions but it - can't handle an AttributeError.""" + but connection fails before tracing begins. + This is typical in test container scenario, + where it is "normal" to just start hammering a database container + which is still starting and not ready to handle requests yet. + In this scenario it is important that we get + an sqlalachemy exception, and not something else + like an AttributeError. Because testcontainer has a logic + to retry in case of certain sqlalchemy exceptions but it + can't handle an AttributeError.""" # https://github.com/instana/python-sensor/issues/362 - self.assertIsNone(tracer.active_span) + current_span = get_current_span() + assert not current_span.is_recording() - invalid_connection_url = 'postgresql://user1:pwd1@localhost:9999/mydb1' - with self.assertRaisesRegex( - OperationalError, - r'\(psycopg2.OperationalError\) connection .* failed.*' - ) as context_manager: + invalid_connection_url = "postgresql://user1:pwd1@localhost:9999/mydb1" + with pytest.raises( + OperationalError, + match=r"\(psycopg2.OperationalError\) connection .* failed.*", + ) as context_manager: engine = create_engine(invalid_connection_url) with engine.connect() as connection: - version, = connection.execute(text("select version()")).fetchone() - - the_exception = context_manager.exception - self.assertFalse(the_exception.connection_invalidated) + (version,) = connection.execute(text("select version()")).fetchone() + + the_exception = context_manager.value + assert not the_exception.connection_invalidated + + def test_if_not_tracing(self) -> None: + with engine.begin() as connection: + connection.execute(text("select 1")) + connection.execute( + text( + "select (name, fullname, password) from churchofstan where name='doesntexist'" + ) + ) + + current_span = get_current_span() + assert not current_span.is_recording() diff --git a/tests/conftest.py b/tests/conftest.py index 794b8d3d..c7c7cb8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,7 +34,6 @@ # TODO: remove the following entries as the migration of the instrumentation # codes are finalised. collect_ignore_glob.append("*clients/test_google*") -collect_ignore_glob.append("*clients/test_sql*") collect_ignore_glob.append("*frameworks/test_celery*") collect_ignore_glob.append("*frameworks/test_gevent*")