Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def boot_agent():
psycopg2, # noqa: F401
pymongo, # noqa: F401
pymysql, # noqa: F401
# redis, # noqa: F401
redis, # noqa: F401
# sqlalchemy, # noqa: F401
starlette_inst, # noqa: F401
sanic_inst, # noqa: F401
Expand Down
103 changes: 62 additions & 41 deletions src/instana/instrumentation/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,115 @@
# (c) Copyright Instana Inc. 2018


from typing import Any, Callable, Dict, Tuple
import wrapt

from ..log import logger
from ..util.traceutils import get_tracer_tuple, tracing_is_off

from instana.log import logger
from instana.span.span import InstanaSpan
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import redis

EXCLUDED_PARENT_SPANS = ["redis", "celery-client", "celery-worker"]

def collect_tags(span, instance, args, kwargs):
def collect_attributes(
span: InstanaSpan,
instance: redis.client.Redis,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> None:
try:
ckw = instance.connection_pool.connection_kwargs

span.set_tag("driver", "redis-py")
span.set_attribute("driver", "redis-py")

host = ckw.get('host', None)
port = ckw.get('port', '6379')
db = ckw.get('db', None)
host = ckw.get("host", None)
port = ckw.get("port", "6379")
db = ckw.get("db", None)

if host is not None:
url = "redis://%s:%s" % (host, port)
if host:
url = f"redis://{host}:{port}"
if db is not None:
Comment thread
CagriYonca marked this conversation as resolved.
Outdated
url = url + "/%s" % db
span.set_tag('connection', url)

except:
logger.debug("redis.collect_tags non-fatal error", exc_info=True)

return span


def execute_command_with_instana(wrapped, instance, args, kwargs):
url = f"{url}/{db}"
span.set_attribute("connection", url)
except Exception:
logger.debug("redis.collect_attributes non-fatal error", exc_info=True)

def execute_command_with_instana(
wrapped: Callable[..., object],
instance: redis.client.Redis,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
tracer, parent_span, operation_name = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

# If we're not tracing, just return
if (tracing_is_off() or (operation_name in EXCLUDED_PARENT_SPANS)):
if tracing_is_off() or (operation_name in EXCLUDED_PARENT_SPANS):
return wrapped(*args, **kwargs)

with tracer.start_active_span("redis", child_of=parent_span) as scope:
with tracer.start_as_current_span("redis", span_context=parent_context) as span:
try:
collect_tags(scope.span, instance, args, kwargs)
if (len(args) > 0):
scope.span.set_tag("command", args[0])
collect_attributes(span, instance, args, kwargs)
if len(args) > 0:
span.set_attribute("command", args[0])

rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_exception(e)
except Exception as exc:
span.record_exception(exc)
raise
else:
return rv


def execute_with_instana(wrapped, instance, args, kwargs):
def execute_with_instana(
wrapped: Callable[..., object],
instance: redis.client.Redis,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
tracer, parent_span, operation_name = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

# If we're not tracing, just return
if (tracing_is_off() or (operation_name in EXCLUDED_PARENT_SPANS)):
if tracing_is_off() or (operation_name in EXCLUDED_PARENT_SPANS):
return wrapped(*args, **kwargs)

with tracer.start_active_span("redis", child_of=parent_span) as scope:
with tracer.start_as_current_span("redis", span_context=parent_context) as span:
try:
collect_tags(scope.span, instance, args, kwargs)
scope.span.set_tag("command", 'PIPELINE')
collect_attributes(span, instance, args, kwargs)
span.set_attribute("command", "PIPELINE")

pipe_cmds = []
for e in instance.command_stack:
pipe_cmds.append(e[0][0])
scope.span.set_tag("subCommands", pipe_cmds)
span.set_attribute("subCommands", pipe_cmds)
except Exception as e:
# If anything breaks during K/V collection, just log a debug message
logger.debug("Error collecting pipeline commands", exc_info=True)

try:
rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_exception(e)
except Exception as exc:
span.record_exception(exc)
raise
else:
return rv

if redis.VERSION < (3,0,0):
wrapt.wrap_function_wrapper('redis.client', 'BasePipeline.execute', execute_with_instana)
wrapt.wrap_function_wrapper('redis.client', 'StrictRedis.execute_command', execute_command_with_instana)
if redis.VERSION < (3, 0, 0):
wrapt.wrap_function_wrapper(
"redis.client", "BasePipeline.execute", execute_with_instana
)
wrapt.wrap_function_wrapper(
"redis.client", "StrictRedis.execute_command", execute_command_with_instana
)
else:
wrapt.wrap_function_wrapper('redis.client', 'Pipeline.execute', execute_with_instana)
wrapt.wrap_function_wrapper('redis.client', 'Redis.execute_command', execute_command_with_instana)
wrapt.wrap_function_wrapper(
"redis.client", "Pipeline.execute", execute_with_instana
)
wrapt.wrap_function_wrapper(
"redis.client", "Redis.execute_command", execute_command_with_instana
)

logger.debug("Instrumenting redis")
except ImportError:
Expand Down
Loading