11import copy
22import json
3- import sched
43import threading
54import time
65
@@ -27,17 +26,16 @@ def __init__(self, options=SecureNativeOptions(), http_client=None):
2726 else :
2827 self .http_client = http_client
2928
29+ self .thread = threading .Thread (target = self .run , daemon = True )
30+ self .thread .start ()
31+
3032 self .options = options
3133 self .queue = list ()
3234 self .send_enabled = False
3335 self .attempt = 0
3436 self .coefficients = [1 , 1 , 2 , 3 , 5 , 8 , 13 ]
35- self .scheduler = None
3637 self .thread = None
37-
38- if self .options .auto_send and not self .options .disable :
39- interval_seconds = max (options .interval // 1000 , 1 )
40- threading .Timer (interval_seconds , self .flush ).start ()
38+ self .interval = options .interval
4139
4240 def send_async (self , event , resource_path ):
4341 if self .options .disable :
@@ -50,7 +48,7 @@ def send_async(self, event, resource_path):
5048 False
5149 )
5250
53- self .queue .insert ( 0 , item )
51+ self .queue .append ( item )
5452 if self ._is_queue_full ():
5553 self .queue = self .queue [:len (self .queue - 1 )]
5654
@@ -79,51 +77,47 @@ def send_sync(self, event, resource_path, retry):
7977 json .dumps (EventManager .serialize (event )),
8078 retry
8179 )
82- self .queue .insert (0 , item )
80+ self .queue .append (0 , item )
8381 if self ._is_queue_full ():
8482 self .queue = self .queue [:len (self .queue - 1 )]
8583 return res
8684
8785 def _is_queue_full (self ):
8886 return len (self .queue ) > self .options .max_events
8987
90- def _send_events (self ):
91- if len (self .queue ) > 0 and self .send_enabled :
92- for item in self .queue :
93- try :
94- res = self .http_client .post (item .url , item .body )
95- if res .status_code is 401 :
96- item .retry = False
97- elif res .status_code != 200 :
98- raise SecureNativeHttpException (res .status_code )
99-
100- Logger .debug ("Event successfully sent; {}" .format (item .body ))
101- self .queue .remove (item )
102- return res
103- except Exception as e :
104- Logger .error ("Failed to send event; {}" .format (e ))
105- if item .retry :
106- if len (self .coefficients ) == self .attempt + 1 :
107- self .attempt = 0
108-
109- back_off = self .coefficients [self .attempt ] * self .options .interval
110- Logger .debug ("Automatic back-off of {}" .format (back_off ))
111- self .send_enabled = False
112- time .sleep (back_off )
113- self .send_enabled = True
114- else :
88+ def run (self ):
89+ while True :
90+ if len (self .queue ) > 0 and self .send_enabled :
91+ for item in self .queue :
92+ try :
93+ res = self .http_client .post (item .url , item .body )
94+ if res .status_code is 401 :
95+ item .retry = False
96+ elif res .status_code != 200 :
97+ raise SecureNativeHttpException (res .status_code )
98+
99+ Logger .debug ("Event successfully sent; {}" .format (item .body ))
115100 self .queue .remove (item )
101+ return res
102+ except Exception as e :
103+ Logger .error ("Failed to send event; {}" .format (e ))
104+ if item .retry :
105+ if len (self .coefficients ) == self .attempt + 1 :
106+ self .attempt = 0
107+
108+ back_off = self .coefficients [self .attempt ] * self .options .interval
109+ Logger .debug ("Automatic back-off of {}" .format (back_off ))
110+ self .send_enabled = False
111+ time .sleep (back_off )
112+ self .send_enabled = True
113+ else :
114+ self .queue .remove (item )
115+ time .sleep (self .interval / 1000 )
116116
117117 def start_event_persist (self ):
118118 Logger .debug ("Starting automatic event persistence" )
119119 if self .options .auto_send or self .send_enabled :
120120 self .send_enabled = True
121- try :
122- self .scheduler = sched .scheduler (time .time , time .sleep )
123- self .scheduler .enter (self .options .interval , 1 , self ._send_events )
124- self .thread = threading .Thread (target = self .scheduler .run ).start ()
125- except Exception as e :
126- Logger .error ("Could not start event scheduler; {}" .format (e ))
127121 else :
128122 Logger .debug ("Automatic event persistence is disabled, you should persist events manually" )
129123
@@ -133,7 +127,6 @@ def stop_event_persist(self):
133127 try :
134128 if self .thread :
135129 self .thread .stop ()
136- self .scheduler .cancel (self ._send_events )
137130 except ValueError as e :
138131 Logger .error ("Could not stop event scheduler; {}" .format (e ))
139132
0 commit comments