Skip to content

Commit

Permalink
Merge pull request #49 from kapost/bug/sns-tcp-connection-error
Browse files Browse the repository at this point in the history
Updated publish to retry on expected connection errors
  • Loading branch information
mhuggins committed Mar 23, 2016
2 parents 98417bc + 1cf4b65 commit 96a7e52
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 5 deletions.
20 changes: 15 additions & 5 deletions lib/circuitry/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class Publisher
timeout: 15
}.freeze

CONNECTION_ERRORS = [
Seahorse::Client::NetworkingError
].freeze

attr_reader :timeout

def initialize(options = {})
Expand All @@ -32,9 +36,9 @@ def publish(topic_name, object)
message = object.to_json

if async?
process_asynchronously { publish_internal(topic_name, message) }
process_asynchronously { publish_message(topic_name, message) }
else
publish_internal(topic_name, message)
publish_message(topic_name, message)
end
end

Expand All @@ -44,15 +48,21 @@ def self.default_async_strategy

protected

def publish_internal(topic_name, message)
def publish_message(topic_name, message)
middleware.invoke(topic_name, message) do
# TODO: Don't use ruby timeout.
# http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
Timeout.timeout(timeout) do
logger.info("Publishing message to #{topic_name}")

topic = Topic.find(topic_name)
sns.publish(topic_arn: topic.arn, message: message)
handler = ->(error, attempt_number, _total_delay) do
logger.warn("Error publishing attempt ##{attempt_number}: #{error.class} (#{error.message}); retrying...")
end

with_retries(max_tries: 3, handler: handler, rescue: CONNECTION_ERRORS, base_sleep_seconds: 0, max_sleep_seconds: 0) do
topic = Topic.find(topic_name)
sns.publish(topic_arn: topic.arn, message: message)
end
end
end
end
Expand Down
50 changes: 50 additions & 0 deletions spec/circuitry/publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,56 @@
subject.publish(topic_name, object)
expect(mock_sns).to have_received(:publish).with(topic_arn: topic.arn, message: object.to_json)
end

describe 'when a connection error occurs' do
let(:error) { Seahorse::Client::NetworkingError.new(StandardError.new('test error')) }

describe 'on the first try' do
before do
attempts = 0

allow(mock_sns).to receive(:publish) do
attempts += 1
raise error if attempts == 1
end
end

it 'logs a warning' do
subject.publish(topic_name, object)
expect(logger).to have_received(:warn).with("Error publishing attempt #1: #{error.class} (test error); retrying...")
end

it 'retries' do
subject.publish(topic_name, object)
expect(mock_sns).to have_received(:publish).with(topic_arn: topic.arn, message: object.to_json).twice
end

it 'does not raise the error' do
expect { subject.publish(topic_name, object) }.to_not raise_error
end
end

describe 'repeatedly' do
before do
allow(mock_sns).to receive(:publish).and_raise(error)
end

it 'logs 2 warnings' do
subject.publish(topic_name, object) rescue nil
expect(logger).to have_received(:warn).with("Error publishing attempt #1: #{error.class} (test error); retrying...")
expect(logger).to have_received(:warn).with("Error publishing attempt #2: #{error.class} (test error); retrying...")
end

it 'gives up after 3 tries' do
subject.publish(topic_name, object) rescue nil
expect(mock_sns).to have_received(:publish).with(topic_arn: topic.arn, message: object.to_json).thrice
end

it 'raises the error' do
expect { subject.publish(topic_name, object) }.to raise_error(error.class)
end
end
end
end

describe 'synchonously' do
Expand Down

0 comments on commit 96a7e52

Please sign in to comment.