Skip to content

Commit

Permalink
Merge pull request #97 from jessegranger/master
Browse files Browse the repository at this point in the history
Emit 'response_processed' event once per batch
  • Loading branch information
niklasR authored Oct 31, 2017
2 parents ccf49d3 + 722d4f2 commit 0a01e6e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Each consumer is an [`EventEmitter`](http://nodejs.org/api/events.html) and emit
|`processing_error`|`err`, `message`|Fired when an error occurs processing the message.|
|`message_received`|`message`|Fired when a message is received.|
|`message_processed`|`message`|Fired when a message is successfully processed and removed from the queue.|
|`response_processed`|None|Fired after one batch of items (up to `batchSize`) has been successfully processed.|
|`stopped`|None|Fired when the consumer finally stops its work.|
|`empty`|None|Fired when the queue is empty (All messages have been consumed).|

Expand Down
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Consumer.prototype._handleSqsResponse = function (err, response) {
if (response && response.Messages && response.Messages.length > 0) {
async.each(response.Messages, this._processMessageBound, function () {
// start polling again once all of the messages have been processed
consumer.emit('response_processed');
consumer._poll();
});
} else if (response && !response.Messages) {
Expand Down
31 changes: 31 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,37 @@ describe('Consumer', function () {

consumer.start();
});

it('fires response_processed event for each batch', function (done) {
sqs.receiveMessage.yieldsAsync(null, {
Messages: [
{
ReceiptHandle: 'receipt-handle-1',
MessageId: '1',
Body: 'body-1'
},
{
ReceiptHandle: 'receipt-handle-2',
MessageId: '2',
Body: 'body-2'
}
]
});
handleMessage.yields(null);

consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessage: handleMessage,
batchSize: 2,
sqs: sqs
});

consumer.on('response_processed', done);
consumer.start();

});
});

describe('.stop', function () {
Expand Down

0 comments on commit 0a01e6e

Please sign in to comment.