diff --git a/.circleci/config.yml b/.circleci/config.yml index 1c531a84..4eb25278 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -136,6 +136,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094 working_directory: ~/repo steps: - checkout @@ -164,6 +173,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094 working_directory: ~/repo steps: - checkout @@ -192,6 +210,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + 
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094 working_directory: ~/repo steps: - checkout @@ -221,6 +248,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094 working_directory: ~/repo steps: - checkout @@ -250,6 +286,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,EXTERNAL://localhost:9094 working_directory: ~/repo steps: - checkout @@ -293,6 +338,15 @@ jobs: environment: PUBSUB_EMULATOR_HOST: 0.0.0.0:8681 PUBSUB_PROJECT1: test-project,test-topic + - image: public.ecr.aws/bitnami/kafka:3.9.0 + environment: + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + 
# .tekton/python-tracer-prepuller.yaml -- pre-pull step for the Kafka image.
# Must reference the same repository and digest as the `kafka` sidecar in
# .tekton/task.yaml, otherwise the image that is pre-pulled is not the one
# the task later runs.
- name: prepuller-kafka
  # public.ecr.aws/bitnami/kafka:3.9.0
  image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
  command: ["sh", "-c", "'true'"]
---
# .tekton/task.yaml -- single-node KRaft Kafka sidecar (controller + broker).
- name: kafka
  # public.ecr.aws/bitnami/kafka:3.9.0
  image: public.ecr.aws/bitnami/kafka@sha256:d2890d68f96b36da3c8413fa94294f018b2f95d87cf108cbf71eab510572d9be
  env:
    - name: KAFKA_CFG_NODE_ID
      # env values must be strings; an unquoted 0 is parsed as an integer
      value: "0"
    - name: KAFKA_CFG_PROCESS_ROLES
      value: controller,broker
    - name: KAFKA_CFG_LISTENERS
      value: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
      value: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
      value: 0@kafka:9093
    - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
      value: CONTROLLER
    - name: KAFKA_CFG_ADVERTISED_LISTENERS
      value: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
