diff --git a/README.md b/README.md index 2698bfc5..986f1886 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ Creates a new SQS consumer. * `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10. * `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. * `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning. +* `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). * `sqs` - _Object_ - An optional [AWS SQS](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) object to use if you need to configure the client manually ### `consumer.start()` diff --git a/index.js b/index.js index 9425299e..ce6a0eb8 100644 --- a/index.js +++ b/index.js @@ -32,7 +32,7 @@ function validate(options) { } function isAuthenticationError(err) { - return (err.statusCode === 403 || err.code === 'CredentialsError'); + return (err.statusCode === 403 || err.code === 'CredentialsError'); } /** @@ -59,6 +59,7 @@ function Consumer(options) { this.batchSize = options.batchSize || 1; this.visibilityTimeout = options.visibilityTimeout; this.waitTimeSeconds = options.waitTimeSeconds || 20; + this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; this.sqs = options.sqs || new AWS.SQS({ region: options.region || 'eu-west-1' @@ -116,15 +117,7 @@ Consumer.prototype._handleSqsResponse = function (err, response) { var consumer = this; if (err) { - this.emit('error', new SQSError('SQS receive message failed: ' + err.message)); - if (isAuthenticationError(err)) { - debug('Pause polling for 10 seconds, please fix the credentials error'); - setTimeout(function() { - debug('Start polling again, hopefully the credentials problem is fixed'); - consumer.start(); - }, 10000); - return this.stop(); - } + this.emit('error', new SQSError('SQS receive message failed: ' + err.message)); } debug('Received SQS response'); @@ -135,6 +128,10 @@ Consumer.prototype._handleSqsResponse = function (err, response) { // start polling again once all of the messages have been processed consumer._poll(); }); + } else if (err && isAuthenticationError(err)) { + // there was an authentication error, so wait a bit before repolling + debug('There was an authentication error. Pausing before retrying.'); + setTimeout(this._poll.bind(this), this.authenticationErrorTimeout); } else { // there were no messages, so start polling again this._poll(); diff --git a/test/index.js b/test/index.js index 3071d727..5c5c8652 100644 --- a/test/index.js +++ b/test/index.js @@ -26,7 +26,8 @@ describe('Consumer', function () { queueUrl: 'some-queue-url', region: 'some-region', handleMessage: handleMessage, - sqs: sqs + sqs: sqs, + authenticationErrorTimeout: 20 }); }); @@ -128,35 +129,43 @@ describe('Consumer', function () { consumer.start(); }); - it('fires an error event when a credentials error occurs', function (done) { - var credentialsErr = { code : 'CredentialsError', message: 'Missing credentials in config'}; + it('waits before repolling when a credentials error occurs', function (done) { + var credentialsErr = { + code: 'CredentialsError', + message: 'Missing credentials in config' + }; + sqs.receiveMessage.yields(credentialsErr); - consumer.on('error', function (err) { - assert.ok(err); - assert.equal(err.message, 'SQS receive message failed: ' + credentialsErr.message); - sinon.assert.calledOnce(sqs.receiveMessage); + consumer.on('error', function () { setTimeout(function () { - sinon.assert.notCalled(handleMessage); - done(); + sinon.assert.calledOnce(sqs.receiveMessage); }, 10); + setTimeout(function () { + sinon.assert.calledTwice(sqs.receiveMessage); + done(); + }, 30); }); consumer.start(); }); - it('fires an error event when an invalid signature error occurs', function (done) { - var invalidSignatureErr = { statusCode : 403, message: 'The security token included in the request is invalid'}; + it('waits before repolling when a 403 error occurs', function (done) { + var invalidSignatureErr = { + statusCode: 403, + message: 'The security token included in the request is invalid' + }; + sqs.receiveMessage.yields(invalidSignatureErr); - consumer.on('error', function (err) { - assert.ok(err); - assert.equal(err.message, 'SQS receive message failed: ' + invalidSignatureErr.message); - sinon.assert.calledOnce(sqs.receiveMessage); + consumer.on('error', function () { setTimeout(function () { - sinon.assert.notCalled(handleMessage); - done(); + sinon.assert.calledOnce(sqs.receiveMessage); }, 10); + setTimeout(function () { + sinon.assert.calledTwice(sqs.receiveMessage); + done(); + }, 30); }); consumer.start();