diff --git a/src/instana/collector/base.py b/src/instana/collector/base.py index 67008e34..23c410b3 100644 --- a/src/instana/collector/base.py +++ b/src/instana/collector/base.py @@ -2,15 +2,21 @@ # (c) Copyright Instana Inc. 2020 """ -A Collector launches a background thread and continually collects & reports data. The data -can be any combination of metrics, snapshot data and spans. +A Collector launches a background thread and continually collects & reports data. +The data can be any combination of metrics, snapshot data and spans. """ import queue # pylint: disable=import-error import threading +import time +from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Type from instana.log import logger -from instana.util import DictionaryOfStan, every +from instana.util import DictionaryOfStan + +if TYPE_CHECKING: + from instana.agent.base import BaseAgent + from instana.span.readable_span import ReadableSpan class BaseCollector(object): @@ -19,7 +25,7 @@ class BaseCollector(object): This class launches a background thread to do this work. """ - def __init__(self, agent): + def __init__(self, agent: Type["BaseAgent"]) -> None: # The agent for this process. Can be Standard, AWSLambda or Fargate self.agent = agent @@ -60,7 +66,7 @@ def __init__(self, agent): # Start time of fetching metadata self.fetching_start_time = 0 - def is_reporting_thread_running(self): + def is_reporting_thread_running(self) -> bool: """ Indicates if there is a thread running with the name self.THREAD_NAME """ @@ -69,29 +75,31 @@ def is_reporting_thread_running(self): return True return False - def start(self): + def start(self) -> None: """ Starts the collector and starts reporting as long as the agent is in a ready state. @return: None """ if self.is_reporting_thread_running(): if self.thread_shutdown.is_set(): - # Shutdown still in progress; Reschedule this start in 5 seconds from now + # Force a restart. + self.thread_shutdown.clear() + # Reschedule this start in 5 seconds from now timer = threading.Timer(5, self.start) timer.daemon = True timer.name = "Collector Timed Start" timer.start() return logger.debug( - "BaseCollector.start non-fatal: call but thread already running (started: %s)", - self.started, + f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})" ) - return if self.agent.can_send(): logger.debug("BaseCollector.start: launching collection thread") self.thread_shutdown.clear() - self.reporting_thread = threading.Thread(target=self.thread_loop, args=()) + self.reporting_thread = threading.Thread( + target=self.background_report, args=() + ) self.reporting_thread.daemon = True self.reporting_thread.name = self.THREAD_NAME self.reporting_thread.start() @@ -101,7 +109,7 @@ def start(self): "BaseCollector.start: the agent tells us we can't send anything out" ) - def shutdown(self, report_final=True): + def shutdown(self, report_final: bool = True) -> None: """ Shuts down the collector and reports any final data (if possible). e.g. If the host agent disappeared, we won't be able to report final data. @@ -113,39 +121,27 @@ def shutdown(self, report_final=True): self.prepare_and_report_data() self.started = False - def thread_loop(self): - """ - Just a loop that is run in the background thread. - @return: None - """ - every( - self.report_interval, - self.background_report, - "Instana Collector: prepare_and_report_data", - ) - - def background_report(self): + def background_report(self) -> None: """ The main work-horse method to report data in the background thread. - @return: Boolean - """ - if self.thread_shutdown.is_set(): - logger.debug( - "Thread shutdown signal is active: Shutting down reporting thread" - ) - return False - self.prepare_and_report_data() + This method runs indefinitely, preparing and reporting data at regular + intervals. + It checks for a shutdown signal and stops execution if it's set. - if self.thread_shutdown.is_set(): - logger.debug( - "Thread shutdown signal is active: Shutting down reporting thread" - ) - return False + @return: None + """ + while True: # pragma: no cover + if self.thread_shutdown.is_set(): + logger.debug( + "Thread shutdown signal is active: Shutting down reporting thread" + ) + break - return True + self.prepare_and_report_data() + time.sleep(self.report_interval) - def prepare_and_report_data(self): + def prepare_and_report_data(self) -> bool: """ Prepare and report the data payload. @return: Boolean @@ -155,7 +151,7 @@ def prepare_and_report_data(self): self.agent.report_data_payload(payload) return True - def prepare_payload(self): + def prepare_payload(self) -> DefaultDict[str, Any]: """ Method to prepare the data to be reported. @return: DictionaryOfStan() @@ -163,7 +159,7 @@ def prepare_payload(self): logger.debug("BaseCollector: prepare_payload needs to be overridden") return DictionaryOfStan() - def should_send_snapshot_data(self): + def should_send_snapshot_data(self) -> bool: """ Determines if snapshot data should be sent @return: Boolean @@ -171,10 +167,10 @@ def should_send_snapshot_data(self): logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden") return False - def collect_snapshot(self, *argv, **kwargs): + def collect_snapshot(self, *argv, **kwargs) -> None: logger.debug("BaseCollector: collect_snapshot needs to be overridden") - def queued_spans(self): + def queued_spans(self) -> List["ReadableSpan"]: """ Get all of the queued spans @return: list @@ -189,7 +185,7 @@ def queued_spans(self): spans.append(span) return spans - def queued_profiles(self): + def queued_profiles(self) -> List[Dict[str, Any]]: """ Get all of the queued profiles @return: list diff --git a/src/instana/util/__init__.py b/src/instana/util/__init__.py index bd991126..b8b44365 100644 --- a/src/instana/util/__init__.py +++ b/src/instana/util/__init__.py @@ -1,37 +1,45 @@ # (c) Copyright IBM Corp. 2021 # (c) Copyright Instana Inc. 2020 +import importlib.metadata import json -import time from collections import defaultdict +from typing import Any, DefaultDict from urllib import parse -import importlib.metadata - -from ..log import logger +from instana.log import logger -def nested_dictionary(): +def nested_dictionary() -> DefaultDict[str, Any]: return defaultdict(DictionaryOfStan) # Simple implementation of a nested dictionary. -DictionaryOfStan = nested_dictionary +DictionaryOfStan: DefaultDict[str, Any] = nested_dictionary -def to_json(obj): +# Assisted by watsonx Code Assistant +def to_json(obj: Any) -> bytes: """ - Convert obj to json. Used mostly to convert the classes in json_span.py until we switch to nested - dicts (or something better) + Convert the given object to a JSON binary string. - :param obj: the object to serialize to json - :return: json string + This function is primarily used to serialize objects from `json_span.py` + until a switch to nested dictionaries (or a better solution) is made. + + :param obj: The object to serialize to JSON. + :return: The JSON string encoded as bytes. """ try: - def extractor(o): + def extractor(o: Any) -> dict: + """ + Extract dictionary-like attributes from an object. + + :param o: The object to extract attributes from. + :return: A dictionary containing the object's attributes. + """ if not hasattr(o, "__dict__"): - logger.debug("Couldn't serialize non dict type: %s", type(o)) + logger.debug(f"Couldn't serialize non dict type: {type(o)}") return {} else: return {k.lower(): v for k, v in o.__dict__.items() if v is not None} @@ -43,9 +51,12 @@ def extractor(o): logger.debug("to_json non-fatal encoding issue: ", exc_info=True) -def to_pretty_json(obj): +# Assisted by watsonx Code Assistant +def to_pretty_json(obj: Any) -> str: """ - Convert obj to pretty json. Used mostly in logging/debugging. + Convert an object to a pretty-printed JSON string. + + This function is primarily used for logging and debugging purposes. :param obj: the object to serialize to json :return: json string @@ -66,26 +77,40 @@ def extractor(o): logger.debug("to_pretty_json non-fatal encoding issue: ", exc_info=True) -def package_version(): +# Assisted by watsonx Code Assistant +def package_version() -> str: """ - Determine the version of this package. + Determine the version of the 'instana' package. + + This function uses the `importlib.metadata` module to fetch the version of + the 'instana' package. + If the package is not found, it returns 'unknown'. - :return: String representing known version + :return: A string representing the version of the 'instana' package. """ - version = "" try: version = importlib.metadata.version("instana") except importlib.metadata.PackageNotFoundError: + logger.debug("Not able to identify the Instana package version.") version = "unknown" return version -def get_default_gateway(): +# Assisted by watsonx Code Assistant +def get_default_gateway() -> str: """ Attempts to read /proc/self/net/route to determine the default gateway in use. - :return: String - the ip address of the default gateway or None if not found/possible/non-existant + This function reads the /proc/self/net/route file, which contains network + routing information for the current process. + It specifically looks for the line where the Destination is 00000000, + indicating the default route. + The Gateway IP is encoded backwards in hex, which this function decodes and + converts to a standard IP address format. + + :return: String - the ip address of the default gateway or None if not + found/possible/non-existant """ try: hip = None @@ -100,52 +125,29 @@ def get_default_gateway(): if hip is not None and len(hip) == 8: # Reverse order, convert hex to int - return "%i.%i.%i.%i" % ( - int(hip[6:8], 16), - int(hip[4:6], 16), - int(hip[2:4], 16), - int(hip[0:2], 16), - ) + return f"{int(hip[6:8], 16)}.{int(hip[4:6], 16)}.{int(hip[2:4], 16)}.{int(hip[0:2], 16)}" except Exception: logger.warning("get_default_gateway: ", exc_info=True) -def every(delay, task, name): - """ - Executes a task every `delay` seconds - - :param delay: the delay in seconds - :param task: the method to run. The method should return False if you want the loop to stop. - :return: None +# Assisted by watsonx Code Assistant +def validate_url(url: str) -> bool: """ - next_time = time.time() + delay + Validate if the provided is a valid URL. - while True: - time.sleep(max(0, next_time - time.time())) - try: - if task() is False: - break - except Exception: - logger.debug( - "Problem while executing repetitive task: %s", name, exc_info=True - ) - - # skip tasks if we are behind schedule: - next_time += (time.time() - next_time) // delay * delay + delay - - -def validate_url(url): - """ - Validate if is a valid url + This function checks if the given string is a valid URL by attempting to + parse it using the `urlparse` function from the `urllib.parse` module. + A URL is considered valid if it has both a scheme (like 'http' or 'https') + and a network location (netloc). Examples: - "http://localhost:5000" - valid - "http://localhost:5000/path" - valid - "sandwich" - invalid - @param url: string - @return: Boolean + @param url: A string representing the URL to validate. + @return: A boolean value. Returns `True` if the URL is valid, otherwise `False`. """ try: result = parse.urlparse(url) diff --git a/tests/collector/test_base_collector.py b/tests/collector/test_base_collector.py index 8f8550a8..dad090b6 100644 --- a/tests/collector/test_base_collector.py +++ b/tests/collector/test_base_collector.py @@ -146,11 +146,6 @@ def test_shutdown( assert "Collector.shutdown: Reporting final data." in caplog.messages assert not self.collector.started - def test_background_report(self) -> None: - assert self.collector.background_report() - self.collector.thread_shutdown.set() - assert not self.collector.background_report() - def test_should_send_snapshot_data(self, caplog: LogCaptureFixture) -> None: caplog.set_level(logging.DEBUG, logger="instana") self.collector.should_send_snapshot_data()