From 280b02009e656fd8fe893a2f1067d443384fe07a Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Wed, 14 May 2025 12:57:39 +0200 Subject: [PATCH 1/3] fix: re-entrant Collector threads on musl-based systems. Signed-off-by: Paulo Vital --- src/instana/collector/base.py | 55 ++++++++++++++--------------------- src/instana/util/__init__.py | 30 ++----------------- 2 files changed, 24 insertions(+), 61 deletions(-) diff --git a/src/instana/collector/base.py b/src/instana/collector/base.py index 67008e34..4bffade8 100644 --- a/src/instana/collector/base.py +++ b/src/instana/collector/base.py @@ -8,9 +8,10 @@ import queue # pylint: disable=import-error import threading +import time from instana.log import logger -from instana.util import DictionaryOfStan, every +from instana.util import DictionaryOfStan class BaseCollector(object): @@ -76,22 +77,22 @@ def start(self): """ if self.is_reporting_thread_running(): if self.thread_shutdown.is_set(): - # Shutdown still in progress; Reschedule this start in 5 seconds from now + # Force a restart. 
+ self.thread_shutdown.clear() + # Reschedule this start in 5 seconds from now timer = threading.Timer(5, self.start) timer.daemon = True timer.name = "Collector Timed Start" timer.start() return logger.debug( - "BaseCollector.start non-fatal: call but thread already running (started: %s)", - self.started, + f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})" ) - return if self.agent.can_send(): logger.debug("BaseCollector.start: launching collection thread") self.thread_shutdown.clear() - self.reporting_thread = threading.Thread(target=self.thread_loop, args=()) + self.reporting_thread = threading.Thread(target=self.background_report, args=()) self.reporting_thread.daemon = True self.reporting_thread.name = self.THREAD_NAME self.reporting_thread.start() @@ -113,37 +114,25 @@ def shutdown(self, report_final=True): self.prepare_and_report_data() self.started = False - def thread_loop(self): - """ - Just a loop that is run in the background thread. - @return: None - """ - every( - self.report_interval, - self.background_report, - "Instana Collector: prepare_and_report_data", - ) - - def background_report(self): + def background_report(self) -> None: """ The main work-horse method to report data in the background thread. - @return: Boolean - """ - if self.thread_shutdown.is_set(): - logger.debug( - "Thread shutdown signal is active: Shutting down reporting thread" - ) - return False - - self.prepare_and_report_data() - if self.thread_shutdown.is_set(): - logger.debug( - "Thread shutdown signal is active: Shutting down reporting thread" - ) - return False + This method runs indefinitely, preparing and reporting data at regular + intervals. + It checks for a shutdown signal and stops execution if it's set. 
+ + @return: None + """ + while True: + if self.thread_shutdown.is_set(): + logger.debug( + "Thread shutdown signal is active: Shutting down reporting thread" + ) + break - return True + self.prepare_and_report_data() + time.sleep(self.report_interval) def prepare_and_report_data(self): """ diff --git a/src/instana/util/__init__.py b/src/instana/util/__init__.py index bd991126..56487bc4 100644 --- a/src/instana/util/__init__.py +++ b/src/instana/util/__init__.py @@ -1,14 +1,12 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 +import importlib.metadata import json -import time from collections import defaultdict from urllib import parse -import importlib.metadata - -from ..log import logger +from instana.log import logger def nested_dictionary(): @@ -111,30 +109,6 @@ def get_default_gateway(): logger.warning("get_default_gateway: ", exc_info=True) -def every(delay, task, name): - """ - Executes a task every `delay` seconds - - :param delay: the delay in seconds - :param task: the method to run. The method should return False if you want the loop to stop. - :return: None - """ - next_time = time.time() + delay - - while True: - time.sleep(max(0, next_time - time.time())) - try: - if task() is False: - break - except Exception: - logger.debug( - "Problem while executing repetitive task: %s", name, exc_info=True - ) - - # skip tasks if we are behind schedule: - next_time += (time.time() - next_time) // delay * delay + delay - - def validate_url(url): """ Validate if is a valid url From 8031aaa15a22472136eff1020442d45eddaa52f5 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Sun, 25 May 2025 21:52:15 +0200 Subject: [PATCH 2/3] style: format BaseCollector and utils files. Used ruff (vscode) to: - Black-compatible code formatting. - fix all auto-fixable violations, like unused imports. - isort-compatible import sorting. 
Signed-off-by: Paulo Vital --- src/instana/collector/base.py | 39 ++++++++++------- src/instana/util/__init__.py | 82 +++++++++++++++++++++++------------ 2 files changed, 78 insertions(+), 43 deletions(-) diff --git a/src/instana/collector/base.py b/src/instana/collector/base.py index 4bffade8..82bfa71a 100644 --- a/src/instana/collector/base.py +++ b/src/instana/collector/base.py @@ -2,17 +2,22 @@ # (c) Copyright Instana Inc. 2020 """ -A Collector launches a background thread and continually collects & reports data. The data -can be any combination of metrics, snapshot data and spans. +A Collector launches a background thread and continually collects & reports data. +The data can be any combination of metrics, snapshot data and spans. """ import queue # pylint: disable=import-error import threading import time +from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Type from instana.log import logger from instana.util import DictionaryOfStan +if TYPE_CHECKING: + from instana.agent.base import BaseAgent + from instana.span.readable_span import ReadableSpan + class BaseCollector(object): """ @@ -20,7 +25,7 @@ class BaseCollector(object): This class launches a background thread to do this work. """ - def __init__(self, agent): + def __init__(self, agent: Type["BaseAgent"]) -> None: # The agent for this process. Can be Standard, AWSLambda or Fargate self.agent = agent @@ -61,7 +66,7 @@ def __init__(self, agent): # Start time of fetching metadata self.fetching_start_time = 0 - def is_reporting_thread_running(self): + def is_reporting_thread_running(self) -> bool: """ Indicates if there is a thread running with the name self.THREAD_NAME """ @@ -70,14 +75,14 @@ def is_reporting_thread_running(self): return True return False - def start(self): + def start(self) -> None: """ Starts the collector and starts reporting as long as the agent is in a ready state. 
@return: None """ if self.is_reporting_thread_running(): if self.thread_shutdown.is_set(): - # Force a restart. + # Force a restart. self.thread_shutdown.clear() # Reschedule this start in 5 seconds from now timer = threading.Timer(5, self.start) @@ -92,7 +97,9 @@ def start(self): if self.agent.can_send(): logger.debug("BaseCollector.start: launching collection thread") self.thread_shutdown.clear() - self.reporting_thread = threading.Thread(target=self.background_report, args=()) + self.reporting_thread = threading.Thread( + target=self.background_report, args=() + ) self.reporting_thread.daemon = True self.reporting_thread.name = self.THREAD_NAME self.reporting_thread.start() @@ -102,7 +109,7 @@ def start(self): "BaseCollector.start: the agent tells us we can't send anything out" ) - def shutdown(self, report_final=True): + def shutdown(self, report_final: bool = True) -> None: """ Shuts down the collector and reports any final data (if possible). e.g. If the host agent disappeared, we won't be able to report final data. @@ -118,10 +125,10 @@ def background_report(self) -> None: """ The main work-horse method to report data in the background thread. - This method runs indefinitely, preparing and reporting data at regular + This method runs indefinitely, preparing and reporting data at regular intervals. It checks for a shutdown signal and stops execution if it's set. - + @return: None """ while True: @@ -134,7 +141,7 @@ def background_report(self) -> None: self.prepare_and_report_data() time.sleep(self.report_interval) - def prepare_and_report_data(self): + def prepare_and_report_data(self) -> bool: """ Prepare and report the data payload. @return: Boolean @@ -144,7 +151,7 @@ def prepare_and_report_data(self): self.agent.report_data_payload(payload) return True - def prepare_payload(self): + def prepare_payload(self) -> DefaultDict[str, Any]: """ Method to prepare the data to be reported. 
@return: DictionaryOfStan() @@ -152,7 +159,7 @@ def prepare_payload(self): logger.debug("BaseCollector: prepare_payload needs to be overridden") return DictionaryOfStan() - def should_send_snapshot_data(self): + def should_send_snapshot_data(self) -> bool: """ Determines if snapshot data should be sent @return: Boolean @@ -160,10 +167,10 @@ def should_send_snapshot_data(self): logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden") return False - def collect_snapshot(self, *argv, **kwargs): + def collect_snapshot(self, *argv, **kwargs) -> None: logger.debug("BaseCollector: collect_snapshot needs to be overridden") - def queued_spans(self): + def queued_spans(self) -> List["ReadableSpan"]: """ Get all of the queued spans @return: list @@ -178,7 +185,7 @@ def queued_spans(self): spans.append(span) return spans - def queued_profiles(self): + def queued_profiles(self) -> List[Dict[str, Any]]: """ Get all of the queued profiles @return: list diff --git a/src/instana/util/__init__.py b/src/instana/util/__init__.py index 56487bc4..b8b44365 100644 --- a/src/instana/util/__init__.py +++ b/src/instana/util/__init__.py @@ -4,32 +4,42 @@ import importlib.metadata import json from collections import defaultdict +from typing import Any, DefaultDict from urllib import parse from instana.log import logger -def nested_dictionary(): +def nested_dictionary() -> DefaultDict[str, Any]: return defaultdict(DictionaryOfStan) # Simple implementation of a nested dictionary. -DictionaryOfStan = nested_dictionary +DictionaryOfStan: DefaultDict[str, Any] = nested_dictionary -def to_json(obj): +# Assisted by watsonx Code Assistant +def to_json(obj: Any) -> bytes: """ - Convert obj to json. Used mostly to convert the classes in json_span.py until we switch to nested - dicts (or something better) + Convert the given object to a JSON binary string. 
- :param obj: the object to serialize to json - :return: json string + This function is primarily used to serialize objects from `json_span.py` + until a switch to nested dictionaries (or a better solution) is made. + + :param obj: The object to serialize to JSON. + :return: The JSON string encoded as bytes. """ try: - def extractor(o): + def extractor(o: Any) -> dict: + """ + Extract dictionary-like attributes from an object. + + :param o: The object to extract attributes from. + :return: A dictionary containing the object's attributes. + """ if not hasattr(o, "__dict__"): - logger.debug("Couldn't serialize non dict type: %s", type(o)) + logger.debug(f"Couldn't serialize non dict type: {type(o)}") return {} else: return {k.lower(): v for k, v in o.__dict__.items() if v is not None} @@ -41,9 +51,12 @@ def extractor(o): logger.debug("to_json non-fatal encoding issue: ", exc_info=True) -def to_pretty_json(obj): +# Assisted by watsonx Code Assistant +def to_pretty_json(obj: Any) -> str: """ - Convert obj to pretty json. Used mostly in logging/debugging. + Convert an object to a pretty-printed JSON string. + + This function is primarily used for logging and debugging purposes. :param obj: the object to serialize to json :return: json string @@ -64,26 +77,40 @@ def extractor(o): logger.debug("to_pretty_json non-fatal encoding issue: ", exc_info=True) -def package_version(): +# Assisted by watsonx Code Assistant +def package_version() -> str: """ - Determine the version of this package. + Determine the version of the 'instana' package. + + This function uses the `importlib.metadata` module to fetch the version of + the 'instana' package. + If the package is not found, it returns 'unknown'. - :return: String representing known version + :return: A string representing the version of the 'instana' package. 
""" - version = "" try: version = importlib.metadata.version("instana") except importlib.metadata.PackageNotFoundError: + logger.debug("Not able to identify the Instana package version.") version = "unknown" return version -def get_default_gateway(): +# Assisted by watsonx Code Assistant +def get_default_gateway() -> str: """ Attempts to read /proc/self/net/route to determine the default gateway in use. - :return: String - the ip address of the default gateway or None if not found/possible/non-existant + This function reads the /proc/self/net/route file, which contains network + routing information for the current process. + It specifically looks for the line where the Destination is 00000000, + indicating the default route. + The Gateway IP is encoded backwards in hex, which this function decodes and + converts to a standard IP address format. + + :return: String - the ip address of the default gateway or None if not + found/possible/non-existant """ try: hip = None @@ -98,28 +125,29 @@ def get_default_gateway(): if hip is not None and len(hip) == 8: # Reverse order, convert hex to int - return "%i.%i.%i.%i" % ( - int(hip[6:8], 16), - int(hip[4:6], 16), - int(hip[2:4], 16), - int(hip[0:2], 16), - ) + return f"{int(hip[6:8], 16)}.{int(hip[4:6], 16)}.{int(hip[2:4], 16)}.{int(hip[0:2], 16)}" except Exception: logger.warning("get_default_gateway: ", exc_info=True) -def validate_url(url): +# Assisted by watsonx Code Assistant +def validate_url(url: str) -> bool: """ - Validate if is a valid url + Validate if the provided is a valid URL. + + This function checks if the given string is a valid URL by attempting to + parse it using the `urlparse` function from the `urllib.parse` module. + A URL is considered valid if it has both a scheme (like 'http' or 'https') + and a network location (netloc). 
 Examples: - "http://localhost:5000" - valid - "http://localhost:5000/path" - valid - "sandwich" - invalid - @param url: string - @return: Boolean + @param url: A string representing the URL to validate. + @return: A boolean value. Returns `True` if the URL is valid, otherwise `False`. """ try: result = parse.urlparse(url) From 286ed5029c2cfa7c5e224b0c1d58944a189ab5b3 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Wed, 4 Jun 2025 21:50:58 +0200 Subject: [PATCH 3/3] fix (test): remove test and coverage for BaseCollector.background_report As BaseCollector.background_report runs in a thread on top of a `while True` loop, it is impossible to capture its output in a test. Signed-off-by: Paulo Vital --- src/instana/collector/base.py | 2 +- tests/collector/test_base_collector.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/instana/collector/base.py b/src/instana/collector/base.py index 82bfa71a..23c410b3 100644 --- a/src/instana/collector/base.py +++ b/src/instana/collector/base.py @@ -131,7 +131,7 @@ def background_report(self) -> None: @return: None """ - while True: + while True: # pragma: no cover if self.thread_shutdown.is_set(): logger.debug( "Thread shutdown signal is active: Shutting down reporting thread" diff --git a/tests/collector/test_base_collector.py b/tests/collector/test_base_collector.py index 8f8550a8..dad090b6 100644 --- a/tests/collector/test_base_collector.py +++ b/tests/collector/test_base_collector.py @@ -146,11 +146,6 @@ def test_shutdown( assert "Collector.shutdown: Reporting final data." in caplog.messages assert not self.collector.started - def test_background_report(self) -> None: - assert self.collector.background_report() - self.collector.thread_shutdown.set() - assert not self.collector.background_report() - def test_should_send_snapshot_data(self, caplog: LogCaptureFixture) -> None: caplog.set_level(logging.DEBUG, logger="instana") self.collector.should_send_snapshot_data()