diff --git a/src/instana/__init__.py b/src/instana/__init__.py index 07b30608..b790f74a 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -191,8 +191,9 @@ def boot_agent(): ) # from instana.instrumentation.aws import lambda_inst # noqa: F401 - # from instana.instrumentation.celery import hooks # noqa: F401 + from instana.instrumentation import celery # noqa: F401 from instana.instrumentation.django import middleware # noqa: F401 + # from instana.instrumentation.google.cloud import ( # pubsub, # noqa: F401 # storage, # noqa: F401 diff --git a/src/instana/instrumentation/celery.py b/src/instana/instrumentation/celery.py new file mode 100644 index 00000000..c69131aa --- /dev/null +++ b/src/instana/instrumentation/celery.py @@ -0,0 +1,202 @@ +# (c) Copyright IBM Corp. 2021 +# (c) Copyright Instana Inc. 2020 + + +import contextvars +from typing import Any, Dict, Tuple +from instana.log import logger +from instana.propagators.format import Format +from instana.singletons import tracer +from instana.span.span import InstanaSpan +from instana.util.traceutils import get_tracer_tuple +from opentelemetry import trace, context + +try: + import celery + from celery import registry, signals + + from urllib import parse + + client_token: Dict[str, Any] = {} + worker_token: Dict[str, Any] = {} + client_span = contextvars.ContextVar("client_span") + worker_span = contextvars.ContextVar("worker_span") + + def _get_task_id( + headers: Dict[str, Any], + body: Tuple[str, Any], + ) -> str: + """ + Across Celery versions, the task id can exist in a couple of places. + """ + id = headers.get("id", None) + if id is None: + id = body.get("id", None) + return id + + def add_broker_attributes( + span: InstanaSpan, + broker_url: str, + ) -> None: + try: + url = parse.urlparse(broker_url) + + # Add safety for edge case where scheme may not be a string + url_scheme = str(url.scheme) + span.set_attribute("scheme", url_scheme) + + span.set_attribute("host", url.hostname if url.hostname else "localhost") + + if not url.port: + # Set default port if not specified + if url_scheme == "redis": + span.set_attribute("port", "6379") + elif "amqp" in url_scheme: + span.set_attribute("port", "5672") + elif "sqs" in url_scheme: + span.set_attribute("port", "443") + else: + span.set_attribute("port", str(url.port)) + except Exception: + logger.debug(f"Error parsing broker URL: {broker_url}", exc_info=True) + + @signals.task_prerun.connect + def task_prerun( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + ctx = None + + task = kwargs.get("sender", None) + task_id = kwargs.get("task_id", None) + task = registry.tasks.get(task.name) + + headers = task.request.get("headers", {}) + if headers is not None: + ctx = tracer.extract( + Format.HTTP_HEADERS, headers, disable_w3c_trace_context=True + ) + + span = tracer.start_span("celery-worker", span_context=ctx) + span.set_attribute("task", task.name) + span.set_attribute("task_id", task_id) + add_broker_attributes(span, task.app.conf["broker_url"]) + + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + worker_token["token"] = token + worker_span.set(span) + except Exception: + logger.debug("celery-worker task_prerun: ", exc_info=True) + + @signals.task_postrun.connect + def task_postrun( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + span = worker_span.get() + + if span.is_recording(): + span.end() + worker_span.set(None) + if "token" in worker_token: + context.detach(worker_token.pop("token", None)) + except Exception: + logger.debug("celery-worker after_task_publish: ", exc_info=True) + + @signals.task_failure.connect + def task_failure( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + span = worker_span.get() + if span.is_recording(): + span.set_attribute("success", False) + exc = kwargs.get("exception", None) + if exc: + span.record_exception(exc) + else: + span.mark_as_errored() + except Exception: + logger.debug("celery-worker task_failure: ", exc_info=True) + + @signals.task_retry.connect + def task_retry( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + span = worker_span.get() + if span.is_recording(): + reason = kwargs.get("reason", None) + if reason: + span.set_attribute("retry-reason", reason) + except Exception: + logger.debug("celery-worker task_failure: ", exc_info=True) + + @signals.before_task_publish.connect + def before_task_publish( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None + + if tracer: + body = kwargs["body"] + headers = kwargs["headers"] + task_name = kwargs["sender"] + task = registry.tasks.get(task_name) + task_id = _get_task_id(headers, body) + + span = tracer.start_span("celery-client", span_context=parent_context) + span.set_attribute("task", task_name) + span.set_attribute("task_id", task_id) + add_broker_attributes(span, task.app.conf["broker_url"]) + + # Context propagation + context_headers = {} + tracer.inject( + span.context, + Format.HTTP_HEADERS, + context_headers, + disable_w3c_trace_context=True, + ) + + # Fix for broken header propagation + # https://github.com/celery/celery/issues/4875 + task_headers = kwargs.get("headers") or {} + task_headers.setdefault("headers", {}) + task_headers["headers"].update(context_headers) + kwargs["headers"] = task_headers + + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + client_token["token"] = token + client_span.set(span) + except Exception: + logger.debug("celery-client before_task_publish: ", exc_info=True) + + @signals.after_task_publish.connect + def after_task_publish( + *args: Tuple[object, ...], + **kwargs: Dict[str, Any], + ) -> None: + try: + span = client_span.get() + if span.is_recording(): + span.end() + client_span.set(None) + if "token" in client_token: + context.detach(client_token.pop("token", None)) + + except Exception: + logger.debug("celery-client after_task_publish: ", exc_info=True) + + logger.debug("Instrumenting celery") +except ImportError: + pass diff --git a/src/instana/instrumentation/celery/__init__.py b/src/instana/instrumentation/celery/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/instana/instrumentation/celery/catalog.py b/src/instana/instrumentation/celery/catalog.py deleted file mode 100644 index 2ba395ac..00000000 --- a/src/instana/instrumentation/celery/catalog.py +++ /dev/null @@ -1,76 +0,0 @@ -# (c) Copyright IBM Corp. 2021 -# (c) Copyright Instana Inc. 2020 - -""" -Celery Signals are disjointed and don't allow us to pass the scope object along -with the Job message so we instead store all scopes in a dictionary on the -registered Task job. - -These methods allow pushing and pop'ing of scopes on Task objects. - -WeakValueDictionary allows for lost scopes to be garbage collected. -""" - -from weakref import WeakValueDictionary - - -def get_task_id(headers, body): - """ - Across Celery versions, the task id can exist in a couple of places. - """ - id = headers.get('id', None) - if id is None: - id = body.get('id', None) - return id - - -def task_catalog_push(task, task_id, scope, is_consumer): - """ - Push (adds) an object to the task catalog - @param task: The Celery Task - @param task_id: The Celery Task ID - @param is_consumer: Boolean - @return: scope - """ - catalog = None - if not hasattr(task, '_instana_scopes'): - catalog = WeakValueDictionary() - setattr(task, '_instana_scopes', catalog) - else: - catalog = getattr(task, '_instana_scopes') - - key = (task_id, is_consumer) - catalog[key] = scope - - -def task_catalog_pop(task, task_id, is_consumer): - """ - Pop (removes) an object from the task catalog - @param task: The Celery Task - @param task_id: The Celery Task ID - @param is_consumer: Boolean - @return: scope - """ - catalog = getattr(task, '_instana_scopes', None) - if catalog is None: - return None - - key = (task_id, is_consumer) - return catalog.pop(key, None) - - -def task_catalog_get(task, task_id, is_consumer): - """ - Get an object from the task catalog - @param task: The Celery Task - @param task_id: The Celery Task ID - @param is_consumer: Boolean - @return: scope - """ - catalog = getattr(task, '_instana_scopes', None) - if catalog is None: - return None - - key = (task_id, is_consumer) - return catalog.get(key, None) - diff --git a/src/instana/instrumentation/celery/hooks.py b/src/instana/instrumentation/celery/hooks.py deleted file mode 100644 index eb2a180c..00000000 --- a/src/instana/instrumentation/celery/hooks.py +++ /dev/null @@ -1,162 +0,0 @@ -# (c) Copyright IBM Corp. 2021 -# (c) Copyright Instana Inc. 2020 - - -import opentracing - -from ...log import logger -from ...singletons import tracer -from ...util.traceutils import get_active_tracer - -try: - import celery - from celery import registry, signals - from .catalog import task_catalog_get, task_catalog_pop, task_catalog_push, get_task_id - - from urllib import parse - - - def add_broker_tags(span, broker_url): - try: - url = parse.urlparse(broker_url) - - # Add safety for edge case where scheme may not be a string - url_scheme = str(url.scheme) - span.set_tag("scheme", url_scheme) - - if url.hostname is None: - span.set_tag("host", 'localhost') - else: - span.set_tag("host", url.hostname) - - if url.port is None: - # Set default port if not specified - if url_scheme == 'redis': - span.set_tag("port", "6379") - elif 'amqp' in url_scheme: - span.set_tag("port", "5672") - elif 'sqs' in url_scheme: - span.set_tag("port", "443") - else: - span.set_tag("port", str(url.port)) - except Exception: - logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True) - - - @signals.task_prerun.connect - def task_prerun(*args, **kwargs): - try: - ctx = None - task = kwargs.get('sender', None) - task_id = kwargs.get('task_id', None) - task = registry.tasks.get(task.name) - - headers = task.request.get('headers', {}) - if headers is not None: - ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers, disable_w3c_trace_context=True) - - scope = tracer.start_active_span("celery-worker", child_of=ctx) - scope.span.set_tag("task", task.name) - scope.span.set_tag("task_id", task_id) - add_broker_tags(scope.span, task.app.conf['broker_url']) - - # Store the scope on the task to eventually close it out on the "after" signal - task_catalog_push(task, task_id, scope, True) - except: - logger.debug("task_prerun: ", exc_info=True) - - - @signals.task_postrun.connect - def task_postrun(*args, **kwargs): - try: - task = kwargs.get('sender', None) - task_id = kwargs.get('task_id', None) - scope = task_catalog_pop(task, task_id, True) - if scope is not None: - scope.close() - except: - logger.debug("after_task_publish: ", exc_info=True) - - - @signals.task_failure.connect - def task_failure(*args, **kwargs): - try: - task_id = kwargs.get('task_id', None) - task = kwargs['sender'] - scope = task_catalog_get(task, task_id, True) - - if scope is not None: - scope.span.set_tag("success", False) - exc = kwargs.get('exception', None) - if exc is None: - scope.span.mark_as_errored() - else: - scope.span.log_exception(kwargs['exception']) - except: - logger.debug("task_failure: ", exc_info=True) - - - @signals.task_retry.connect - def task_retry(*args, **kwargs): - try: - task_id = kwargs.get('task_id', None) - task = kwargs['sender'] - scope = task_catalog_get(task, task_id, True) - - if scope is not None: - reason = kwargs.get('reason', None) - if reason is not None: - scope.span.set_tag('retry-reason', reason) - except: - logger.debug("task_failure: ", exc_info=True) - - - @signals.before_task_publish.connect - def before_task_publish(*args, **kwargs): - try: - active_tracer = get_active_tracer() - if active_tracer is not None: - body = kwargs['body'] - headers = kwargs['headers'] - task_name = kwargs['sender'] - task = registry.tasks.get(task_name) - task_id = get_task_id(headers, body) - - scope = active_tracer.start_active_span("celery-client", child_of=active_tracer.active_span) - scope.span.set_tag("task", task_name) - scope.span.set_tag("task_id", task_id) - add_broker_tags(scope.span, task.app.conf['broker_url']) - - # Context propagation - context_headers = {} - active_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, context_headers, - disable_w3c_trace_context=True) - - # Fix for broken header propagation - # https://github.com/celery/celery/issues/4875 - task_headers = kwargs.get('headers') or {} - task_headers.setdefault('headers', {}) - task_headers['headers'].update(context_headers) - kwargs['headers'] = task_headers - - # Store the scope on the task to eventually close it out on the "after" signal - task_catalog_push(task, task_id, scope, False) - except: - logger.debug("before_task_publish: ", exc_info=True) - - - @signals.after_task_publish.connect - def after_task_publish(*args, **kwargs): - try: - task_id = get_task_id(kwargs['headers'], kwargs['body']) - task = registry.tasks.get(kwargs['sender']) - scope = task_catalog_pop(task, task_id, False) - if scope is not None: - scope.close() - except: - logger.debug("after_task_publish: ", exc_info=True) - - - logger.debug("Instrumenting celery") -except ImportError: - pass diff --git a/src/instana/instrumentation/redis.py b/src/instana/instrumentation/redis.py index ec439ef4..621bca26 100644 --- a/src/instana/instrumentation/redis.py +++ b/src/instana/instrumentation/redis.py @@ -93,7 +93,6 @@ def execute_with_instana( rv = wrapped(*args, **kwargs) except Exception as exc: span.record_exception(exc) - raise else: return rv diff --git a/tests/conftest.py b/tests/conftest.py index 2faa6073..061fe27a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,7 +31,6 @@ # codes are finalised. collect_ignore_glob.append("*clients/test_google*") -collect_ignore_glob.append("*frameworks/test_celery*") collect_ignore_glob.append("*frameworks/test_gevent*") # # Cassandra and gevent tests are run in dedicated jobs on CircleCI and will diff --git a/tests/frameworks/test_celery.py b/tests/frameworks/test_celery.py index bd08877f..126f0368 100644 --- a/tests/frameworks/test_celery.py +++ b/tests/frameworks/test_celery.py @@ -2,222 +2,280 @@ # (c) Copyright Instana Inc. 2020 import time +from typing import Generator, List from celery import shared_task +import celery +import celery.app +import celery.contrib +import celery.contrib.testing +import celery.contrib.testing.worker +import pytest from instana.singletons import tracer -from ..helpers import get_first_span_by_filter +from instana.span.span import InstanaSpan +from tests.helpers import get_first_span_by_filter # TODO: Refactor to class based tests + @shared_task -def add(x, y): +def add( + x: int, + y: int, +) -> int: return x + y @shared_task -def will_raise_error(): - raise Exception('This is a simulated error') +def will_raise_error() -> None: + raise Exception("This is a simulated error") -def filter_out_ping_tasks(spans): +def filter_out_ping_tasks( + spans: List[InstanaSpan], +) -> List[InstanaSpan]: filtered_spans = [] for span in spans: - is_ping_task = (span.n == 'celery-worker' and span.data['celery']['task'] == 'celery.ping') + is_ping_task = ( + span.n == "celery-worker" and span.data["celery"]["task"] == "celery.ping" + ) if not is_ping_task: filtered_spans.append(span) return filtered_spans -def setup_method(): - """ Clear all spans before a test run """ - tracer.recorder.clear_spans() - - -def test_apply_async(celery_app, celery_worker): - result = None - with tracer.start_active_span('test'): - result = add.apply_async(args=(4, 5)) - - # Wait for jobs to finish - time.sleep(0.5) - - spans = filter_out_ping_tasks(tracer.recorder.queued_spans()) - assert len(spans) == 3 - - filter = lambda span: span.n == "sdk" - test_span = get_first_span_by_filter(spans, filter) - assert(test_span) - - filter = lambda span: span.n == "celery-client" - client_span = get_first_span_by_filter(spans, filter) - assert(client_span) - - filter = lambda span: span.n == "celery-worker" - worker_span = get_first_span_by_filter(spans, filter) - assert(worker_span) - - assert(client_span.t == test_span.t) - assert(client_span.t == worker_span.t) - assert(client_span.p == test_span.s) - - assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis" == client_span.data["celery"]["scheme"]) - assert("localhost" == client_span.data["celery"]["host"]) - assert("6379" == client_span.data["celery"]["port"]) - assert(client_span.data["celery"]["task_id"]) - assert(client_span.data["celery"]["error"] == None) - assert(client_span.ec == None) - - assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis" == worker_span.data["celery"]["scheme"]) - assert("localhost" == worker_span.data["celery"]["host"]) - assert("6379" == worker_span.data["celery"]["port"]) - assert(worker_span.data["celery"]["task_id"]) - assert(worker_span.data["celery"]["error"] == None) - assert(worker_span.data["celery"]["retry-reason"] == None) - assert(worker_span.ec == None) - - -def test_delay(celery_app, celery_worker): - result = None - with tracer.start_active_span('test'): - result = add.delay(4, 5) - - # Wait for jobs to finish - time.sleep(0.5) - - spans = filter_out_ping_tasks(tracer.recorder.queued_spans()) - assert len(spans) == 3 - - filter = lambda span: span.n == "sdk" - test_span = get_first_span_by_filter(spans, filter) - assert(test_span) - - filter = lambda span: span.n == "celery-client" - client_span = get_first_span_by_filter(spans, filter) - assert(client_span) - - filter = lambda span: span.n == "celery-worker" - worker_span = get_first_span_by_filter(spans, filter) - assert(worker_span) - - assert(client_span.t == test_span.t) - assert(client_span.t == worker_span.t) - assert(client_span.p == test_span.s) - - assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis" == client_span.data["celery"]["scheme"]) - assert("localhost" == client_span.data["celery"]["host"]) - assert("6379" == client_span.data["celery"]["port"]) - assert(client_span.data["celery"]["task_id"]) - assert(client_span.data["celery"]["error"] == None) - assert(client_span.ec == None) - - assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis" == worker_span.data["celery"]["scheme"]) - assert("localhost" == worker_span.data["celery"]["host"]) - assert("6379" == worker_span.data["celery"]["port"]) - assert(worker_span.data["celery"]["task_id"]) - assert(worker_span.data["celery"]["error"] == None) - assert(worker_span.data["celery"]["retry-reason"] == None) - assert(worker_span.ec == None) - - -def test_send_task(celery_app, celery_worker): - result = None - with tracer.start_active_span('test'): - result = celery_app.send_task('tests.frameworks.test_celery.add', (1, 2)) - - # Wait for jobs to finish - time.sleep(0.5) - - spans = filter_out_ping_tasks(tracer.recorder.queued_spans()) - assert len(spans) == 3 - - filter = lambda span: span.n == "sdk" - test_span = get_first_span_by_filter(spans, filter) - assert(test_span) - - filter = lambda span: span.n == "celery-client" - client_span = get_first_span_by_filter(spans, filter) - assert(client_span) - - filter = lambda span: span.n == "celery-worker" - worker_span = get_first_span_by_filter(spans, filter) - assert(worker_span) - - assert(client_span.t == test_span.t) - assert(client_span.t == worker_span.t) - assert(client_span.p == test_span.s) - - assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis" == client_span.data["celery"]["scheme"]) - assert("localhost" == client_span.data["celery"]["host"]) - assert("6379" == client_span.data["celery"]["port"]) - assert(client_span.data["celery"]["task_id"]) - assert(client_span.data["celery"]["error"] == None) - assert(client_span.ec == None) - - assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis" == worker_span.data["celery"]["scheme"]) - assert("localhost" == worker_span.data["celery"]["host"]) - assert("6379" == worker_span.data["celery"]["port"]) - assert(worker_span.data["celery"]["task_id"]) - assert(worker_span.data["celery"]["error"] == None) - assert(worker_span.data["celery"]["retry-reason"] == None) - assert(worker_span.ec == None) - - -def test_error_reporting(celery_app, celery_worker): - result = None - with tracer.start_active_span('test'): - result = will_raise_error.apply_async() - - # Wait for jobs to finish - time.sleep(0.5) - - spans = filter_out_ping_tasks(tracer.recorder.queued_spans()) - assert len(spans) == 4 - - filter = lambda span: span.n == "sdk" - test_span = get_first_span_by_filter(spans, filter) - assert(test_span) - - filter = lambda span: span.n == "celery-client" - client_span = get_first_span_by_filter(spans, filter) - assert(client_span) - - filter = lambda span: span.n == "log" - log_span = get_first_span_by_filter(spans, filter) - assert(log_span) - - filter = lambda span: span.n == "celery-worker" - worker_span = get_first_span_by_filter(spans, filter) - assert(worker_span) - - assert(client_span.t == test_span.t) - assert(client_span.t == worker_span.t) - assert(client_span.t == log_span.t) - - assert(client_span.p == test_span.s) - assert(worker_span.p == client_span.s) - assert(log_span.p == worker_span.s) - - assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"]) - assert("redis" == client_span.data["celery"]["scheme"]) - assert("localhost" == client_span.data["celery"]["host"]) - assert("6379" == client_span.data["celery"]["port"]) - assert(client_span.data["celery"]["task_id"]) - assert(client_span.data["celery"]["error"] == None) - assert(client_span.ec == None) - - assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"]) - assert("redis" == worker_span.data["celery"]["scheme"]) - assert("localhost" == worker_span.data["celery"]["host"]) - assert("6379" == worker_span.data["celery"]["port"]) - assert(worker_span.data["celery"]["task_id"]) - assert(worker_span.data["celery"]["error"] == 'This is a simulated error') - assert(worker_span.data["celery"]["retry-reason"] == None) - assert(worker_span.ec == 1) - +class TestCelery: + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.recorder = tracer.span_processor + self.recorder.clear_spans() + yield + + def test_apply_async( + self, + celery_app: celery.app.base.Celery, + celery_worker: celery.contrib.testing.worker.TestWorkController, + ) -> None: + with tracer.start_as_current_span("test"): + _ = add.apply_async(args=(4, 5)) + + # Wait for jobs to finish + time.sleep(1) + + spans = filter_out_ping_tasks(self.recorder.queued_spans()) + assert len(spans) == 3 + + def filter(span): + return span.n == "sdk" + + test_span = get_first_span_by_filter(spans, filter) + assert test_span + + def filter(span): + return span.n == "celery-client" + + client_span = get_first_span_by_filter(spans, filter) + assert client_span + + def filter(span): + return span.n == "celery-worker" + + worker_span = get_first_span_by_filter(spans, filter) + assert worker_span + + assert client_span.t == test_span.t + assert client_span.t == worker_span.t + assert client_span.p == test_span.s + + assert client_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert client_span.data["celery"]["scheme"] == "redis" + assert client_span.data["celery"]["host"] == "localhost" + assert client_span.data["celery"]["port"] == "6379" + assert client_span.data["celery"]["task_id"] + assert not client_span.data["celery"]["error"] + assert not client_span.ec + + assert worker_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert worker_span.data["celery"]["scheme"] == "redis" + assert worker_span.data["celery"]["host"] == "localhost" + assert worker_span.data["celery"]["port"] == "6379" + assert worker_span.data["celery"]["task_id"] + assert not worker_span.data["celery"]["error"] + assert not worker_span.data["celery"]["retry-reason"] + assert not worker_span.ec + + def test_delay( + self, + celery_app: celery.app.base.Celery, + celery_worker: celery.contrib.testing.worker.TestWorkController, + ) -> None: + with tracer.start_as_current_span("test"): + _ = add.delay(4, 5) + + # Wait for jobs to finish + time.sleep(0.5) + + spans = filter_out_ping_tasks(self.recorder.queued_spans()) + assert len(spans) == 3 + + def filter(span): + return span.n == "sdk" + + test_span = get_first_span_by_filter(spans, filter) + assert test_span + + def filter(span): + return span.n == "celery-client" + + client_span = get_first_span_by_filter(spans, filter) + assert client_span + + def filter(span): + return span.n == "celery-worker" + + worker_span = get_first_span_by_filter(spans, filter) + assert worker_span + + assert client_span.t == test_span.t + assert client_span.t == worker_span.t + assert client_span.p == test_span.s + + assert client_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert client_span.data["celery"]["scheme"] == "redis" + assert client_span.data["celery"]["host"] == "localhost" + assert client_span.data["celery"]["port"] == "6379" + assert client_span.data["celery"]["task_id"] + assert not client_span.data["celery"]["error"] + assert not client_span.ec + + assert worker_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert worker_span.data["celery"]["scheme"] == "redis" + assert worker_span.data["celery"]["host"] == "localhost" + assert worker_span.data["celery"]["port"] == "6379" + assert worker_span.data["celery"]["task_id"] + assert not worker_span.data["celery"]["error"] + assert not worker_span.data["celery"]["retry-reason"] + assert not worker_span.ec + + def test_send_task( + self, + celery_app: celery.app.base.Celery, + celery_worker: celery.contrib.testing.worker.TestWorkController, + ) -> None: + with tracer.start_as_current_span("test"): + _ = celery_app.send_task("tests.frameworks.test_celery.add", (1, 2)) + + # Wait for jobs to finish + time.sleep(0.5) + + spans = filter_out_ping_tasks(self.recorder.queued_spans()) + assert len(spans) == 3 + + def filter(span): + return span.n == "sdk" + + test_span = get_first_span_by_filter(spans, filter) + assert test_span + + def filter(span): + return span.n == "celery-client" + + client_span = get_first_span_by_filter(spans, filter) + assert client_span + + def filter(span): + return span.n == "celery-worker" + + worker_span = get_first_span_by_filter(spans, filter) + assert worker_span + + assert client_span.t == test_span.t + assert client_span.t == worker_span.t + assert client_span.p == test_span.s + + assert client_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert client_span.data["celery"]["scheme"] == "redis" + assert client_span.data["celery"]["host"] == "localhost" + assert client_span.data["celery"]["port"] == "6379" + assert client_span.data["celery"]["task_id"] + assert not client_span.data["celery"]["error"] + assert not client_span.ec + + assert worker_span.data["celery"]["task"] == "tests.frameworks.test_celery.add" + assert worker_span.data["celery"]["scheme"] == "redis" + assert worker_span.data["celery"]["host"] == "localhost" + assert worker_span.data["celery"]["port"] == "6379" + assert worker_span.data["celery"]["task_id"] + assert not worker_span.data["celery"]["error"] + assert not worker_span.data["celery"]["retry-reason"] + assert not worker_span.ec + + def test_error_reporting( + self, + celery_app: celery.app.base.Celery, + celery_worker: celery.contrib.testing.worker.TestWorkController, + ) -> None: + with tracer.start_as_current_span("test"): + _ = will_raise_error.apply_async() + + # Wait for jobs to finish + time.sleep(4) + + spans = filter_out_ping_tasks(self.recorder.queued_spans()) + assert len(spans) == 4 + + def filter(span): + return span.n == "sdk" + + test_span = get_first_span_by_filter(spans, filter) + assert test_span + + def filter(span): + return span.n == "celery-client" + + client_span = get_first_span_by_filter(spans, filter) + assert client_span + + def filter(span): + return span.n == "log" + + log_span = get_first_span_by_filter(spans, filter) + assert log_span + + def filter(span): + return span.n == "celery-worker" + + worker_span = get_first_span_by_filter(spans, filter) + assert worker_span + + assert client_span.t == test_span.t + assert client_span.t == worker_span.t + assert client_span.t == log_span.t + + assert client_span.p == test_span.s + assert worker_span.p == client_span.s + assert log_span.p == worker_span.s + + assert ( + client_span.data["celery"]["task"] + == "tests.frameworks.test_celery.will_raise_error" + ) + assert client_span.data["celery"]["scheme"] == "redis" + assert client_span.data["celery"]["host"] == "localhost" + assert client_span.data["celery"]["port"] == "6379" + assert client_span.data["celery"]["task_id"] + assert not client_span.data["celery"]["error"] + assert not client_span.ec + + assert ( + worker_span.data["celery"]["task"] + == "tests.frameworks.test_celery.will_raise_error" + ) + assert worker_span.data["celery"]["scheme"] == "redis" + assert worker_span.data["celery"]["host"] == "localhost" + assert worker_span.data["celery"]["port"] == "6379" + assert worker_span.data["celery"]["task_id"] + assert worker_span.data["celery"]["error"] == "This is a simulated error" + assert not worker_span.data["celery"]["retry-reason"] + assert worker_span.ec == 1