Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e-cli/e2e-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": "ruby",
"test_suites": "basic",
"test_suites": "basic,retry",
"auto_settings": false,
"patch": null,
"env": {}
Expand Down
4 changes: 4 additions & 0 deletions lib/segment/analytics/backoff_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def next_interval
[interval, @max_timeout_ms].min
end

def reset!
@attempts = 0
end

private

def add_jitter(base, randomization_factor)
Expand Down
9 changes: 6 additions & 3 deletions lib/segment/analytics/defaults.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ module Request
'Content-Type' => 'application/json',
'User-Agent' => "analytics-ruby/#{Analytics::VERSION}" }
RETRIES = 10
MAX_TOTAL_BACKOFF_DURATION = 43_200 # 12 hours in seconds
MAX_RATE_LIMIT_DURATION = 43_200 # 12 hours in seconds
RATE_LIMIT_RETRY_AFTER_CAP = 300 # seconds
end

module Queue
Expand All @@ -28,9 +31,9 @@ module MessageBatch
end

module BackoffPolicy
MIN_TIMEOUT_MS = 100
MAX_TIMEOUT_MS = 10000
MULTIPLIER = 1.5
MIN_TIMEOUT_MS = 500
MAX_TIMEOUT_MS = 60_000
MULTIPLIER = 2
RANDOMIZATION_FACTOR = 0.5
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/segment/analytics/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ def initialize(status = 200, error = nil)
@status = status
@error = error
end

def success?
status >= 200 && status < 300
end
end
end
end
152 changes: 97 additions & 55 deletions lib/segment/analytics/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@ class Transport
include Segment::Analytics::Utils
include Segment::Analytics::Logging

RETRYABLE_4XX = [408, 410, 429, 460].freeze
NON_RETRYABLE_5XX = [501, 505, 511].freeze

def initialize(options = {})
options[:host] ||= HOST
options[:port] ||= PORT
options[:ssl] ||= SSL
options[:ssl] ||= SSL
@headers = options[:headers] || HEADERS
@path = options[:path] || PATH
@path = options[:path] || PATH
@retries = options[:retries] || RETRIES
@backoff_policy =
options[:backoff_policy] || Segment::Analytics::BackoffPolicy.new

@max_total_backoff_duration = options[:max_total_backoff_duration] ||
MAX_TOTAL_BACKOFF_DURATION
@max_rate_limit_duration = options[:max_rate_limit_duration] ||
MAX_RATE_LIMIT_DURATION
@rate_limit_retry_after_cap = options[:rate_limit_retry_after_cap] ||
RATE_LIMIT_RETRY_AFTER_CAP

http = Net::HTTP.new(options[:host], options[:port])
http.use_ssl = options[:ssl]
http.read_timeout = 8
Expand All @@ -40,23 +50,68 @@ def initialize(options = {})
def send(write_key, batch)
logger.debug("Sending request for #{batch.length} items")

last_response, exception = retry_with_backoff(@retries) do
status_code, body = send_request(write_key, batch)
error = JSON.parse(body)['error']
should_retry = should_retry_request?(status_code, body)
@backoff_policy.reset!

retry_count = 0
retries_remaining = @retries
backoff_start_time = nil
rate_limit_start_time = nil

loop do
status_code, body, response_headers = send_request(write_key, batch, retry_count)
error = begin
JSON.parse(body)['error']
rescue StandardError
nil
end
logger.debug("Response status code: #{status_code}")
logger.debug("Response error: #{error}") if error

[Response.new(status_code, error), should_retry]
end

if exception
logger.error(exception.message)
exception.backtrace.each { |line| logger.error(line) }
Response.new(-1, exception.to_s)
else
last_response
return Response.new(status_code, error) if success_status?(status_code)

if status_code == 429
rate_limit_start_time ||= Time.now
if (Time.now - rate_limit_start_time) >= @max_rate_limit_duration
logger.error('Max rate limit duration exceeded for batch')
return Response.new(status_code, error)
end

retry_after = parse_retry_after(response_headers['retry-after'])
if retry_after
delay = [retry_after, @rate_limit_retry_after_cap].min
logger.debug("Rate limited with Retry-After: #{delay}s. Retrying after delay.")
sleep(delay)
retry_count += 1
next
end
end

unless retryable_status?(status_code)
logger.error(body)
return Response.new(status_code, error)
end

retries_remaining -= 1
if retries_remaining <= 0
logger.error('Retries exhausted for batch')
return Response.new(status_code, error)
end

backoff_start_time ||= Time.now
if (Time.now - backoff_start_time) >= @max_total_backoff_duration
logger.error('Max total backoff duration exceeded for batch')
return Response.new(status_code, error)
end

