Skip to content

Commit

Permalink
Make auth error timeout configurable. Update tests to ensure repollin…
Browse files Browse the repository at this point in the history
…g happens.
  • Loading branch information
robinjmurphy committed Jan 26, 2016
1 parent 8eb7492 commit cb58b95
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Creates a new SQS consumer.
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
* `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.
* `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`).
* `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually

### `consumer.start()`
Expand Down
17 changes: 7 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function validate(options) {
}

function isAuthenticationError(err) {
return (err.statusCode === 403 || err.code === 'CredentialsError');
return (err.statusCode === 403 || err.code === 'CredentialsError');
}

/**
Expand All @@ -59,6 +59,7 @@ function Consumer(options) {
this.batchSize = options.batchSize || 1;
this.visibilityTimeout = options.visibilityTimeout;
this.waitTimeSeconds = options.waitTimeSeconds || 20;
this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;

this.sqs = options.sqs || new AWS.SQS({
region: options.region || 'eu-west-1'
Expand Down Expand Up @@ -116,15 +117,7 @@ Consumer.prototype._handleSqsResponse = function (err, response) {
var consumer = this;

if (err) {
this.emit('error', new SQSError('SQS receive message failed: ' + err.message));
if (isAuthenticationError(err)) {
debug('Pause polling for 10 seconds, please fix the credentials error');
setTimeout(function() {
debug('Start polling again, hopefully the credentials problem is fixed');
consumer.start();
}, 10000);
return this.stop();
}
this.emit('error', new SQSError('SQS receive message failed: ' + err.message));
}

debug('Received SQS response');
Expand All @@ -135,6 +128,10 @@ Consumer.prototype._handleSqsResponse = function (err, response) {
// start polling again once all of the messages have been processed
consumer._poll();
});
} else if (err && isAuthenticationError(err)) {
// there was an authentication error, so wait a bit before repolling
debug('There was an authentication error. Pausing before retrying.');
setTimeout(this._poll.bind(this), this.authenticationErrorTimeout);
} else {
// there were no messages, so start polling again
this._poll();
Expand Down
43 changes: 26 additions & 17 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ describe('Consumer', function () {
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage: handleMessage,
sqs: sqs
sqs: sqs,
authenticationErrorTimeout: 20
});
});

Expand Down Expand Up @@ -128,35 +129,43 @@ describe('Consumer', function () {
consumer.start();
});

it('fires an error event when a credentials error occurs', function (done) {
var credentialsErr = { code : 'CredentialsError', message: 'Missing credentials in config'};
it('waits before repolling when a credentials error occurs', function (done) {
var credentialsErr = {
code: 'CredentialsError',
message: 'Missing credentials in config'
};

sqs.receiveMessage.yields(credentialsErr);

consumer.on('error', function (err) {
assert.ok(err);
assert.equal(err.message, 'SQS receive message failed: ' + credentialsErr.message);
sinon.assert.calledOnce(sqs.receiveMessage);
consumer.on('error', function () {
setTimeout(function () {
sinon.assert.notCalled(handleMessage);
done();
sinon.assert.calledOnce(sqs.receiveMessage);
}, 10);
setTimeout(function () {
sinon.assert.calledTwice(sqs.receiveMessage);
done();
}, 30);
});

consumer.start();
});

it('fires an error event when an invalid signature error occurs', function (done) {
var invalidSignatureErr = { statusCode : 403, message: 'The security token included in the request is invalid'};
it('waits before repolling when a 403 error occurs', function (done) {
var invalidSignatureErr = {
statusCode: 403,
message: 'The security token included in the request is invalid'
};

sqs.receiveMessage.yields(invalidSignatureErr);

consumer.on('error', function (err) {
assert.ok(err);
assert.equal(err.message, 'SQS receive message failed: ' + invalidSignatureErr.message);
sinon.assert.calledOnce(sqs.receiveMessage);
consumer.on('error', function () {
setTimeout(function () {
sinon.assert.notCalled(handleMessage);
done();
sinon.assert.calledOnce(sqs.receiveMessage);
}, 10);
setTimeout(function () {
sinon.assert.calledTwice(sqs.receiveMessage);
done();
}, 30);
});

consumer.start();
Expand Down

1 comment on commit cb58b95

@diorahman
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Please sign in to comment.