22# (c) Copyright Instana Inc. 2020
33
44"""
5- A Collector launches a background thread and continually collects & reports data. The data
6- can be any combination of metrics, snapshot data and spans.
5+ A Collector launches a background thread and continually collects & reports data.
6+ The data can be any combination of metrics, snapshot data and spans.
77"""
88
99import queue # pylint: disable=import-error
1010import threading
1111import time
12+ from typing import TYPE_CHECKING , Any , DefaultDict , Dict , List , Type
1213
1314from instana .log import logger
1415from instana .util import DictionaryOfStan
1516
17+ if TYPE_CHECKING :
18+ from instana .agent .base import BaseAgent
19+ from instana .span .readable_span import ReadableSpan
20+
1621
1722class BaseCollector (object ):
1823 """
1924 Base class to handle the collection & reporting of snapshot and metric data
2025 This class launches a background thread to do this work.
2126 """
2227
23- def __init__ (self , agent ) :
28+ def __init__ (self , agent : Type [ "BaseAgent" ]) -> None :
2429 # The agent for this process. Can be Standard, AWSLambda or Fargate
2530 self .agent = agent
2631
@@ -61,7 +66,7 @@ def __init__(self, agent):
6166 # Start time of fetching metadata
6267 self .fetching_start_time = 0
6368
64- def is_reporting_thread_running (self ):
69+ def is_reporting_thread_running (self ) -> bool :
6570 """
6671 Indicates if there is a thread running with the name self.THREAD_NAME
6772 """
@@ -70,14 +75,14 @@ def is_reporting_thread_running(self):
7075 return True
7176 return False
7277
73- def start (self ):
78+ def start (self ) -> None :
7479 """
7580 Starts the collector and starts reporting as long as the agent is in a ready state.
7681 @return: None
7782 """
7883 if self .is_reporting_thread_running ():
7984 if self .thread_shutdown .is_set ():
80- # Force a restart.
85+ # Force a restart.
8186 self .thread_shutdown .clear ()
8287 # Reschedule this start in 5 seconds from now
8388 timer = threading .Timer (5 , self .start )
@@ -92,7 +97,9 @@ def start(self):
9297 if self .agent .can_send ():
9398 logger .debug ("BaseCollector.start: launching collection thread" )
9499 self .thread_shutdown .clear ()
95- self .reporting_thread = threading .Thread (target = self .background_report , args = ())
100+ self .reporting_thread = threading .Thread (
101+ target = self .background_report , args = ()
102+ )
96103 self .reporting_thread .daemon = True
97104 self .reporting_thread .name = self .THREAD_NAME
98105 self .reporting_thread .start ()
@@ -102,7 +109,7 @@ def start(self):
102109 "BaseCollector.start: the agent tells us we can't send anything out"
103110 )
104111
105- def shutdown (self , report_final = True ):
112+ def shutdown (self , report_final : bool = True ) -> None :
106113 """
107114 Shuts down the collector and reports any final data (if possible).
108115 e.g. If the host agent disappeared, we won't be able to report final data.
@@ -118,10 +125,10 @@ def background_report(self) -> None:
118125 """
119126 The main work-horse method to report data in the background thread.
120127
121- This method runs indefinitely, preparing and reporting data at regular
128+ This method runs indefinitely, preparing and reporting data at regular
122129 intervals.
123130 It checks for a shutdown signal and stops execution if it's set.
124-
131+
125132 @return: None
126133 """
127134 while True :
@@ -134,7 +141,7 @@ def background_report(self) -> None:
134141 self .prepare_and_report_data ()
135142 time .sleep (self .report_interval )
136143
137- def prepare_and_report_data (self ):
144+ def prepare_and_report_data (self ) -> bool :
138145 """
139146 Prepare and report the data payload.
140147 @return: Boolean
@@ -144,26 +151,26 @@ def prepare_and_report_data(self):
144151 self .agent .report_data_payload (payload )
145152 return True
146153
147- def prepare_payload (self ):
154+ def prepare_payload (self ) -> DefaultDict [ str , Any ] :
148155 """
149156 Method to prepare the data to be reported.
150157 @return: DictionaryOfStan()
151158 """
152159 logger .debug ("BaseCollector: prepare_payload needs to be overridden" )
153160 return DictionaryOfStan ()
154161
155- def should_send_snapshot_data (self ):
162+ def should_send_snapshot_data (self ) -> bool :
156163 """
157164 Determines if snapshot data should be sent
158165 @return: Boolean
159166 """
160167 logger .debug ("BaseCollector: should_send_snapshot_data needs to be overridden" )
161168 return False
162169
163- def collect_snapshot (self , * argv , ** kwargs ):
170+ def collect_snapshot (self , * argv , ** kwargs ) -> None :
164171 logger .debug ("BaseCollector: collect_snapshot needs to be overridden" )
165172
166- def queued_spans (self ):
173+ def queued_spans (self ) -> List [ "ReadableSpan" ] :
167174 """
168175 Get all of the queued spans
169176 @return: list
@@ -178,7 +185,7 @@ def queued_spans(self):
178185 spans .append (span )
179186 return spans
180187
181- def queued_profiles (self ):
188+ def queued_profiles (self ) -> List [ Dict [ str , Any ]] :
182189 """
183190 Get all of the queued profiles
184191 @return: list
0 commit comments