diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index f0aea47..e820b6d 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,6 +1,6 @@ { "sdk": "ruby", - "test_suites": "basic", + "test_suites": "basic,retry", "auto_settings": false, "patch": null, "env": {} diff --git a/lib/segment/analytics/backoff_policy.rb b/lib/segment/analytics/backoff_policy.rb index e6033b1..7767838 100644 --- a/lib/segment/analytics/backoff_policy.rb +++ b/lib/segment/analytics/backoff_policy.rb @@ -33,6 +33,10 @@ def next_interval [interval, @max_timeout_ms].min end + def reset! + @attempts = 0 + end + private def add_jitter(base, randomization_factor) diff --git a/lib/segment/analytics/defaults.rb b/lib/segment/analytics/defaults.rb index aa32697..e443caf 100644 --- a/lib/segment/analytics/defaults.rb +++ b/lib/segment/analytics/defaults.rb @@ -12,6 +12,9 @@ module Request 'Content-Type' => 'application/json', 'User-Agent' => "analytics-ruby/#{Analytics::VERSION}" } RETRIES = 10 + MAX_TOTAL_BACKOFF_DURATION = 43_200 # 12 hours in seconds + MAX_RATE_LIMIT_DURATION = 43_200 # 12 hours in seconds + RATE_LIMIT_RETRY_AFTER_CAP = 300 # seconds end module Queue @@ -28,9 +31,9 @@ module MessageBatch end module BackoffPolicy - MIN_TIMEOUT_MS = 100 - MAX_TIMEOUT_MS = 10000 - MULTIPLIER = 1.5 + MIN_TIMEOUT_MS = 500 + MAX_TIMEOUT_MS = 60_000 + MULTIPLIER = 2 RANDOMIZATION_FACTOR = 0.5 end end diff --git a/lib/segment/analytics/response.rb b/lib/segment/analytics/response.rb index c31116a..0a2fd2f 100644 --- a/lib/segment/analytics/response.rb +++ b/lib/segment/analytics/response.rb @@ -12,6 +12,10 @@ def initialize(status = 200, error = nil) @status = status @error = error end + + def success? + status >= 200 && status < 300 + end end end end diff --git a/lib/segment/analytics/transport.rb b/lib/segment/analytics/transport.rb index 6ee14d8..e9f4ca6 100644 --- a/lib/segment/analytics/transport.rb +++ b/lib/segment/analytics/transport.rb @@ -16,16 +16,26 @@ class Transport include Segment::Analytics::Utils include Segment::Analytics::Logging + RETRYABLE_4XX = [408, 410, 429, 460].freeze + NON_RETRYABLE_5XX = [501, 505, 511].freeze + def initialize(options = {}) options[:host] ||= HOST options[:port] ||= PORT - options[:ssl] ||= SSL + options[:ssl] ||= SSL @headers = options[:headers] || HEADERS - @path = options[:path] || PATH + @path = options[:path] || PATH @retries = options[:retries] || RETRIES @backoff_policy = options[:backoff_policy] || Segment::Analytics::BackoffPolicy.new + @max_total_backoff_duration = options[:max_total_backoff_duration] || + MAX_TOTAL_BACKOFF_DURATION + @max_rate_limit_duration = options[:max_rate_limit_duration] || + MAX_RATE_LIMIT_DURATION + @rate_limit_retry_after_cap = options[:rate_limit_retry_after_cap] || + RATE_LIMIT_RETRY_AFTER_CAP + http = Net::HTTP.new(options[:host], options[:port]) http.use_ssl = options[:ssl] http.read_timeout = 8 @@ -40,23 +50,68 @@ def initialize(options = {}) def send(write_key, batch) logger.debug("Sending request for #{batch.length} items") - last_response, exception = retry_with_backoff(@retries) do - status_code, body = send_request(write_key, batch) - error = JSON.parse(body)['error'] - should_retry = should_retry_request?(status_code, body) + @backoff_policy.reset! + + retry_count = 0 + retries_remaining = @retries + backoff_start_time = nil + rate_limit_start_time = nil + + loop do + status_code, body, response_headers = send_request(write_key, batch, retry_count) + error = begin + JSON.parse(body)['error'] + rescue StandardError + nil + end logger.debug("Response status code: #{status_code}") logger.debug("Response error: #{error}") if error - [Response.new(status_code, error), should_retry] - end - - if exception - logger.error(exception.message) - exception.backtrace.each { |line| logger.error(line) } - Response.new(-1, exception.to_s) - else - last_response + return Response.new(status_code, error) if success_status?(status_code) + + if status_code == 429 + rate_limit_start_time ||= Time.now + if (Time.now - rate_limit_start_time) >= @max_rate_limit_duration + logger.error('Max rate limit duration exceeded for batch') + return Response.new(status_code, error) + end + + retry_after = parse_retry_after(response_headers['retry-after']) + if retry_after + delay = [retry_after, @rate_limit_retry_after_cap].min + logger.debug("Rate limited with Retry-After: #{delay}s. Retrying after delay.") + sleep(delay) + retry_count += 1 + next + end + end + + unless retryable_status?(status_code) + logger.error(body) + return Response.new(status_code, error) + end + + retries_remaining -= 1 + if retries_remaining <= 0 + logger.error('Retries exhausted for batch') + return Response.new(status_code, error) + end + + backoff_start_time ||= Time.now + if (Time.now - backoff_start_time) >= @max_total_backoff_duration + logger.error('Max total backoff duration exceeded for batch') + return Response.new(status_code, error) + end + + delay_ms = @backoff_policy.next_interval + logger.debug("Retrying request, #{retries_remaining} retries left. Waiting #{delay_ms}ms") + sleep(delay_ms.to_f / 1000) + retry_count += 1 end + rescue StandardError => e + logger.error(e.message) + e.backtrace.each { |line| logger.error(line) } + Response.new(-1, e.to_s) end # Closes a persistent connection if it exists @@ -66,65 +121,52 @@ def shutdown private - def should_retry_request?(status_code, body) - if status_code >= 500 - true # Server error - elsif status_code == 429 - true # Rate limited - elsif status_code >= 400 - logger.error(body) - false # Client error. Do not retry, but log + def success_status?(code) + code >= 200 && code < 300 + end + + def retryable_status?(code) + if code >= 500 && code < 600 + !NON_RETRYABLE_5XX.include?(code) else - false + RETRYABLE_4XX.include?(code) end end - # Takes a block that returns [result, should_retry]. - # - # Retries upto `retries_remaining` times, if `should_retry` is false or - # an exception is raised. `@backoff_policy` is used to determine the - # duration to sleep between attempts - # - # Returns [last_result, raised_exception] - def retry_with_backoff(retries_remaining, &block) - result, caught_exception = nil - should_retry = false - - begin - result, should_retry = yield - return [result, nil] unless should_retry - rescue StandardError => e - should_retry = true - caught_exception = e - end + def parse_retry_after(value) + return nil if value.nil? - if should_retry && (retries_remaining > 1) - logger.debug("Retrying request, #{retries_remaining} retries left") - sleep(@backoff_policy.next_interval.to_f / 1000) - retry_with_backoff(retries_remaining - 1, &block) - else - [result, caught_exception] - end + str = value.is_a?(Array) ? value.first : value + return nil if str.nil? + + str = str.strip + return nil unless str =~ /\A\d+\z/ + + seconds = str.to_i + seconds > 0 ? seconds : nil end - # Sends a request for the batch, returns [status_code, body] - def send_request(write_key, batch) + # Sends a request for the batch, returns [status_code, body, headers] + def send_request(write_key, batch, retry_count = 0) payload = JSON.generate( :sentAt => datetime_in_iso8601(Time.now), :batch => batch ) - request = Net::HTTP::Post.new(@path, @headers) + headers = @headers.dup + headers['X-Retry-Count'] = retry_count.to_s if retry_count > 0 + + request = Net::HTTP::Post.new(@path, headers) request.basic_auth(write_key, nil) if self.class.stub logger.debug "stubbed request to #{@path}: " \ "write key = #{write_key}, batch = #{JSON.generate(batch)}" - [200, '{}'] + [200, '{}', {}] else - @http.start unless @http.started? # Maintain a persistent connection + @http.start unless @http.started? response = @http.request(request, payload) - [response.code.to_i, response.body] + [response.code.to_i, response.body, response.to_hash] end end diff --git a/lib/segment/analytics/worker.rb b/lib/segment/analytics/worker.rb index 6a7d68e..c4bf728 100644 --- a/lib/segment/analytics/worker.rb +++ b/lib/segment/analytics/worker.rb @@ -45,7 +45,7 @@ def run end res = @transport.send @write_key, @batch - @on_error.call(res.status, res.error) unless res.status == 200 + @on_error.call(res.status, res.error) unless res.success? @lock.synchronize { @batch.clear } end diff --git a/spec/segment/analytics/backoff_policy_spec.rb b/spec/segment/analytics/backoff_policy_spec.rb index 25ef05e..7d3a8d2 100644 --- a/spec/segment/analytics/backoff_policy_spec.rb +++ b/spec/segment/analytics/backoff_policy_spec.rb @@ -67,6 +67,28 @@ class Analytics end end + describe '#reset!' do + it 'resets attempts to 0' do + subject.next_interval + subject.next_interval + subject.next_interval + subject.reset! + expect(subject.instance_variable_get(:@attempts)).to eq(0) + end + + it 'causes next_interval to restart from minimum' do + subject_with_params = described_class.new( + min_timeout_ms: 1000, + max_timeout_ms: 10000, + multiplier: 2, + randomization_factor: 0.5 + ) + 3.times { subject_with_params.next_interval } + subject_with_params.reset! + expect(subject_with_params.next_interval).to be_within(500).of(1000) + end + end + describe '#next_interval' do subject { described_class.new( diff --git a/spec/segment/analytics/response_spec.rb b/spec/segment/analytics/response_spec.rb index bb673db..0376e20 100644 --- a/spec/segment/analytics/response_spec.rb +++ b/spec/segment/analytics/response_spec.rb @@ -13,6 +13,18 @@ class Analytics it { expect(subject).to respond_to(:error) } end + describe '#success?' do + it { expect(described_class.new(200, nil).success?).to be true } + it { expect(described_class.new(201, nil).success?).to be true } + it { expect(described_class.new(204, nil).success?).to be true } + it { expect(described_class.new(301, nil).success?).to be true } + it { expect(described_class.new(302, nil).success?).to be true } + it { expect(described_class.new(400, nil).success?).to be false } + it { expect(described_class.new(429, nil).success?).to be false } + it { expect(described_class.new(500, nil).success?).to be false } + it { expect(described_class.new(-1, nil).success?).to be false } + end + describe '#initialize' do let(:status) { 404 } let(:error) { 'Oh No' } diff --git a/spec/segment/analytics/transport_spec.rb b/spec/segment/analytics/transport_spec.rb index 488b3ee..5cd428f 100644 --- a/spec/segment/analytics/transport_spec.rb +++ b/spec/segment/analytics/transport_spec.rb @@ -115,6 +115,7 @@ class Analytics allow(http).to receive(:start) allow(http).to receive(:request) { response } allow(response).to receive(:body) { response_body } + allow(response).to receive(:to_hash) { {} } end it 'initalizes a new Net::HTTP::Post with path and default headers' do @@ -204,6 +205,14 @@ class Analytics end end + context '3xx is treated as success' do + let(:status_code) { 301 } + it 'returns status without retrying' do + expect(subject).not_to receive(:sleep) + expect(subject.send(write_key, batch).status).to eq(301) + end + end + context 'request results in errorful response' do let(:error) { 'this is an error' } let(:response_body) { { error: error }.to_json } @@ -218,27 +227,132 @@ class Analytics it_behaves_like('retried request', 500, '{}') it_behaves_like('retried request', 503, '{}') - # All 4xx errors other than 429 (rate limited) must be retried + # 429 is retried it_behaves_like('retried request', 429, '{}') it_behaves_like('non-retried request', 404, '{}') it_behaves_like('non-retried request', 400, '{}') + + # Non-retryable 5xx: 501, 505, 511 + it_behaves_like('non-retried request', 501, '{}') + it_behaves_like('non-retried request', 505, '{}') + it_behaves_like('non-retried request', 511, '{}') + + # Retryable 4xx: 408, 410, 460 + it_behaves_like('retried request', 408, '{}') + it_behaves_like('retried request', 410, '{}') + it_behaves_like('retried request', 460, '{}') + end + + context '429 with Retry-After header' do + let(:status_code) { 429 } + let(:retry_after_seconds) { 2 } + subject { described_class.new(retries: 4, backoff_policy: FakeBackoffPolicy.new([1000, 1000, 1000])) } + + before do + allow(response).to receive(:to_hash) { { 'retry-after' => [retry_after_seconds.to_s] } } + # Second attempt succeeds + success_response = Net::HTTPResponse.new(1.1, 200, '{}') + allow(success_response).to receive(:body) { '{}' } + allow(success_response).to receive(:to_hash) { {} } + http = subject.instance_variable_get(:@http) + allow(http).to receive(:request).and_return(response, success_response) + end + + it 'sleeps for the Retry-After duration' do + expect(subject).to receive(:sleep).with(2).once + subject.send(write_key, batch) + end + + it 'caps Retry-After at RATE_LIMIT_RETRY_AFTER_CAP' do + allow(response).to receive(:to_hash) { { 'retry-after' => ['9999'] } } + expect(subject).to receive(:sleep).with(described_class::RATE_LIMIT_RETRY_AFTER_CAP).once + subject.send(write_key, batch) + end + + it 'returns success after retry' do + allow(subject).to receive(:sleep) + expect(subject.send(write_key, batch).success?).to be true + end end - context 'request or parsing of response results in an exception' do + context 'X-Retry-Count header' do + let(:status_code) { 500 } + let(:backoff_policy) { FakeBackoffPolicy.new([1, 1]) } + subject { described_class.new(retries: 3, backoff_policy: backoff_policy) } + + it 'does not send X-Retry-Count on first attempt' do + allow(subject).to receive(:sleep) + first_request = nil + http = subject.instance_variable_get(:@http) + allow(http).to receive(:request) do |req, _| + first_request ||= req + response + end + subject.send(write_key, batch) + expect(first_request['X-Retry-Count']).to be_nil + end + + it 'sends X-Retry-Count incrementing on retries' do + allow(subject).to receive(:sleep) + requests = [] + http = subject.instance_variable_get(:@http) + allow(http).to receive(:request) do |req, _| + requests << req + response + end + subject.send(write_key, batch) + expect(requests[1]['X-Retry-Count']).to eq('1') + expect(requests[2]['X-Retry-Count']).to eq('2') + end + end + + context 'private helpers' do + describe '#success_status?' do + it { expect(subject.send(:success_status?, 200)).to be true } + it { expect(subject.send(:success_status?, 201)).to be true } + it { expect(subject.send(:success_status?, 301)).to be true } + it { expect(subject.send(:success_status?, 400)).to be false } + it { expect(subject.send(:success_status?, 500)).to be false } + end + + describe '#retryable_status?' do + it { expect(subject.send(:retryable_status?, 500)).to be true } + it { expect(subject.send(:retryable_status?, 503)).to be true } + it { expect(subject.send(:retryable_status?, 429)).to be true } + it { expect(subject.send(:retryable_status?, 408)).to be true } + it { expect(subject.send(:retryable_status?, 410)).to be true } + it { expect(subject.send(:retryable_status?, 460)).to be true } + it { expect(subject.send(:retryable_status?, 400)).to be false } + it { expect(subject.send(:retryable_status?, 404)).to be false } + it { expect(subject.send(:retryable_status?, 501)).to be false } + it { expect(subject.send(:retryable_status?, 505)).to be false } + it { expect(subject.send(:retryable_status?, 511)).to be false } + end + + describe '#parse_retry_after' do + it { expect(subject.send(:parse_retry_after, '60')).to eq(60) } + it { expect(subject.send(:parse_retry_after, ['60'])).to eq(60) } + it { expect(subject.send(:parse_retry_after, '0')).to be_nil } + it { expect(subject.send(:parse_retry_after, '-1')).to be_nil } + it { expect(subject.send(:parse_retry_after, nil)).to be_nil } + it { expect(subject.send(:parse_retry_after, '')).to be_nil } + it { expect(subject.send(:parse_retry_after, 'Wed, 07 May 2026 12:00:00 GMT')).to be_nil } + end + end + + context 'response body is malformed JSON but status is 200' do let(:response_body) { 'Malformed JSON ---' } subject { described_class.new(retries: 0) } - it 'returns a -1 for status' do - expect(subject.send(write_key, batch).status).to eq(-1) + it 'treats 200 as success regardless of body' do + expect(subject.send(write_key, batch).status).to eq(200) end - it 'has a connection error' do + it 'has nil error when body is unparseable' do error = subject.send(write_key, batch).error - expect(error).not_to be_nil + expect(error).to be_nil end - - it_behaves_like('retried request', 200, 'Malformed JSON ---') end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8dc8634..8b255e2 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -114,6 +114,8 @@ def next_interval raise 'FakeBackoffPolicy has no values left' if @interval_values.empty? @interval_values.shift end + + def reset!; end end # usage: