From b08fbba1c841dbd42d23ff62946fb1d3fa95187a Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 27 Sep 2024 14:41:00 +0300 Subject: [PATCH 1/4] refactor(pubsub): added pubsub otel instrumentation --- src/instana/__init__.py | 9 +- .../instrumentation/google/cloud/pubsub.py | 118 +++++++++++------- 2 files changed, 77 insertions(+), 50 deletions(-) diff --git a/src/instana/__init__.py b/src/instana/__init__.py index b790f74a..6cb98669 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -193,11 +193,10 @@ def boot_agent(): # from instana.instrumentation.aws import lambda_inst # noqa: F401 from instana.instrumentation import celery # noqa: F401 from instana.instrumentation.django import middleware # noqa: F401 - - # from instana.instrumentation.google.cloud import ( - # pubsub, # noqa: F401 - # storage, # noqa: F401 - # ) + from instana.instrumentation.google.cloud import ( + pubsub, # noqa: F401 + storage, # noqa: F401 + ) from instana.instrumentation.tornado import ( client, # noqa: F401 server, # noqa: F401 diff --git a/src/instana/instrumentation/google/cloud/pubsub.py b/src/instana/instrumentation/google/cloud/pubsub.py index 712ec515..fe4b5424 100644 --- a/src/instana/instrumentation/google/cloud/pubsub.py +++ b/src/instana/instrumentation/google/cloud/pubsub.py @@ -2,38 +2,50 @@ # (c) Copyright Instana Inc. 2021 -import json +from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple + import wrapt -from opentracing import Format -from ....log import logger -from ....singletons import tracer -from ....util.traceutils import get_tracer_tuple, tracing_is_off +from instana.log import logger +from instana.propagators.format import Format +from instana.singletons import tracer +from instana.util.traceutils import get_tracer_tuple, tracing_is_off + +if TYPE_CHECKING: + from instana.span.span import InstanaSpan try: from google.cloud import pubsub_v1 - - def _set_publisher_tags(span, topic_path): - span.set_tag('gcps.op', 'publish') + def _set_publisher_attributes( + span: "InstanaSpan", + topic_path: str, + ) -> None: + span.set_attribute("gcps.op", "publish") # Fully qualified identifier is in the form of # `projects/{project_id}/topic/{topic_name}` - project_id, topic_name = topic_path.split('/')[1::2] - span.set_tag('gcps.projid', project_id) - span.set_tag('gcps.top', topic_name) - - - def _set_consumer_tags(span, subscription_path): - span.set_tag('gcps.op', 'consume') + project_id, topic_name = topic_path.split("/")[1::2] + span.set_attribute("gcps.projid", project_id) + span.set_attribute("gcps.top", topic_name) + + def _set_consumer_attributes( + span: "InstanaSpan", + subscription_path: str, + ) -> None: + span.set_attribute("gcps.op", "consume") # Fully qualified identifier is in the form of # `projects/{project_id}/subscriptions/{subscription_name}` - project_id, subscription_id = subscription_path.split('/')[1::2] - span.set_tag('gcps.projid', project_id) - span.set_tag('gcps.sub', subscription_id) - - - @wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'PublisherClient.publish') - def publish_with_instana(wrapped, instance, args, kwargs): + project_id, subscription_id = subscription_path.split("/")[1::2] + span.set_attribute("gcps.projid", project_id) + span.set_attribute("gcps.sub", subscription_id) + + @wrapt.patch_function_wrapper("google.cloud.pubsub_v1", "PublisherClient.publish") + def publish_with_instana( + wrapped: Callable[..., object], + instance: pubsub_v1.PublisherClient, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: """References: - PublisherClient.publish(topic_path, messages, metadata) """ @@ -42,29 +54,43 @@ def publish_with_instana(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None - with tracer.start_active_span('gcps-producer', child_of=parent_span) as scope: + with tracer.start_as_current_span( + "gcps-producer", span_context=parent_context + ) as span: # trace continuity, inject to the span context - headers = dict() - tracer.inject(scope.span.context, Format.TEXT_MAP, headers, disable_w3c_trace_context=True) + headers = {} + tracer.inject( + span.context, + Format.TEXT_MAP, + headers, + disable_w3c_trace_context=True, + ) + + headers = {key: str(value) for key, value in headers.items()} # update the metadata dict with instana trace attributes kwargs.update(headers) - _set_publisher_tags(scope.span, topic_path=args[0]) + _set_publisher_attributes(span, topic_path=args[0]) try: rv = wrapped(*args, **kwargs) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) else: return rv - - @wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'SubscriberClient.subscribe') - def subscribe_with_instana(wrapped, instance, args, kwargs): - + @wrapt.patch_function_wrapper( + "google.cloud.pubsub_v1", "SubscriberClient.subscribe" + ) + def subscribe_with_instana( + wrapped: Callable[..., object], + instance: pubsub_v1.SubscriberClient, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: """References: - SubscriberClient.subscribe(subscription_path, callback) - callback(message) is called from the subscription future @@ -72,29 +98,31 @@ def subscribe_with_instana(wrapped, instance, args, kwargs): def callback_with_instana(message): if message.attributes: - parent_span = tracer.extract(Format.TEXT_MAP, message.attributes, disable_w3c_trace_context=True) + parent_context = tracer.extract( + Format.TEXT_MAP, message.attributes, disable_w3c_trace_context=True + ) else: - parent_span = None + parent_context = None - with tracer.start_active_span('gcps-consumer', child_of=parent_span) as scope: - _set_consumer_tags(scope.span, subscription_path=args[0]) + with tracer.start_as_current_span( + "gcps-consumer", span_context=parent_context + ) as span: + _set_consumer_attributes(span, subscription_path=args[0]) try: callback(message) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) # Handle callback appropriately from args or kwargs - if 'callback' in kwargs: - callback = kwargs.get('callback') - kwargs['callback'] = callback_with_instana + if "callback" in kwargs: + callback = kwargs.get("callback") + kwargs["callback"] = callback_with_instana return wrapped(*args, **kwargs) else: subscription, callback, *args = args args = (subscription, callback_with_instana, *args) return wrapped(*args, **kwargs) - - logger.debug('Instrumenting Google Cloud Pub/Sub') + logger.debug("Instrumenting Google Cloud Pub/Sub") except ImportError: pass From 3d587ce3dbc23b517d1419cc4da3f705c8bfe73e Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 27 Sep 2024 14:41:16 +0300 Subject: [PATCH 2/4] unittests(pubsub): added unittests of pubsub otel instrumentation --- tests/clients/test_google-cloud-pubsub.py | 142 ++++++++++++---------- tests/conftest.py | 2 - 2 files changed, 76 insertions(+), 68 deletions(-) diff --git a/tests/clients/test_google-cloud-pubsub.py b/tests/clients/test_google-cloud-pubsub.py index 48b57eda..cbbafe6f 100644 --- a/tests/clients/test_google-cloud-pubsub.py +++ b/tests/clients/test_google-cloud-pubsub.py @@ -4,30 +4,33 @@ import os import threading import time -import six -import unittest +from typing import Generator -from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient +import pytest +import six from google.api_core.exceptions import AlreadyExists +from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient from google.cloud.pubsub_v1.publisher import exceptions +from opentelemetry.trace import SpanKind + from instana.singletons import agent, tracer +from instana.span.span import get_current_span from tests.test_utils import _TraceContextMixin # Use PubSub Emulator exposed at :8085 os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:8085" -class TestPubSubPublish(unittest.TestCase, _TraceContextMixin): - @classmethod - def setUpClass(cls): - cls.publisher = PublisherClient() +class TestPubSubPublish(_TraceContextMixin): + publisher = PublisherClient() - def setUp(self): - self.recorder = tracer.recorder + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.recorder = tracer.span_processor self.recorder.clear_spans() - self.project_id = 'test-project' - self.topic_name = 'test-topic' + self.project_id = "test-project" + self.topic_name = "test-topic" # setup topic_path & topic self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name) @@ -36,31 +39,32 @@ def setUp(self): except AlreadyExists: self.publisher.delete_topic(request={"topic": self.topic_path}) self.publisher.create_topic(request={"name": self.topic_path}) - - def tearDown(self): + yield self.publisher.delete_topic(request={"topic": self.topic_path}) agent.options.allow_exit_as_root = False - def test_publish(self): + def test_publish(self) -> None: # publish a single message - with tracer.start_active_span('test'): - future = self.publisher.publish(self.topic_path, - b'Test Message', - origin="instana") + with tracer.start_as_current_span("test"): + future = self.publisher.publish( + self.topic_path, b"Test Message", origin="instana" + ) time.sleep(2.0) # for sanity result = future.result() - self.assertIsInstance(result, six.string_types) + assert isinstance(result, six.string_types) spans = self.recorder.queued_spans() gcps_span, test_span = spans[0], spans[1] - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) - self.assertEqual('gcps', gcps_span.n) - self.assertEqual(2, gcps_span.k) # EXIT + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() + assert gcps_span.n == "gcps" + assert gcps_span.k is SpanKind.CLIENT - self.assertEqual('publish', gcps_span.data['gcps']['op']) - self.assertEqual(self.topic_name, gcps_span.data['gcps']['top']) + assert gcps_span.data["gcps"]["op"] == "publish" + assert self.topic_name == gcps_span.data["gcps"]["top"] # Trace Context Propagation self.assertTraceContextPropagated(test_span, gcps_span) @@ -68,57 +72,58 @@ def test_publish(self): # Error logging self.assertErrorLogging(spans) - def test_publish_as_root_exit_span(self): + def test_publish_as_root_exit_span(self) -> None: agent.options.allow_exit_as_root = True # publish a single message - future = self.publisher.publish(self.topic_path, - b'Test Message', - origin="instana") + future = self.publisher.publish( + self.topic_path, b"Test Message", origin="instana" + ) time.sleep(2.0) # for sanity result = future.result() - self.assertIsInstance(result, six.string_types) + assert isinstance(result, six.string_types) spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) + assert len(spans) == 1 gcps_span = spans[0] - self.assertIsNone(tracer.active_span) - self.assertEqual('gcps', gcps_span.n) - self.assertEqual(2, gcps_span.k) # EXIT + current_span = get_current_span() + assert not current_span.is_recording() + assert gcps_span.n == "gcps" + assert gcps_span.k is SpanKind.CLIENT - self.assertEqual('publish', gcps_span.data['gcps']['op']) - self.assertEqual(self.topic_name, gcps_span.data['gcps']['top']) + assert gcps_span.data["gcps"]["op"] == "publish" + assert self.topic_name == gcps_span.data["gcps"]["top"] # Error logging self.assertErrorLogging(spans) class AckCallback(object): - def __init__(self): + def __init__(self) -> None: self.calls = 0 self.lock = threading.Lock() - def __call__(self, message): + def __call__(self, message) -> None: message.ack() # Only increment the number of calls **after** finishing. with self.lock: self.calls += 1 -class TestPubSubSubscribe(unittest.TestCase, _TraceContextMixin): +class TestPubSubSubscribe(_TraceContextMixin): @classmethod - def setUpClass(cls): + def setup_class(cls) -> None: cls.publisher = PublisherClient() cls.subscriber = SubscriberClient() - def setUp(self): - - self.recorder = tracer.recorder + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.recorder = tracer.span_processor self.recorder.clear_spans() - self.project_id = 'test-project' - self.topic_name = 'test-topic' - self.subscription_name = 'test-subscription' + self.project_id = "test-project" + self.topic_name = "test-topic" + self.subscription_name = "test-subscription" # setup topic_path & topic self.topic_path = self.publisher.topic_path(self.project_id, self.topic_name) @@ -130,29 +135,33 @@ def setUp(self): # setup subscription path & attach subscription self.subscription_path = self.subscriber.subscription_path( - self.project_id, self.subscription_name) + self.project_id, + self.subscription_name, + ) try: self.subscriber.create_subscription( request={"name": self.subscription_path, "topic": self.topic_path} ) except AlreadyExists: - self.subscriber.delete_subscription(request={"subscription": self.subscription_path}) + self.subscriber.delete_subscription( + request={"subscription": self.subscription_path} + ) self.subscriber.create_subscription( request={"name": self.subscription_path, "topic": self.topic_path} ) - - def tearDown(self): + yield self.publisher.delete_topic(request={"topic": self.topic_path}) - self.subscriber.delete_subscription(request={"subscription": self.subscription_path}) - - def test_subscribe(self): + self.subscriber.delete_subscription( + request={"subscription": self.subscription_path} + ) - with tracer.start_active_span('test'): + def test_subscribe(self) -> None: + with tracer.start_as_current_span("test"): # Publish a message - future = self.publisher.publish(self.topic_path, - b"Test Message to PubSub", - origin="instana") - self.assertIsInstance(future.result(), six.string_types) + future = self.publisher.publish( + self.topic_path, b"Test Message to PubSub", origin="instana" + ) + assert isinstance(future.result(), six.string_types) time.sleep(2.0) # for sanity @@ -171,15 +180,16 @@ def test_subscribe(self): consumer_span = spans[1] test_span = spans[2] - self.assertEqual(3, len(spans)) - self.assertIsNone(tracer.active_span) - self.assertEqual('publish', producer_span.data['gcps']['op']) - self.assertEqual('consume', consumer_span.data['gcps']['op']) - self.assertEqual(self.topic_name, producer_span.data['gcps']['top']) - self.assertEqual(self.subscription_name, consumer_span.data['gcps']['sub']) + assert len(spans) == 3 + current_span = get_current_span() + assert not current_span.is_recording() + assert producer_span.data["gcps"]["op"] == "publish" + assert consumer_span.data["gcps"]["op"] == "consume" + assert self.topic_name == producer_span.data["gcps"]["top"] + assert self.subscription_name == consumer_span.data["gcps"]["sub"] - self.assertEqual(2, producer_span.k) # EXIT - self.assertEqual(1, consumer_span.k) # ENTRY + assert producer_span.k is SpanKind.CLIENT + assert consumer_span.k is SpanKind.SERVER # Trace Context Propagation self.assertTraceContextPropagated(producer_span, consumer_span) diff --git a/tests/conftest.py b/tests/conftest.py index 061fe27a..f392a935 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,8 +29,6 @@ # TODO: remove the following entries as the migration of the instrumentation # codes are finalised. -collect_ignore_glob.append("*clients/test_google*") - collect_ignore_glob.append("*frameworks/test_gevent*") # # Cassandra and gevent tests are run in dedicated jobs on CircleCI and will From 5610018dcb2ee50e6aca2467b2b58be323b7041f Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 27 Sep 2024 14:41:28 +0300 Subject: [PATCH 3/4] refactor(cloudstorage): added cloud storage otel instrumentation --- .../instrumentation/google/cloud/storage.py | 146 +++++++++++------- 1 file changed, 90 insertions(+), 56 deletions(-) diff --git a/src/instana/instrumentation/google/cloud/storage.py b/src/instana/instrumentation/google/cloud/storage.py index 45f6607f..a1ccb6d9 100644 --- a/src/instana/instrumentation/google/cloud/storage.py +++ b/src/instana/instrumentation/google/cloud/storage.py @@ -5,16 +5,19 @@ import wrapt import re -from ....log import logger -from .collectors import _storage_api -from ....util.traceutils import get_tracer_tuple, tracing_is_off +from typing import Any, Callable, Dict, Tuple, Union +from instana.log import logger +from instana.instrumentation.google.cloud.collectors import _storage_api +from instana.util.traceutils import get_tracer_tuple, tracing_is_off try: from google.cloud import storage - logger.debug('Instrumenting google-cloud-storage') + logger.debug("Instrumenting google-cloud-storage") - def _collect_tags(api_request): + def _collect_attributes( + api_request: Dict[str, Any], + ) -> Dict[str, Any]: """ Extract span tags from Google Cloud Storage API request. Returns None if the request is not supported. @@ -22,21 +25,21 @@ def _collect_tags(api_request): :param: dict :return: dict or None """ - method, path = api_request.get('method', None), api_request.get('path', None) + method, path = api_request.get("method", None), api_request.get("path", None) if method not in _storage_api: return try: - params = api_request.get('query_params', {}) - data = api_request.get('data', {}) + params = api_request.get("query_params", {}) + data = api_request.get("data", {}) if path in _storage_api[method]: # check is any of string keys matches the path exactly return _storage_api[method][path](params, data) else: # look for a regex that matches the string - for (matcher, collect) in _storage_api[method].items(): + for matcher, collect in _storage_api[method].items(): if not isinstance(matcher, re.Pattern): continue @@ -46,108 +49,139 @@ def _collect_tags(api_request): return collect(params, data, m) except Exception: - logger.debug("instana.instrumentation.google.cloud.storage._collect_tags: ", exc_info=True) - - def execute_with_instana(wrapped, instance, args, kwargs): + logger.debug( + "instana.instrumentation.google.cloud.storage._collect_attributes: ", + exc_info=True, + ) + + def execute_with_instana( + wrapped: Callable[..., object], + instance: Union[storage.Batch, storage._http.Connection], + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: # batch requests are traced with finish_batch_with_instana() # also return early if we're not tracing if isinstance(instance, storage.Batch) or tracing_is_off(): return wrapped(*args, **kwargs) tracer, parent_span, _ = get_tracer_tuple() - tags = _collect_tags(kwargs) - - # don't trace if the call is not instrumented - if tags is None: - logger.debug('uninstrumented Google Cloud Storage API request: %s' % kwargs) - return wrapped(*args, **kwargs) - - with tracer.start_active_span('gcs', child_of=parent_span) as scope: - for (k, v) in tags.items(): - scope.span.set_tag(k, v) + parent_context = parent_span.get_span_context() if parent_span else None + with tracer.start_as_current_span("gcs", span_context=parent_context) as span: try: + attributes = _collect_attributes(kwargs) + + # don't trace if the call is not instrumented + if attributes is None: + logger.debug( + f"uninstrumented Google Cloud Storage API request: {kwargs}" + ) + return wrapped(*args, **kwargs) + span.set_attributes(attributes) kv = wrapped(*args, **kwargs) - except Exception as e: - scope.span.log_exception(e) - raise + except Exception as exc: + span.record_exception(exc) else: return kv - def download_with_instana(wrapped, instance, args, kwargs): + def download_with_instana( + wrapped: Callable[..., object], + instance: storage.Blob, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: # return early if we're not tracing if tracing_is_off(): return wrapped(*args, **kwargs) tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None - with tracer.start_active_span('gcs', child_of=parent_span) as scope: - scope.span.set_tag('gcs.op', 'objects.get') - scope.span.set_tag('gcs.bucket', instance.bucket.name) - scope.span.set_tag('gcs.object', instance.name) + with tracer.start_as_current_span("gcs", span_context=parent_context) as span: + span.set_attribute("gcs.op", "objects.get") + span.set_attribute("gcs.bucket", instance.bucket.name) + span.set_attribute("gcs.object", instance.name) - start = len(args) > 4 and args[4] or kwargs.get('start', None) + start = len(args) > 4 and args[4] or kwargs.get("start", None) if start is None: - start = '' + start = "" - end = len(args) > 5 and args[5] or kwargs.get('end', None) + end = len(args) > 5 and args[5] or kwargs.get("end", None) if end is None: - end = '' + end = "" - if start != '' or end != '': - scope.span.set_tag('gcs.range', '-'.join((start, end))) + if start != "" or end != "": + span.set_attribute("gcs.range", "-".join((start, end))) try: kv = wrapped(*args, **kwargs) except Exception as e: - scope.span.log_exception(e) - raise + span.record_exception(e) else: return kv - def upload_with_instana(wrapped, instance, args, kwargs): + def upload_with_instana( + wrapped: Callable[..., object], + instance: storage.Blob, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: # return early if we're not tracing if tracing_is_off(): return wrapped(*args, **kwargs) tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None - with tracer.start_active_span('gcs', child_of=parent_span) as scope: - scope.span.set_tag('gcs.op', 'objects.insert') - scope.span.set_tag('gcs.bucket', instance.bucket.name) - scope.span.set_tag('gcs.object', instance.name) + with tracer.start_as_current_span("gcs", span_context=parent_context) as span: + span.set_attribute("gcs.op", "objects.insert") + span.set_attribute("gcs.bucket", instance.bucket.name) + span.set_attribute("gcs.object", instance.name) try: kv = wrapped(*args, **kwargs) except Exception as e: - scope.span.log_exception(e) - raise + span.record_exception(e) else: return kv - def finish_batch_with_instana(wrapped, instance, args, kwargs): + def finish_batch_with_instana( + wrapped: Callable[..., object], + instance: storage.Batch, + args: Tuple[object, ...], + kwargs: Dict[str, Any], + ) -> object: # return early if we're not tracing if tracing_is_off(): return wrapped(*args, **kwargs) tracer, parent_span, _ = get_tracer_tuple() + parent_context = parent_span.get_span_context() if parent_span else None - with tracer.start_active_span('gcs', child_of=parent_span) as scope: - scope.span.set_tag('gcs.op', 'batch') - scope.span.set_tag('gcs.projectId', instance._client.project) - scope.span.set_tag('gcs.numberOfOperations', len(instance._requests)) + with tracer.start_as_current_span("gcs", span_context=parent_context) as span: + span.set_attribute("gcs.op", "batch") + span.set_attribute("gcs.projectId", instance._client.project) + span.set_attribute("gcs.numberOfOperations", len(instance._requests)) try: kv = wrapped(*args, **kwargs) except Exception as e: - scope.span.log_exception(e) - raise + span.record_exception(e) else: return kv - wrapt.wrap_function_wrapper('google.cloud.storage._http', 'Connection.api_request', execute_with_instana) - wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_download', download_with_instana) - wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_upload', upload_with_instana) - wrapt.wrap_function_wrapper('google.cloud.storage.batch', 'Batch.finish', finish_batch_with_instana) + wrapt.wrap_function_wrapper( + "google.cloud.storage._http", "Connection.api_request", execute_with_instana + ) + wrapt.wrap_function_wrapper( + "google.cloud.storage.blob", "Blob._do_download", download_with_instana + ) + wrapt.wrap_function_wrapper( + "google.cloud.storage.blob", "Blob._do_upload", upload_with_instana + ) + wrapt.wrap_function_wrapper( + "google.cloud.storage.batch", "Batch.finish", finish_batch_with_instana + ) except ImportError: pass From b9effb35771cd1b9d40251d1cc077eab8ca8eadc Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Fri, 27 Sep 2024 14:41:38 +0300 Subject: [PATCH 4/4] unittests(cloudstorage): added unittests of cloud storage otel instrumentation --- tests/clients/test_google-cloud-storage.py | 791 ++++++++++++--------- 1 file changed, 454 insertions(+), 337 deletions(-) diff --git a/tests/clients/test_google-cloud-storage.py b/tests/clients/test_google-cloud-storage.py index d8762584..3a069acc 100644 --- a/tests/clients/test_google-cloud-storage.py +++ b/tests/clients/test_google-cloud-storage.py @@ -1,37 +1,35 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 -import sys -import unittest +from typing import Generator import json +import pytest import requests import io from instana.singletons import agent, tracer +from instana.span.span import get_current_span from tests.test_utils import _TraceContextMixin +from opentelemetry.trace import SpanKind from mock import patch, Mock from six.moves import http_client from google.cloud import storage -from google.api_core import iam +from google.api_core import iam, page_iterator from google.auth.credentials import AnonymousCredentials -class TestGoogleCloudStorage(unittest.TestCase, _TraceContextMixin): - def setUp(self): - self.recorder = tracer.recorder +class TestGoogleCloudStorage(_TraceContextMixin): + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + self.recorder = tracer.span_processor self.recorder.clear_spans() - - def tearDown(self): - """Ensure that allow_exit_as_root has the default value""" + yield agent.options.allow_exit_as_root = False - @unittest.skipIf( - sys.platform == "darwin", reason="Raises not Implemented exception in OSX" - ) @patch("requests.Session.request") - def test_buckets_list(self, mock_requests): + def test_buckets_list(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#buckets", "items": []}, status_code=http_client.OK, @@ -41,40 +39,32 @@ def test_buckets_list(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): buckets = client.list_buckets() - self.assertEqual( - 0, - self.recorder.queue_size(), - msg="span has been created before the actual request", - ) - - # trigger the iterator - for b in buckets: + for _ in buckets: pass spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + assert gcs_span.data["gcs"]["op"] == "buckets.list" + assert gcs_span.data["gcs"]["projectId"] == "test-project" - @unittest.skipIf( - sys.platform == "darwin", reason="Raises not Implemented exception in OSX" - ) @patch("requests.Session.request") - def test_buckets_list_as_root_exit_span(self, mock_requests): + def test_buckets_list_as_root_exit_span(self, mock_requests: Mock) -> None: agent.options.allow_exit_as_root = True mock_requests.return_value = self._mock_response( json_content={"kind": "storage#buckets", "items": []}, @@ -86,32 +76,27 @@ def test_buckets_list_as_root_exit_span(self, mock_requests): ) buckets = client.list_buckets() - self.assertEqual( - 0, - self.recorder.queue_size(), - msg="span has been created before the actual request", - ) - - # trigger the iterator - for b in buckets: + for _ in buckets: pass spans = self.recorder.queued_spans() - self.assertEqual(1, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 1 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + assert gcs_span.data["gcs"]["op"] == "buckets.list" + assert gcs_span.data["gcs"]["projectId"] == "test-project" @patch("requests.Session.request") - def test_buckets_insert(self, mock_requests): + def test_buckets_insert(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#bucket"}, status_code=http_client.OK ) @@ -120,29 +105,31 @@ def test_buckets_insert(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.create_bucket("test bucket") spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.insert", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.insert" + assert gcs_span.data["gcs"]["projectId"] == "test-project" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_get(self, mock_requests): + def test_buckets_get(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#bucket"}, status_code=http_client.OK ) @@ -151,29 +138,31 @@ def test_buckets_get(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.get_bucket("test bucket") spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] - self.assertEqual(test_span.t, gcs_span.t) - self.assertEqual(test_span.s, gcs_span.p) + assert gcs_span.t == test_span.t + assert gcs_span.p == test_span.s - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.get", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.get" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_patch(self, mock_requests): + def test_buckets_patch(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#bucket"}, status_code=http_client.OK ) @@ -182,28 +171,30 @@ def test_buckets_patch(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").patch() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.patch", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.patch" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_update(self, mock_requests): + def test_buckets_update(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#bucket"}, status_code=http_client.OK ) @@ -212,28 +203,30 @@ def test_buckets_update(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").update() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.update", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.update" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_get_iam_policy(self, mock_requests): + def test_buckets_get_iam_policy(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#policy"}, status_code=http_client.OK ) @@ -242,28 +235,30 @@ def test_buckets_get_iam_policy(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").get_iam_policy() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.getIamPolicy", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.getIamPolicy" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_set_iam_policy(self, mock_requests): + def test_buckets_set_iam_policy(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#policy"}, status_code=http_client.OK ) @@ -272,28 +267,30 @@ def test_buckets_set_iam_policy(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").set_iam_policy(iam.Policy()) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.setIamPolicy", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.setIamPolicy" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_test_iam_permissions(self, mock_requests): + def test_buckets_test_iam_permissions(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#testIamPermissionsResponse"}, status_code=http_client.OK, @@ -303,28 +300,30 @@ def test_buckets_test_iam_permissions(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").test_iam_permissions("test-permission") spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.testIamPermissions", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.testIamPermissions" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_lock_retention_policy(self, mock_requests): + def test_buckets_lock_retention_policy(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={ "kind": "storage#bucket", @@ -341,56 +340,60 @@ def test_buckets_lock_retention_policy(self, mock_requests): bucket = client.bucket("test bucket") bucket.reload() - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): bucket.lock_retention_policy() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.lockRetentionPolicy", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.lockRetentionPolicy" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_buckets_delete(self, mock_requests): + def test_buckets_delete(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response() client = self._client( credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").delete() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("buckets.delete", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "buckets.delete" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_objects_compose(self, mock_requests): + def test_objects_compose(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -399,7 +402,7 @@ def test_objects_compose(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("dest object").compose( [ storage.blob.Blob("object 1", "test bucket"), @@ -409,28 +412,30 @@ def test_objects_compose(self, mock_requests): spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.compose", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["destinationBucket"]) - self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) - self.assertEqual( - "test bucket/object 1,test bucket/object 2", - gcs_span.data["gcs"]["sourceObjects"], + assert gcs_span.data["gcs"]["op"] == "objects.compose" + assert gcs_span.data["gcs"]["destinationBucket"] == "test bucket" + assert gcs_span.data["gcs"]["destinationObject"] == "dest object" + assert ( + gcs_span.data["gcs"]["sourceObjects"] + == "test bucket/object 1,test bucket/object 2" ) @patch("requests.Session.request") - def test_objects_copy(self, mock_requests): + def test_objects_copy(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -440,7 +445,7 @@ def test_objects_copy(self, mock_requests): ) bucket = client.bucket("src bucket") - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): bucket.copy_blob( bucket.blob("src object"), client.bucket("dest bucket"), @@ -449,55 +454,59 @@ def test_objects_copy(self, mock_requests): spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.copy", gcs_span.data["gcs"]["op"]) - self.assertEqual("dest bucket", gcs_span.data["gcs"]["destinationBucket"]) - self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) - self.assertEqual("src bucket", gcs_span.data["gcs"]["sourceBucket"]) - self.assertEqual("src object", gcs_span.data["gcs"]["sourceObject"]) + assert gcs_span.data["gcs"]["op"] == "objects.copy" + assert gcs_span.data["gcs"]["destinationBucket"] == "dest bucket" + assert gcs_span.data["gcs"]["destinationObject"] == "dest object" + assert gcs_span.data["gcs"]["sourceBucket"] == "src bucket" + assert gcs_span.data["gcs"]["sourceObject"] == "src object" @patch("requests.Session.request") - def test_objects_delete(self, mock_requests): + def test_objects_delete(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response() client = self._client( credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").delete() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.delete", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.delete" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_objects_attrs(self, mock_requests): + def test_objects_attrs(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -506,29 +515,31 @@ def test_objects_attrs(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").exists() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.attrs", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.attrs" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_objects_get(self, mock_requests): + def test_objects_get(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( content=b"CONTENT", status_code=http_client.OK ) @@ -537,31 +548,33 @@ def test_objects_get(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").download_to_file( io.BytesIO(), raw_download=True ) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.get", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.get" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_objects_insert(self, mock_requests): + def test_objects_insert(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -570,34 +583,33 @@ def test_objects_insert(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").upload_from_string( "CONTENT" ) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.insert", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.insert" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" - @unittest.skipIf( - sys.platform == "darwin", reason="Raises not Implemented exception in OSX" - ) @patch("requests.Session.request") - def test_objects_list(self, mock_requests): + def test_objects_list(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -606,36 +618,33 @@ def test_objects_list(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): blobs = client.bucket("test bucket").list_blobs() - self.assertEqual( - 0, - self.recorder.queue_size(), - msg="span has been created before the actual request", - ) - for b in blobs: + for _ in blobs: pass spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "objects.list" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_objects_patch(self, mock_requests): + def test_objects_patch(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -644,29 +653,31 @@ def test_objects_patch(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").patch() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.patch", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.patch" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_objects_rewrite(self, mock_requests): + def test_objects_rewrite(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={ "kind": "storage#rewriteResponse", @@ -682,33 +693,35 @@ def test_objects_rewrite(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("dest bucket").blob("dest object").rewrite( client.bucket("src bucket").blob("src object") ) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.rewrite", gcs_span.data["gcs"]["op"]) - self.assertEqual("dest bucket", gcs_span.data["gcs"]["destinationBucket"]) - self.assertEqual("dest object", gcs_span.data["gcs"]["destinationObject"]) - self.assertEqual("src bucket", gcs_span.data["gcs"]["sourceBucket"]) - self.assertEqual("src object", gcs_span.data["gcs"]["sourceObject"]) + assert gcs_span.data["gcs"]["op"] == "objects.rewrite" + assert gcs_span.data["gcs"]["destinationBucket"] == "dest bucket" + assert gcs_span.data["gcs"]["destinationObject"] == "dest object" + assert gcs_span.data["gcs"]["sourceBucket"] == "src bucket" + assert gcs_span.data["gcs"]["sourceObject"] == "src object" @patch("requests.Session.request") - def test_objects_update(self, mock_requests): + def test_objects_update(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#object"}, status_code=http_client.OK ) @@ -717,29 +730,31 @@ def test_objects_update(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").update() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objects.update", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objects.update" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_default_acls_list(self, mock_requests): + def test_default_acls_list(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#objectAccessControls", "items": []}, status_code=http_client.OK, @@ -749,28 +764,30 @@ def test_default_acls_list(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").default_object_acl.get_entities() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("defaultAcls.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) + assert gcs_span.data["gcs"]["op"] == "defaultAcls.list" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" @patch("requests.Session.request") - def test_object_acls_list(self, mock_requests): + def test_object_acls_list(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#objectAccessControls", "items": []}, status_code=http_client.OK, @@ -780,29 +797,31 @@ def test_object_acls_list(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.bucket("test bucket").blob("test object").acl.get_entities() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("objectAcls.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test bucket", gcs_span.data["gcs"]["bucket"]) - self.assertEqual("test object", gcs_span.data["gcs"]["object"]) + assert gcs_span.data["gcs"]["op"] == "objectAcls.list" + assert gcs_span.data["gcs"]["bucket"] == "test bucket" + assert gcs_span.data["gcs"]["object"] == "test object" @patch("requests.Session.request") - def test_object_hmac_keys_create(self, mock_requests): + def test_object_hmac_keys_create(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, status_code=http_client.OK, @@ -812,59 +831,63 @@ def test_object_hmac_keys_create(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.create_hmac_key("test@example.com") spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("hmacKeys.create", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + assert gcs_span.data["gcs"]["op"] == "hmacKeys.create" + assert gcs_span.data["gcs"]["projectId"] == "test-project" @patch("requests.Session.request") - def test_object_hmac_keys_delete(self, mock_requests): + def test_object_hmac_keys_delete(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response() client = self._client( credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): key = storage.hmac_key.HMACKeyMetadata(client, access_id="test key") key.state = storage.hmac_key.HMACKeyMetadata.INACTIVE_STATE key.delete() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("hmacKeys.delete", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) - self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + assert gcs_span.data["gcs"]["op"] == "hmacKeys.delete" + assert gcs_span.data["gcs"]["projectId"] == "test-project" + assert gcs_span.data["gcs"]["accessId"] == "test key" @patch("requests.Session.request") - def test_object_hmac_keys_get(self, mock_requests): + def test_object_hmac_keys_get(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, status_code=http_client.OK, @@ -874,32 +897,31 @@ def test_object_hmac_keys_get(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): storage.hmac_key.HMACKeyMetadata(client, access_id="test key").exists() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("hmacKeys.get", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) - self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + assert gcs_span.data["gcs"]["op"] == "hmacKeys.get" + assert gcs_span.data["gcs"]["projectId"] == "test-project" + assert gcs_span.data["gcs"]["accessId"] == "test key" - @unittest.skipIf( - sys.platform == "darwin", reason="Raises not Implemented exception in OSX" - ) @patch("requests.Session.request") - def test_object_hmac_keys_list(self, mock_requests): + def test_object_hmac_keys_list(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#hmacKeysMetadata", "items": []}, status_code=http_client.OK, @@ -909,36 +931,33 @@ def test_object_hmac_keys_list(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): keys = client.list_hmac_keys() - self.assertEqual( - 0, - self.recorder.queue_size(), - msg="span has been created before the actual request", - ) - for k in keys: + for _ in keys: pass spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("hmacKeys.list", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + assert gcs_span.data["gcs"]["op"] == "hmacKeys.list" + assert gcs_span.data["gcs"]["projectId"] == "test-project" @patch("requests.Session.request") - def test_object_hmac_keys_update(self, mock_requests): + def test_object_hmac_keys_update(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, status_code=http_client.OK, @@ -948,29 +967,31 @@ def test_object_hmac_keys_update(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): storage.hmac_key.HMACKeyMetadata(client, access_id="test key").update() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("hmacKeys.update", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) - self.assertEqual("test key", gcs_span.data["gcs"]["accessId"]) + assert gcs_span.data["gcs"]["op"] == "hmacKeys.update" + assert gcs_span.data["gcs"]["projectId"] == "test-project" + assert gcs_span.data["gcs"]["accessId"] == "test key" @patch("requests.Session.request") - def test_object_hmac_keys_update(self, mock_requests): + def test_object_get_service_account_email(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( json_content={ "email_address": "test@example.com", @@ -983,28 +1004,30 @@ def test_object_hmac_keys_update(self, mock_requests): credentials=AnonymousCredentials(), project="test-project" ) - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): client.get_service_account_email() spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) - self.assertIsNone(tracer.active_span) + assert len(spans) == 2 + + current_span = get_current_span() + assert not current_span.is_recording() gcs_span = spans[0] test_span = spans[1] self.assertTraceContextPropagated(test_span, gcs_span) - self.assertEqual("gcs", gcs_span.n) - self.assertEqual(2, gcs_span.k) - self.assertIsNone(gcs_span.ec) + assert gcs_span.n == "gcs" + assert gcs_span.k is SpanKind.CLIENT + assert not gcs_span.ec - self.assertEqual("serviceAccount.get", gcs_span.data["gcs"]["op"]) - self.assertEqual("test-project", gcs_span.data["gcs"]["projectId"]) + assert gcs_span.data["gcs"]["op"] == "serviceAccount.get" + assert gcs_span.data["gcs"]["projectId"] == "test-project" @patch("requests.Session.request") - def test_batch_operation(self, mock_requests): + def test_batch_operation(self, mock_requests: Mock) -> None: mock_requests.return_value = self._mock_response( _TWO_PART_BATCH_RESPONSE, status_code=http_client.OK, @@ -1016,16 +1039,110 @@ def test_batch_operation(self, mock_requests): ) bucket = client.bucket("test-bucket") - with tracer.start_active_span("test"): + with tracer.start_as_current_span("test"): with client.batch(): for obj in ["obj1", "obj2"]: bucket.delete_blob(obj) spans = self.recorder.queued_spans() - self.assertEqual(2, len(spans)) + assert len(spans) == 2 + + @patch("requests.Session.request") + def test_execute_with_instana_without_tags(self, mock_requests: Mock) -> None: + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#buckets", "items": []}, + status_code=http_client.OK, + ) + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + with tracer.start_as_current_span("test"), patch( + "instana.instrumentation.google.cloud.storage._collect_attributes", + return_value=None, + ): + buckets = client.list_buckets() + for b in buckets: + pass + assert isinstance(buckets, page_iterator.HTTPIterator) + + def test_execute_with_instana_tracing_is_off(self) -> None: + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + with tracer.start_as_current_span("test"), patch( + "instana.instrumentation.google.cloud.storage.tracing_is_off", + return_value=True, + ): + response = client.list_buckets() + assert isinstance(response.client, storage.Client) + + @patch("requests.Session.request") + def test_download_with_instana_tracing_is_off(self, mock_requests: Mock) -> None: + mock_requests.return_value = self._mock_response( + content=b"CONTENT", status_code=http_client.OK + ) + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + with tracer.start_as_current_span("test"), patch( + "instana.instrumentation.google.cloud.storage.tracing_is_off", + return_value=True, + ): + response = ( + client.bucket("test bucket") + .blob("test object") + .download_to_file( + io.BytesIO(), + raw_download=True, + ) + ) + assert not response + + @patch("requests.Session.request") + def test_upload_with_instana_tracing_is_off(self, mock_requests: Mock) -> None: + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, status_code=http_client.OK + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + + with tracer.start_as_current_span("test"), patch( + "instana.instrumentation.google.cloud.storage.tracing_is_off", + return_value=True, + ): + response = ( + client.bucket("test bucket") + .blob("test object") + .upload_from_string("CONTENT") + ) + assert not response + + @patch("requests.Session.request") + def test_finish_batch_operation_tracing_is_off(self, mock_requests: Mock) -> None: + mock_requests.return_value = self._mock_response( + _TWO_PART_BATCH_RESPONSE, + status_code=http_client.OK, + headers={"content-type": 'multipart/mixed; boundary="DEADBEEF="'}, + ) + + client = self._client( + credentials=AnonymousCredentials(), project="test-project" + ) + bucket = client.bucket("test-bucket") + + with tracer.start_as_current_span("test"), patch( + "instana.instrumentation.google.cloud.storage.tracing_is_off", + return_value=True, + ): + with client.batch() as batch_response: + for obj in ["obj1", "obj2"]: + bucket.delete_blob(obj) + assert batch_response - def _client(self, *args, **kwargs): + def _client(self, *args, **kwargs) -> storage.Client: # override the HTTP client to bypass the authorization kwargs["_http"] = kwargs.get("_http", requests.Session()) kwargs["_http"].is_mtls = False @@ -1038,7 +1155,7 @@ def _mock_response( status_code=http_client.NO_CONTENT, json_content=None, headers={}, - ): + ) -> Mock: resp = Mock() resp.status_code = status_code resp.headers = headers