delay_ms = @backoff_policy.next_interval
logger.debug("Retrying request, #{retries_remaining} retries left. Waiting #{delay_ms}ms")
sleep(delay_ms.to_f / 1000)
retry_count += 1
end
rescue StandardError => e
logger.error(e.message)
e.backtrace.each { |line| logger.error(line) }
Response.new(-1, e.to_s)
end

# Closes a persistent connection if it exists
Expand All @@ -66,65 +121,52 @@ def shutdown

private

def should_retry_request?(status_code, body)
if status_code >= 500
true # Server error
elsif status_code == 429
true # Rate limited
elsif status_code >= 400
logger.error(body)
false # Client error. Do not retry, but log
def success_status?(code)
code >= 200 && code < 300
end

def retryable_status?(code)
if code >= 500 && code < 600
!NON_RETRYABLE_5XX.include?(code)
else
false
RETRYABLE_4XX.include?(code)
end
end

# Takes a block that returns [result, should_retry].
#
# Retries upto `retries_remaining` times, if `should_retry` is false or
# an exception is raised. `@backoff_policy` is used to determine the
# duration to sleep between attempts
#
# Returns [last_result, raised_exception]
def retry_with_backoff(retries_remaining, &block)
result, caught_exception = nil
should_retry = false

begin
result, should_retry = yield
return [result, nil] unless should_retry
rescue StandardError => e
should_retry = true
caught_exception = e
end
def parse_retry_after(value)
return nil if value.nil?

if should_retry && (retries_remaining > 1)
logger.debug("Retrying request, #{retries_remaining} retries left")
sleep(@backoff_policy.next_interval.to_f / 1000)
retry_with_backoff(retries_remaining - 1, &block)
else
[result, caught_exception]
end
str = value.is_a?(Array) ? value.first : value
return nil if str.nil?

str = str.strip
return nil unless str =~ /\A\d+\z/

seconds = str.to_i
seconds > 0 ? seconds : nil
end

# Sends a request for the batch, returns [status_code, body]
def send_request(write_key, batch)
# Sends a request for the batch, returns [status_code, body, headers]
def send_request(write_key, batch, retry_count = 0)
payload = JSON.generate(
:sentAt => datetime_in_iso8601(Time.now),
:batch => batch
)
request = Net::HTTP::Post.new(@path, @headers)
headers = @headers.dup
headers['X-Retry-Count'] = retry_count.to_s if retry_count > 0

request = Net::HTTP::Post.new(@path, headers)
request.basic_auth(write_key, nil)

if self.class.stub
logger.debug "stubbed request to #{@path}: " \
"write key = #{write_key}, batch = #{JSON.generate(batch)}"

[200, '{}']
[200, '{}', {}]
else
@http.start unless @http.started? # Maintain a persistent connection
@http.start unless @http.started?
response = @http.request(request, payload)
[response.code.to_i, response.body]
[response.code.to_i, response.body, response.to_hash]
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/segment/analytics/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def run
end

res = @transport.send @write_key, @batch
@on_error.call(res.status, res.error) unless res.status == 200
@on_error.call(res.status, res.error) unless res.success?

@lock.synchronize { @batch.clear }
end
Expand Down
22 changes: 22 additions & 0 deletions spec/segment/analytics/backoff_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ class Analytics
end
end

describe '#reset!' do
it 'resets attempts to 0' do
subject.next_interval
subject.next_interval
subject.next_interval
subject.reset!
expect(subject.instance_variable_get(:@attempts)).to eq(0)
end

it 'causes next_interval to restart from minimum' do
subject_with_params = described_class.new(
min_timeout_ms: 1000,
max_timeout_ms: 10000,
multiplier: 2,
randomization_factor: 0.5
)
3.times { subject_with_params.next_interval }
subject_with_params.reset!
expect(subject_with_params.next_interval).to be_within(500).of(1000)
end
end

describe '#next_interval' do
subject {
described_class.new(
Expand Down
12 changes: 12 additions & 0 deletions spec/segment/analytics/response_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ class Analytics
it { expect(subject).to respond_to(:error) }
end

describe '#success?' do
it { expect(described_class.new(200, nil).success?).to be true }
it { expect(described_class.new(201, nil).success?).to be true }
it { expect(described_class.new(204, nil).success?).to be true }
it { expect(described_class.new(301, nil).success?).to be true }
it { expect(described_class.new(302, nil).success?).to be true }
it { expect(described_class.new(400, nil).success?).to be false }
it { expect(described_class.new(429, nil).success?).to be false }
it { expect(described_class.new(500, nil).success?).to be false }
it { expect(described_class.new(-1, nil).success?).to be false }
end

describe '#initialize' do
let(:status) { 404 }
let(:error) { 'Oh No' }
Expand Down
Loading