Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def boot_agent():
pymongo, # noqa: F401
pymysql, # noqa: F401
redis, # noqa: F401
# sqlalchemy, # noqa: F401
sqlalchemy, # noqa: F401
starlette_inst, # noqa: F401
sanic_inst, # noqa: F401
urllib3, # noqa: F401
Expand Down
145 changes: 87 additions & 58 deletions src/instana/instrumentation/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,90 +3,119 @@


import re
from operator import attrgetter
from typing import Any, Dict

from ..log import logger
from ..util.traceutils import get_tracer_tuple, tracing_is_off
from opentelemetry import context, trace

from instana.log import logger
from instana.span.span import InstanaSpan, get_current_span
from instana.span_context import SpanContext
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import sqlalchemy
from sqlalchemy import __version__ as sqlalchemy_version
from sqlalchemy import event
from sqlalchemy.engine import Engine

url_regexp = re.compile(r"\/\/(\S+@)")


@event.listens_for(Engine, 'before_cursor_execute', named=True)
def receive_before_cursor_execute(**kw):
@event.listens_for(Engine, "before_cursor_execute", named=True)
def receive_before_cursor_execute(
**kw: Dict[str, Any],
) -> None:
try:
# If we're not tracing, just return
if tracing_is_off():
return

tracer, parent_span, _ = get_tracer_tuple()
scope = tracer.start_active_span("sqlalchemy", child_of=parent_span)
context = kw['context']
if context:
context._stan_scope = scope

conn = kw['conn']
url = str(conn.engine.url)
scope.span.set_tag('sqlalchemy.sql', kw['statement'])
scope.span.set_tag('sqlalchemy.eng', conn.engine.name)
scope.span.set_tag('sqlalchemy.url', url_regexp.sub('//', url))
except Exception as e:
logger.debug(e)
return


@event.listens_for(Engine, 'after_cursor_execute', named=True)
def receive_after_cursor_execute(**kw):
context = kw['context']

if context is not None and hasattr(context, '_stan_scope'):
scope = context._stan_scope
if scope is not None:
scope.close()
parent_context = parent_span.get_span_context() if parent_span else None

span = tracer.start_span("sqlalchemy", span_context=parent_context)
conn = kw["conn"]
conn.span = span
span.set_attribute("sqlalchemy.sql", kw["statement"])
span.set_attribute("sqlalchemy.eng", conn.engine.name)
span.set_attribute(
"sqlalchemy.url", url_regexp.sub("//", str(conn.engine.url))
)

ctx = trace.set_span_in_context(span)
token = context.attach(ctx)
conn.token = token
except Exception:
logger.debug(
"Instrumenting sqlalchemy @ receive_before_cursor_execute",
exc_info=True,
)

@event.listens_for(Engine, "after_cursor_execute", named=True)
def receive_after_cursor_execute(
**kw: Dict[str, Any],
) -> None:
try:
# If we're not tracing, just return
if tracing_is_off():
return

current_span = get_current_span()
conn = kw["conn"]
if current_span.is_recording():
current_span.end()
if hasattr(conn, "token"):
context.detach(conn.token)
conn.token = None
except Exception:
logger.debug(
"Instrumenting sqlalchemy @ receive_after_cursor_execute",
exc_info=True,
)

error_event = "handle_error"
# Handle dbapi_error event; deprecated since version 0.9
if sqlalchemy.__version__[0] == "0":
if sqlalchemy_version[0] == "0":
error_event = "dbapi_error"


def _set_error_tags(context, exception_string, scope_string):
scope, context_exception = None, None
if attrgetter(scope_string)(context) and attrgetter(exception_string)(context):
scope = attrgetter(scope_string)(context)
context_exception = attrgetter(exception_string)(context)
if scope and context_exception:
scope.span.log_exception(context_exception)
scope.close()
def _set_error_attributes(
context: SpanContext,
exception_string: str,
span: InstanaSpan,
) -> None:
context_exception = None, None
if hasattr(context, exception_string):
context_exception = getattr(context, exception_string)
if span and context_exception:
span.record_exception(context_exception)
else:
scope.span.log_exception("No %s specified." % error_event)
scope.close()

span.record_exception(f"No {error_event} specified.")
if span.is_recording():
span.end()

@event.listens_for(Engine, error_event, named=True)
def receive_handle_db_error(**kw):

if tracing_is_off():
return
def receive_handle_db_error(
**kw: Dict[str, Any],
) -> None:
try:
if tracing_is_off():
return

# support older db error event
if error_event == "dbapi_error":
context = kw.get('context')
exception_string = 'exception'
scope_string = '_stan_scope'
else:
context = kw.get('exception_context')
exception_string = 'sqlalchemy_exception'
scope_string = 'execution_context._stan_scope'
current_span = get_current_span()

if context:
_set_error_tags(context, exception_string, scope_string)
# support older db error event
if error_event == "dbapi_error":
context = kw.get("context")
exception_string = "exception"
else:
context = kw.get("exception_context")
exception_string = "sqlalchemy_exception"

if context:
_set_error_attributes(context, exception_string, current_span)
except Exception:
logger.debug(
"Instrumenting sqlalchemy @ receive_handle_db_error",
exc_info=True,
)

logger.debug("Instrumenting sqlalchemy")

Expand Down
Loading