Skip to content

Commit 21a8460

Browse files
MichaelGHSegclaude
andcommitted
Improve HTTP response handling and retry logic
- Add Authorization header with Basic auth (base64 encoded write key) - Add X-Retry-Count header on all requests (starts at 0) - Implement Retry-After header support (capped at 300s) - Retry-After attempts don't count against backoff retry budget - Add granular status code classification: - Retryable 4xx: 408, 410, 429, 460 - Non-retryable 4xx: 400, 401, 403, 404, 413, 422 - Retryable 5xx: all except 501, 505 - Non-retryable 5xx: 501, 505 - Replace backoff decorator with custom retry loop - Exponential backoff with jitter (0.5s base, 60s cap) - Clear OAuth token on 511 Network Authentication Required - 413 Payload Too Large is non-retryable - Add 30 new comprehensive tests (106 total tests) Aligns with analytics-java and analytics-next retry behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bc40fba commit 21a8460

4 files changed

Lines changed: 821 additions & 44 deletions

File tree

segment/analytics/consumer.py

Lines changed: 104 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import logging
22
import time
3+
import random
34
from threading import Thread
4-
import backoff
55
import json
66

7-
from segment.analytics.request import post, APIError, DatetimeSerializer
7+
from segment.analytics.request import post, APIError, DatetimeSerializer, parse_retry_after
88

99
from queue import Empty
1010

@@ -120,40 +120,108 @@ def next(self):
120120
return items
121121

122122
def request(self, batch):
123-
"""Attempt to upload the batch and retry before raising an error """
124-
125-
def fatal_exception(exc):
126-
if isinstance(exc, APIError):
127-
# retry on server errors and client errors
128-
# with 429 status code (rate limited),
129-
# don't retry on other client errors
130-
return (400 <= exc.status < 500) and exc.status != 429
131-
elif isinstance(exc, FatalError):
132-
return True
133-
else:
134-
# retry on all other errors (eg. network)
135-
return False
136-
137-
attempt_count = 0
138-
139-
@backoff.on_exception(
140-
backoff.expo,
141-
Exception,
142-
max_tries=self.retries + 1,
143-
giveup=fatal_exception,
144-
on_backoff=lambda details: self.log.debug(
145-
f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s"
146-
))
147-
def send_request():
148-
nonlocal attempt_count
149-
attempt_count += 1
123+
"""Attempt to upload the batch and retry before raising an error"""
124+
125+
def is_retryable_status(status):
126+
"""
127+
Determine if a status code is retryable.
128+
Retryable 4xx: 408, 410, 429, 460
129+
Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx
130+
Retryable 5xx: All except 501, 505
131+
Non-retryable 5xx: 501, 505
132+
"""
133+
if 400 <= status < 500:
134+
return status in (408, 410, 429, 460)
135+
elif 500 <= status < 600:
136+
return status not in (501, 505)
137+
return False
138+
139+
def should_use_retry_after(status):
140+
"""Check if status code should respect Retry-After header"""
141+
return status in (408, 429, 503)
142+
143+
total_attempts = 0
144+
backoff_attempts = 0
145+
max_backoff_attempts = self.retries + 1
146+
147+
while True:
150148
try:
151-
return post(self.write_key, self.host, gzip=self.gzip,
152-
timeout=self.timeout, batch=batch, proxies=self.proxies,
153-
oauth_manager=self.oauth_manager)
154-
except Exception as e:
155-
if attempt_count >= self.retries + 1:
156-
self.log.error(f"All {self.retries} retries exhausted. Final error: {e}")
149+
# Make the request with current retry count
150+
response = post(
151+
self.write_key,
152+
self.host,
153+
gzip=self.gzip,
154+
timeout=self.timeout,
155+
batch=batch,
156+
proxies=self.proxies,
157+
oauth_manager=self.oauth_manager,
158+
retry_count=total_attempts
159+
)
160+
# Success
161+
return response
162+
163+
except FatalError as e:
164+
# Non-retryable error
165+
self.log.error(f"Fatal error after {total_attempts} attempts: {e}")
157166
raise
158167