---
# docker-compose.yml -- local Kafka broker for the test-suite.
# Pinned to the version used by the CI pipelines instead of `latest`,
# so local runs and CI exercise the same broker.
kafka:
  image: public.ecr.aws/bitnami/kafka:3.9.0
  ports:
    - '9092:9092'
    - '9094:9094'
  environment:
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
noqa: F401 + pika, # noqa: F401 psycopg2, # noqa: F401 pymongo, # noqa: F401 pymysql, # noqa: F401 pyramid, # noqa: F401 redis, # noqa: F401 + sanic_inst, # noqa: F401 sqlalchemy, # noqa: F401 starlette_inst, # noqa: F401 - sanic_inst, # noqa: F401 urllib3, # noqa: F401 ) from instana.instrumentation.aiohttp import ( - client, # noqa: F401 - server, # noqa: F401 + client as aiohttp_client, # noqa: F401 + ) + from instana.instrumentation.aiohttp import ( + server as aiohttp_server, # noqa: F401 ) from instana.instrumentation.aws import lambda_inst # noqa: F401 - from instana.instrumentation import celery # noqa: F401 from instana.instrumentation.django import middleware # noqa: F401 from instana.instrumentation.google.cloud import ( pubsub, # noqa: F401 storage, # noqa: F401 ) + from instana.instrumentation.kafka import ( + kafka_python, # noqa: F401 + ) + from instana.instrumentation.tornado import ( + client as tornado_client, # noqa: F401 + ) from instana.instrumentation.tornado import ( - client, # noqa: F401 - server, # noqa: F401 + server as tornado_server, # noqa: F401 ) # Hooks - from instana.hooks import hook_uwsgi, hook_gunicorn # noqa: F401 + from instana.hooks import hook_gunicorn, hook_uwsgi # noqa: F401 if "INSTANA_DISABLE" not in os.environ: diff --git a/src/instana/instrumentation/kafka/__init__.py b/src/instana/instrumentation/kafka/__init__.py new file mode 100644 index 00000000..593be793 --- /dev/null +++ b/src/instana/instrumentation/kafka/__init__.py @@ -0,0 +1 @@ +# (c) Copyright IBM Corp. 2025 diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py new file mode 100644 index 00000000..42174c9e --- /dev/null +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -0,0 +1,90 @@ +# (c) Copyright IBM Corp. 
try:
    from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple

    import kafka  # noqa: F401
    import wrapt
    from opentelemetry.trace import SpanKind

    from instana.log import logger
    from instana.propagators.format import Format
    from instana.util.traceutils import (
        get_tracer_tuple,
        tracing_is_off,
    )

    if TYPE_CHECKING:
        from kafka.consumer.fetcher import ConsumerRecord
        from kafka.producer.future import FutureRecordMetadata

    # Position of ``headers`` in
    # ``KafkaProducer.send(topic, value, key, headers, partition, timestamp_ms)``.
    _SEND_HEADERS_INDEX = 3

    def _attach_trace_headers(
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
        trace_headers: List[Tuple[str, bytes]],
    ) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
        """Return ``(args, kwargs)`` with ``trace_headers`` appended to the record headers.

        A new header list is built, so the list object supplied by the caller
        is never mutated.
        """
        if len(args) > _SEND_HEADERS_INDEX:
            # headers were passed positionally
            merged = list(args[_SEND_HEADERS_INDEX] or []) + trace_headers
            args = (
                args[:_SEND_HEADERS_INDEX]
                + (merged,)
                + args[_SEND_HEADERS_INDEX + 1 :]
            )
        else:
            kwargs = dict(kwargs)
            kwargs["headers"] = list(kwargs.get("headers") or []) + trace_headers
        return args, kwargs

    @wrapt.patch_function_wrapper("kafka", "KafkaProducer.send")
    def trace_kafka_send(
        wrapped: Callable[..., "FutureRecordMetadata"],
        instance: "kafka.KafkaProducer",
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> "FutureRecordMetadata":
        """Wrap ``KafkaProducer.send`` in a ``kafka-producer`` exit span.

        The span carries the topic (``kafka.service``) and the access type
        (``kafka.access`` = ``send``). The Instana trace headers are attached
        to the outgoing record. Exceptions raised by the wrapped call are
        recorded on the span and re-raised to the caller.
        """
        if tracing_is_off():
            return wrapped(*args, **kwargs)

        tracer, parent_span, _ = get_tracer_tuple()
        parent_context = parent_span.get_span_context() if parent_span else None

        with tracer.start_as_current_span(
            "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
        ) as span:
            # ``topic`` is the first parameter of ``send`` but may also be
            # passed by keyword.
            topic = args[0] if args else kwargs.get("topic")
            if topic is not None:
                span.set_attribute("kafka.service", topic)
            span.set_attribute("kafka.access", "send")

            # context propagation: inject into a scratch dict, then convert to
            # the (str, bytes) tuples that Kafka record headers consist of.
            carrier: Dict[str, Any] = {}
            tracer.inject(
                span.context,
                Format.KAFKA_HEADERS,
                carrier,
                disable_w3c_trace_context=True,
            )
            trace_headers = [
                (key, value.encode("utf-8") if isinstance(value, str) else value)
                for key, value in carrier.items()
            ]
            if trace_headers:
                args, kwargs = _attach_trace_headers(args, kwargs, trace_headers)

            try:
                return wrapped(*args, **kwargs)
            except Exception as exc:
                span.record_exception(exc)
                # never hide a failed send from the application
                raise

    @wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
    def trace_kafka_consume(
        wrapped: Callable[..., "ConsumerRecord"],
        instance: "kafka.KafkaConsumer",
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> "ConsumerRecord":
        """Wrap ``KafkaConsumer.__next__`` in a ``kafka-consumer`` entry span.

        ``StopIteration`` is the regular end-of-iteration signal of the
        iterator protocol: it is passed on to the caller and not recorded as
        an error. Any other exception is recorded on the span and re-raised.
        """
        if tracing_is_off():
            return wrapped(*args, **kwargs)

        tracer, parent_span, _ = get_tracer_tuple()

        parent_context = (
            parent_span.get_span_context()
            if parent_span
            else tracer.extract(
                Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
            )
        )

        end_of_iteration = None
        with tracer.start_as_current_span(
            "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
        ) as span:
            # ``subscription()`` may be None or empty (e.g. no topic
            # subscribed); tolerate that instead of failing the poll.
            topic = next(iter(instance.subscription() or ()), None)
            if topic is not None:
                span.set_attribute("kafka.service", topic)
            span.set_attribute("kafka.access", "consume")

            try:
                return wrapped(*args, **kwargs)
            except StopIteration as exc:
                # re-raised after the span is closed so that the normal end
                # of iteration is not flagged as a span error
                end_of_iteration = exc
            except Exception as exc:
                span.record_exception(exc)
                raise

        raise end_of_iteration

    logger.debug("Instrumenting Kafka (kafka-python)")
except ImportError:
    pass
class KafkaPropagator(BasePropagator):
    """
    Instana Propagator for Format.KAFKA_HEADERS.

    Writes the Instana trace headers (``X_INSTANA_L_S``, ``X_INSTANA_T`` and
    ``X_INSTANA_S``) into a Kafka header carrier. The carrier is either a
    list of ``(key, value)`` tuples, which is the shape of Kafka record
    headers, or a dict-like object.
    """

    def __init__(self) -> None:
        super(KafkaPropagator, self).__init__()

    @staticmethod
    def _inject_key_value(carrier: CarrierT, key: str, value: str) -> None:
        """Store ``key``/``value`` in ``carrier``.

        List carriers receive a ``(key, bytes)`` tuple because Kafka record
        header values are bytes; dict-like carriers keep the string value.

        Raises:
            TypeError: if the carrier is neither a list nor supports item
                assignment.
        """
        if isinstance(carrier, list):
            carrier.append((key, value.encode("utf-8")))
        elif isinstance(carrier, dict) or hasattr(carrier, "__setitem__"):
            carrier[key] = value
        else:
            raise TypeError(
                f"KafkaPropagator: Unsupported carrier type {type(carrier)}",
            )

    def inject(
        self,
        span_context: "SpanContext",
        carrier: CarrierT,
        disable_w3c_trace_context: bool = True,
    ) -> None:
        """Inject the trace level, trace id and span id into ``carrier``.

        Injection is best-effort: failures are logged at debug level and are
        never raised to the caller.

        Args:
            span_context: context whose ids and level are written; its
                ``level`` is lowered when the carrier already holds a lower
                (suppressing) level.
            carrier: list of header tuples or dict-like object to write to.
            disable_w3c_trace_context: accepted for interface compatibility;
                not read by this method.
        """
        trace_id = span_context.trace_id
        span_id = span_context.span_id
        dictionary_carrier = self.extract_headers_dict(carrier)

        if dictionary_carrier:
            # Suppression `level` made in the child context or in the parent context
            # has priority over any non-suppressed `level` setting
            try:
                child_level = int(
                    self.extract_instana_headers(dictionary_carrier)[2] or "1"
                )
            except (TypeError, ValueError):
                # a malformed level header must not break the caller
                logger.debug("KafkaPropagator - inject error:", exc_info=True)
            else:
                span_context.level = min(child_level, span_context.level)

        serializable_level = str(span_context.level)

        try:
            self._inject_key_value(carrier, "X_INSTANA_L_S", serializable_level)
            self._inject_key_value(carrier, "X_INSTANA_T", hex_id_limited(trace_id))
            self._inject_key_value(carrier, "X_INSTANA_S", format_span_id(span_id))

        except Exception:
            logger.debug("KafkaPropagator - inject error:", exc_info=True)
2024 +from typing import TYPE_CHECKING, Any, Dict + +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + from instana.log import logger from instana.span.base_span import BaseSpan -from instana.span.kind import ENTRY_SPANS, EXIT_SPANS, HTTP_SPANS, LOCAL_SPANS +from instana.span.kind import ( + ENTRY_SPANS, + EXIT_SPANS, + HTTP_SPANS, + LOCAL_SPANS, +) -from opentelemetry.trace import SpanKind -from opentelemetry.semconv.trace import SpanAttributes +if TYPE_CHECKING: + from instana.span.span import InstanaSpan class RegisteredSpan(BaseSpan): - def __init__(self, span, source, service_name, **kwargs) -> None: + def __init__( + self, + span: "InstanaSpan", + source: Dict[str, Any], + service_name: str, + **kwargs: Dict[str, Any], + ) -> None: # pylint: disable=invalid-name super(RegisteredSpan, self).__init__(span, source, **kwargs) self.n = span.name @@ -34,13 +50,17 @@ def __init__(self, span, source, service_name, **kwargs) -> None: if "gcps" in span.name: self.n = "gcps" + # unify the span name for kafka-producer and kafka-consumer + if "kafka" in span.name: + self.n = "kafka" + # Logic to store custom attributes for registered spans (not used yet) if len(span.attributes) > 0: self.data["sdk"]["custom"]["tags"] = self._validate_attributes( span.attributes ) - def _populate_entry_span_data(self, span) -> None: + def _populate_entry_span_data(self, span: "InstanaSpan") -> None: if span.name in HTTP_SPANS: self._collect_http_attributes(span) @@ -127,10 +147,14 @@ def _populate_entry_span_data(self, span) -> None: self.data["rpc"]["params"] = span.attributes.pop("rpc.params", None) # self.data["rpc"]["baggage"] = span.attributes.pop("rpc.baggage", None) self.data["rpc"]["error"] = span.attributes.pop("rpc.error", None) + + elif span.name.startswith("kafka"): + self._collect_kafka_attributes(span) + else: logger.debug("SpanRecorder: Unknown entry span: %s" % span.name) - def _populate_local_span_data(self, span) -> None: + 
def _populate_local_span_data(self, span: "InstanaSpan") -> None: if span.name == "render": self.data["render"]["name"] = span.attributes.pop("name", None) self.data["render"]["type"] = span.attributes.pop("type", None) @@ -139,7 +163,7 @@ def _populate_local_span_data(self, span) -> None: else: logger.debug("SpanRecorder: Unknown local span: %s" % span.name) - def _populate_exit_span_data(self, span) -> None: + def _populate_exit_span_data(self, span: "InstanaSpan") -> None: if span.name in HTTP_SPANS: self._collect_http_attributes(span) @@ -239,8 +263,12 @@ def _populate_exit_span_data(self, span) -> None: self.data["mysql"]["host"] = span.attributes.pop("host", None) self.data["mysql"]["port"] = span.attributes.pop("port", None) self.data["mysql"]["db"] = span.attributes.pop(SpanAttributes.DB_NAME, None) - self.data["mysql"]["user"] = span.attributes.pop(SpanAttributes.DB_USER, None) - self.data["mysql"]["stmt"] = span.attributes.pop(SpanAttributes.DB_STATEMENT, None) + self.data["mysql"]["user"] = span.attributes.pop( + SpanAttributes.DB_USER, None + ) + self.data["mysql"]["stmt"] = span.attributes.pop( + SpanAttributes.DB_STATEMENT, None + ) self.data["mysql"]["error"] = span.attributes.pop("mysql.error", None) elif span.name == "postgres": @@ -303,10 +331,14 @@ def _populate_exit_span_data(self, span) -> None: self.data["log"]["parameters"] = event.attributes.pop( "parameters", None ) + + elif span.name.startswith("kafka"): + self._collect_kafka_attributes(span) + else: logger.debug("SpanRecorder: Unknown exit span: %s" % span.name) - def _collect_http_attributes(self, span) -> None: + def _collect_http_attributes(self, span: "InstanaSpan") -> None: self.data["http"]["host"] = span.attributes.pop("http.host", None) self.data["http"]["url"] = span.attributes.pop("http.url", None) self.data["http"]["path"] = span.attributes.pop("http.path", None) @@ -325,3 +357,7 @@ def _collect_http_attributes(self, span) -> None: for key in custom_headers: trimmed_key = 
key[12:] self.data["http"]["header"][trimmed_key] = span.attributes.pop(key) + + def _collect_kafka_attributes(self, span: "InstanaSpan") -> None: + self.data["kafka"]["service"] = span.attributes.pop("kafka.service", None) + self.data["kafka"]["access"] = span.attributes.pop("kafka.access", None) diff --git a/src/instana/span/span.py b/src/instana/span/span.py index 61b99e55..f05a01f0 100644 --- a/src/instana/span/span.py +++ b/src/instana/span/span.py @@ -165,6 +165,8 @@ def record_exception( self.set_attribute("sqlalchemy.err", message) elif self.name == "aws.lambda.entry": self.set_attribute("lambda.error", message) + elif self.name.startswith("kafka"): + self.set_attribute("kafka.error", message) else: _attributes = {"message": message} if attributes: diff --git a/src/instana/tracer.py b/src/instana/tracer.py index 28876070..aed28d17 100644 --- a/src/instana/tracer.py +++ b/src/instana/tracer.py @@ -26,6 +26,7 @@ from instana.propagators.exceptions import UnsupportedFormatException from instana.propagators.format import Format from instana.propagators.http_propagator import HTTPPropagator +from instana.propagators.kafka_propagator import KafkaPropagator from instana.propagators.text_propagator import TextPropagator from instana.recorder import StanRecorder from instana.sampling import InstanaSampler, Sampler @@ -53,6 +54,7 @@ def __init__( self._propagators[Format.HTTP_HEADERS] = HTTPPropagator() self._propagators[Format.TEXT_MAP] = TextPropagator() self._propagators[Format.BINARY] = BinaryPropagator() + self._propagators[Format.KAFKA_HEADERS] = KafkaPropagator() def get_tracer( self, @@ -118,7 +120,9 @@ def start_span( record_exception: bool = True, set_status_on_exception: bool = True, ) -> InstanaSpan: - parent_context = span_context if span_context else get_current_span().get_span_context() + parent_context = ( + span_context if span_context else get_current_span().get_span_context() + ) if parent_context and not isinstance(parent_context, SpanContext): 
raise TypeError("parent_context must be an Instana SpanContext or None.") @@ -224,9 +228,13 @@ def _create_span_context(self, parent_context: SpanContext) -> SpanContext: level=(parent_context.level if parent_context else 1), synthetic=(parent_context.synthetic if parent_context else False), trace_parent=(parent_context.trace_parent if parent_context else None), - instana_ancestor=(parent_context.instana_ancestor if parent_context else None), + instana_ancestor=( + parent_context.instana_ancestor if parent_context else None + ), long_trace_id=(parent_context.long_trace_id if parent_context else None), - correlation_type=(parent_context.correlation_type if parent_context else None), + correlation_type=( + parent_context.correlation_type if parent_context else None + ), correlation_id=(parent_context.correlation_id if parent_context else None), traceparent=(parent_context.traceparent if parent_context else None), tracestate=(parent_context.tracestate if parent_context else None), @@ -237,7 +245,9 @@ def _create_span_context(self, parent_context: SpanContext) -> SpanContext: def inject( self, span_context: SpanContext, - format: Union[Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP], + format: Union[ + Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP, Format.KAFKA_HEADERS + ], carrier: "CarrierT", disable_w3c_trace_context: bool = False, ) -> Optional["CarrierT"]: @@ -250,7 +260,9 @@ def inject( def extract( self, - format: Union[Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP], + format: Union[ + Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP, Format.KAFKA_HEADERS + ], carrier: "CarrierT", disable_w3c_trace_context: bool = False, ) -> Optional[Context]: diff --git a/tests/clients/test_kafka_python.py b/tests/clients/test_kafka_python.py new file mode 100644 index 00000000..9c47b7ab --- /dev/null +++ b/tests/clients/test_kafka_python.py @@ -0,0 +1,126 @@ +# (c) Copyright IBM Corp. 
class TestKafkaPythonProducer:
    """Tests for the kafka-python instrumentation, run against the broker in ``testenv``."""

    @pytest.fixture(autouse=True)
    def _resource(self) -> Generator[None, None, None]:
        """SetUp and TearDown"""
        servers = testenv["kafka_bootstrap_servers"]

        # setup: every test starts with an empty span queue
        self.recorder = tracer.span_processor
        self.recorder.clear_spans()

        # admin client used to create and, on teardown, delete the test topic
        self.kafka_client = KafkaAdminClient(
            bootstrap_servers=servers,
            client_id="test_kafka_python",
        )
        topic_spec = NewTopic(
            name=testenv["kafka_topic"],
            num_partitions=1,
            replication_factor=1,
        )
        try:
            self.kafka_client.create_topics([topic_spec])
        except TopicAlreadyExistsError:
            # the topic is already there; use it as is
            pass

        self.producer = KafkaProducer(bootstrap_servers=servers)

        yield

        # teardown: restore the default option value and close connections
        agent.options.allow_exit_as_root = False
        self.producer.close()
        self.kafka_client.delete_topics([testenv["kafka_topic"]])
        self.kafka_client.close()

    def test_trace_kafka_send(self) -> None:
        """A traced ``send`` yields one kafka span that is a child of the test span."""
        topic = testenv["kafka_topic"]

        with tracer.start_as_current_span("test"):
            pending = self.producer.send(topic, b"raw_bytes")

        # wait for the result of the send before inspecting the spans
        pending.get(timeout=10)

        spans = self.recorder.queued_spans()
        assert len(spans) == 2

        kafka_span, test_span = spans[0], spans[1]

        # Same traceId
        assert test_span.t == kafka_span.t

        # Parent relationships
        assert kafka_span.p == test_span.s

        # Error logging
        assert not test_span.ec
        assert not kafka_span.ec

        assert kafka_span.n == "kafka"
        assert kafka_span.k == SpanKind.CLIENT
        assert kafka_span.data["kafka"]["service"] == topic
        assert kafka_span.data["kafka"]["access"] == "send"

    def test_trace_kafka_consume(self) -> None:
        """Iterating a consumer yields kafka spans that are children of the test span."""
        agent.options.allow_exit_as_root = False
        topic = testenv["kafka_topic"]

        # Produce some events
        for payload in (b"raw_bytes1", b"raw_bytes2"):
            self.producer.send(topic, payload)
        self.producer.flush()

        # Consume the events
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=testenv["kafka_bootstrap_servers"],
            auto_offset_reset="earliest",  # consume earliest available messages
            enable_auto_commit=False,  # do not auto-commit offsets
            consumer_timeout_ms=1000,
        )

        with tracer.start_as_current_span("test"):
            for record in consumer:
                if record is None:
                    break

        consumer.close()

        spans = self.recorder.queued_spans()
        assert len(spans) == 4

        kafka_span = spans[0]
        test_span = spans[-1]

        # Same traceId
        assert test_span.t == kafka_span.t

        # Parent relationships
        assert kafka_span.p == test_span.s

        # Error logging
        assert not test_span.ec
        assert not kafka_span.ec

        assert kafka_span.n == "kafka"
        assert kafka_span.k == SpanKind.SERVER
        assert kafka_span.data["kafka"]["service"] == topic
        assert kafka_span.data["kafka"]["access"] == "consume"
This @@ -142,6 +154,7 @@ def get_spans_by_filter(spans, filter): def launch_traced_request(url): import requests + from instana.log import logger from instana.singletons import tracer @@ -153,6 +166,3 @@ def launch_traced_request(url): response = requests.get(url) return response - - - diff --git a/tests/requirements-pre314.txt b/tests/requirements-pre314.txt index 53915bb7..686aac11 100644 --- a/tests/requirements-pre314.txt +++ b/tests/requirements-pre314.txt @@ -40,3 +40,4 @@ tornado>=6.4.1 uvicorn>=0.13.4 urllib3>=1.26.5 httpx>=0.27.0 +kafka-python-ng>=2.0.0 diff --git a/tests/requirements.txt b/tests/requirements.txt index 5d28130a..5d6eb85e 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -39,3 +39,5 @@ tornado>=6.4.1 uvicorn>=0.13.4 urllib3>=1.26.5 httpx>=0.27.0 +kafka-python>=2.0.0; python_version < "3.12" +kafka-python-ng>=2.0.0; python_version >= "3.12" \ No newline at end of file diff --git a/tests/span/test_registered_span.py b/tests/span/test_registered_span.py index 6d40fb03..d707dafa 100644 --- a/tests/span/test_registered_span.py +++ b/tests/span/test_registered_span.py @@ -146,6 +146,14 @@ def test_populate_local_span_data_with_other_name( "rpc.port": 1234, }, ), + ( + "kafka-consumer", + "kafka", + { + "kafka.service": "my-topic", + "kafka.access": "consume", + }, + ), ], ) def test_populate_entry_span_data( @@ -350,6 +358,14 @@ def test_populate_entry_span_data_AWSlambda( "gcps.top": "MY_SUBSCRIPTION_NAME", }, ), + ( + "kafka-producer", + "kafka", + { + "kafka.service": "my-topic", + "kafka.access": "send", + }, + ), ], ) def test_populate_exit_span_data( @@ -453,3 +469,29 @@ def test_populate_exit_span_data_log( while self.span._events: self.span._events.pop() + + def test_collect_kafka_attributes( + self, + span_context: SpanContext, + span_processor: StanRecorder, + ) -> None: + span_name = "test-kafka-registered-span" + attributes = { + "kafka.service": "my-topic", + "kafka.access": "send", + } + service_name = 
"test-kafka-registered-service" + self.span = InstanaSpan( + span_name, span_context, span_processor, attributes=attributes + ) + reg_span = RegisteredSpan(self.span, None, service_name) + + excepted_result = { + "kafka.service": attributes["kafka.service"], + "kafka.access": attributes["kafka.access"], + } + + reg_span._collect_kafka_attributes(self.span) + + assert excepted_result["kafka.service"] == reg_span.data["kafka"]["service"] + assert excepted_result["kafka.access"] == reg_span.data["kafka"]["access"] diff --git a/tests/span/test_span.py b/tests/span/test_span.py index 16800dc4..15479a7b 100644 --- a/tests/span/test_span.py +++ b/tests/span/test_span.py @@ -569,6 +569,7 @@ def test_span_add_event( ("celery-worker", "error"), ("sqlalchemy", "sqlalchemy.err"), ("aws.lambda.entry", "lambda.error"), + ("kafka", "kafka.error"), ], ) def test_span_record_exception_default( diff --git a/tests/test_tracer.py b/tests/test_tracer.py index 06474447..79991d8d 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -27,7 +27,7 @@ def test_tracer_defaults(tracer_provider: InstanaTracerProvider) -> None: assert isinstance(tracer._sampler, InstanaSampler) assert isinstance(tracer.span_processor, StanRecorder) assert isinstance(tracer.exporter, HostAgent) - assert len(tracer._propagators) == 3 + assert len(tracer._propagators) == 4 def test_tracer_start_span( diff --git a/tests/test_tracer_provider.py b/tests/test_tracer_provider.py index 5a1ffd5b..6d6c3d61 100644 --- a/tests/test_tracer_provider.py +++ b/tests/test_tracer_provider.py @@ -7,6 +7,7 @@ from instana.propagators.binary_propagator import BinaryPropagator from instana.propagators.format import Format from instana.propagators.http_propagator import HTTPPropagator +from instana.propagators.kafka_propagator import KafkaPropagator from instana.propagators.text_propagator import TextPropagator from instana.recorder import StanRecorder from instana.sampling import InstanaSampler @@ -18,10 +19,11 @@ def 
test_tracer_provider_defaults() -> None: assert isinstance(provider.sampler, InstanaSampler) assert isinstance(provider._span_processor, StanRecorder) assert isinstance(provider._exporter, HostAgent) - assert len(provider._propagators) == 3 + assert len(provider._propagators) == 4 assert isinstance(provider._propagators[Format.HTTP_HEADERS], HTTPPropagator) assert isinstance(provider._propagators[Format.TEXT_MAP], TextPropagator) assert isinstance(provider._propagators[Format.BINARY], BinaryPropagator) + assert isinstance(provider._propagators[Format.KAFKA_HEADERS], KafkaPropagator) def test_tracer_provider_get_tracer() -> None: