Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def boot_agent():
# pika, # noqa: F401
pep0249, # noqa: F401
psycopg2, # noqa: F401
# pymongo, # noqa: F401
pymongo, # noqa: F401
pymysql, # noqa: F401
# redis, # noqa: F401
# sqlalchemy, # noqa: F401
Expand Down
65 changes: 38 additions & 27 deletions src/instana/instrumentation/pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,49 @@
# (c) Copyright Instana Inc. 2020


from ..log import logger
from ..util.traceutils import get_tracer_tuple, tracing_is_off
from instana.span.span import InstanaSpan
from instana.log import logger
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import pymongo
from pymongo import monitoring
from bson import json_util
from opentelemetry.semconv.trace import SpanAttributes


class MongoCommandTracer(monitoring.CommandListener):
def __init__(self):
class MongoCommandTracer(pymongo.monitoring.CommandListener):
def __init__(self) -> None:
self.__active_commands = {}

def started(self, event):
def started(self, event: pymongo.monitoring.CommandStartedEvent) -> None:
Comment thread
CagriYonca marked this conversation as resolved.
Outdated
tracer, parent_span, _ = get_tracer_tuple()
# return early if we're not tracing
if tracing_is_off():
return
parent_context = parent_span.get_span_context() if parent_span else None

with tracer.start_active_span("mongo", child_of=parent_span) as scope:
self._collect_connection_tags(scope.span, event)
self._collect_command_tags(scope.span, event)
with tracer.start_as_current_span(
"mongo", span_context=parent_context
) as span:
self._collect_connection_tags(span, event)
self._collect_command_tags(span, event)

# include collection name into the namespace if provided
if event.command_name in event.command:
scope.span.set_tag("collection", event.command.get(event.command_name))
span.set_attribute(
SpanAttributes.DB_MONGODB_COLLECTION,
event.command.get(event.command_name),
)

self.__active_commands[event.request_id] = scope
self.__active_commands[event.request_id] = span

def succeeded(self, event):
def succeeded(self, event: pymongo.monitoring.CommandStartedEvent) -> None:
active_span = self.__active_commands.pop(event.request_id, None)

# return early if we're not tracing
if active_span is None:
return

def failed(self, event):
def failed(self, event: pymongo.monitoring.CommandStartedEvent) -> None:
active_span = self.__active_commands.pop(event.request_id, None)

# return early if we're not tracing
Expand All @@ -47,23 +53,27 @@ def failed(self, event):

active_span.log_exception(event.failure)

def _collect_connection_tags(self, span, event):
def _collect_connection_tags(
self, span: InstanaSpan, event: pymongo.monitoring.CommandStartedEvent
) -> None:
(host, port) = event.connection_id

span.set_tag("host", host)
span.set_tag("port", str(port))
span.set_tag("db", event.database_name)
span.set_attribute(SpanAttributes.SERVER_ADDRESS, host)
span.set_attribute(SpanAttributes.SERVER_PORT, str(port))
span.set_attribute(SpanAttributes.DB_NAME, event.database_name)

def _collect_command_tags(self, span, event):
def _collect_command_tags(self, span, event) -> None:
"""
Extract MongoDB command name and arguments and attach it to the span
"""
cmd = event.command_name
span.set_tag("command", cmd)
span.set_attribute("command", cmd)

for key in ["filter", "query"]:
if key in event.command:
span.set_tag("filter", json_util.dumps(event.command.get(key)))
span.set_attribute(
"filter", json_util.dumps(event.command.get(key))
)
break

# The location of command documents within the command object depends on the name
Expand All @@ -72,24 +82,25 @@ def _collect_command_tags(self, span, event):
"insert": "documents",
"update": "updates",
"delete": "deletes",
"aggregate": "pipeline"
"aggregate": "pipeline",
}

cmd_doc = None
if cmd in cmd_doc_locations:
cmd_doc = event.command.get(cmd_doc_locations[cmd])
elif cmd.lower() == "mapreduce": # mapreduce command was renamed to mapReduce in pymongo 3.9.0
elif (
cmd.lower() == "mapreduce"
): # mapreduce command was renamed to mapReduce in pymongo 3.9.0
# mapreduce command consists of two mandatory parts: map and reduce
cmd_doc = {
"map": event.command.get("map"),
"reduce": event.command.get("reduce")
"reduce": event.command.get("reduce"),
}

if cmd_doc is not None:
span.set_tag("json", json_util.dumps(cmd_doc))

span.set_attribute("json", json_util.dumps(cmd_doc))

monitoring.register(MongoCommandTracer())
pymongo.monitoring.register(MongoCommandTracer())

logger.debug("Instrumenting pymongo")

Expand Down
10 changes: 2 additions & 8 deletions src/instana/span/registered_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,8 @@ def _populate_exit_span_data(self, span) -> None:
self.data["pg"]["error"] = span.attributes.pop("pg.error", None)

elif span.name == "mongo":
service = "%s:%s" % (
span.attributes.pop("host", None),
span.attributes.pop("port", None),
)
namespace = "%s.%s" % (
span.attributes.pop("db", "?"),
span.attributes.pop("collection", "?"),
)
service = f"{span.attributes.pop(SpanAttributes.SERVER_ADDRESS, None)}:{span.attributes.pop(SpanAttributes.SERVER_PORT, None)}"
namespace = f"{span.attributes.pop(SpanAttributes.DB_NAME, '?')}.{span.attributes.pop(SpanAttributes.DB_MONGODB_COLLECTION, '?')}"

self.data["mongo"]["service"] = service
self.data["mongo"]["namespace"] = namespace
Expand Down
Loading