159-
send_request()
168+
except APIError as e:
169+
total_attempts += 1
170+
171+
# Check if we should use Retry-After header
172+
if should_use_retry_after(e.status) and e.response:
173+
retry_after = parse_retry_after(e.response)
174+
if retry_after:
175+
self.log.debug(
176+
f"Retry-After header present: waiting {retry_after}s (attempt {total_attempts})"
177+
)
178+
time.sleep(retry_after)
179+
continue # Does not count against backoff budget
180+
181+
# Check if status is retryable
182+
if not is_retryable_status(e.status):
183+
self.log.error(
184+
f"Non-retryable error {e.status} after {total_attempts} attempts: {e}"
185+
)
186+
raise
187+
188+
# Count this against backoff attempts
189+
backoff_attempts += 1
190+
if backoff_attempts >= max_backoff_attempts:
191+
self.log.error(
192+
f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
193+
)
194+
raise
195+
196+
# Calculate exponential backoff delay with jitter
197+
base_delay = 0.5 * (2 ** (backoff_attempts - 1))
198+
jitter = random.uniform(0, 0.1 * base_delay)
199+
delay = min(base_delay + jitter, 60) # Cap at 60 seconds
200+
201+
self.log.debug(
202+
f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
203+
f"after {delay:.2f}s for status {e.status}"
204+
)
205+
time.sleep(delay)
206+
207+
except Exception as e:
208+
# Network errors or other exceptions - retry with backoff
209+
total_attempts += 1
210+
backoff_attempts += 1
211+
212+
if backoff_attempts >= max_backoff_attempts:
213+
self.log.error(
214+
f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
215+
)
216+
raise
217+
218+
# Calculate exponential backoff delay with jitter
219+
base_delay = 0.5 * (2 ** (backoff_attempts - 1))
220+
jitter = random.uniform(0, 0.1 * base_delay)
221+
delay = min(base_delay + jitter, 60) # Cap at 60 seconds
222+
223+
self.log.debug(
224+
f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
225+
f"after {delay:.2f}s: {e}"
226+
)
227+
time.sleep(delay)

segment/analytics/request.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from gzip import GzipFile
44
import logging
55
import json
6+
import base64
67
from dateutil.tz import tzutc
78
from requests.auth import HTTPBasicAuth
89
from requests import sessions
@@ -12,8 +13,31 @@
1213

1314
_session = sessions.Session()
1415

16+
# Maximum Retry-After delay to respect (5 minutes)
17+
MAX_RETRY_AFTER_SECONDS = 300
1518

16-
def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, **kwargs):
19+
20+
def parse_retry_after(response):
21+
"""
22+
Parse Retry-After header from response.
23+
Returns the delay in seconds, or None if header is not present or invalid.
24+
Caps the value at MAX_RETRY_AFTER_SECONDS.
25+
"""
26+
retry_after = response.headers.get('Retry-After')
27+
if not retry_after:
28+
return None
29+
30+
try:
31+
# Try parsing as integer (delay in seconds)
32+
delay = int(retry_after)
33+
return min(delay, MAX_RETRY_AFTER_SECONDS)
34+
except ValueError:
35+
# Could be HTTP-date format, but for simplicity we'll skip that
36+
# Most APIs use integer seconds
37+
return None
38+
39+
40+
def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, retry_count=0, **kwargs):
1741
"""Post the `kwargs` to the API"""
1842
log = logging.getLogger('segment')
1943
body = kwargs
@@ -28,10 +52,18 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag
2852
log.debug('making request: %s', data)
2953
headers = {
3054
'Content-Type': 'application/json',
31-
'User-Agent': 'analytics-python/' + VERSION
55+
'User-Agent': 'analytics-python/' + VERSION,
56+
'X-Retry-Count': str(retry_count)
3257
}
58+
59+
# Add Authorization header - prefer OAuth Bearer token, fallback to Basic auth
3360
if auth:
3461
headers['Authorization'] = 'Bearer {}'.format(auth)
62+
else:
63+
# Basic auth with write key (format: "writeKey:" encoded in base64)
64+
credentials = '{}:'.format(write_key)
65+
encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
66+
headers['Authorization'] = 'Basic {}'.format(encoded)
3567

3668
if gzip:
3769
headers['Content-Encoding'] = 'gzip'
@@ -60,24 +92,25 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag
6092
log.debug('data uploaded successfully')
6193
return res
6294

63-
if oauth_manager and res.status_code in [400, 401, 403]:
95+
if oauth_manager and res.status_code in [400, 401, 403, 511]:
6496
oauth_manager.clear_token()
6597

6698
try:
6799
payload = res.json()
68100
log.debug('received response: %s', payload)
69-
raise APIError(res.status_code, payload['code'], payload['message'])
101+
raise APIError(res.status_code, payload['code'], payload['message'], res)
70102
except ValueError:
71103
log.error('Unknown error: [%s] %s', res.status_code, res.reason)
72-
raise APIError(res.status_code, 'unknown', res.text)
104+
raise APIError(res.status_code, 'unknown', res.text, res)
73105

74106

75107
class APIError(Exception):
76108

77-
def __init__(self, status, code, message):
109+
def __init__(self, status, code, message, response=None):
78110
self.message = message
79111
self.status = status
80112
self.code = code
113+
self.response = response
81114

82115
def __str__(self):
83116
msg = "[Segment] {0}: {1} ({2})"

0 commit comments

Comments
 (0)