spring-cloud-stream-binder-sqs lets you use Spring Cloud Stream with the AWS Simple Queue Service (SQS).
<dependencies>
<dependency>
<groupId>de.idealo.spring</groupId>
<artifactId>spring-cloud-stream-binder-sqs</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
spring-cloud-stream-binder-sqs | spring-boot | spring-cloud-aws | spring-cloud | aws sdk | java compiler/runtime |
---|---|---|---|---|---|
1.9.0 | 2.7.x | 2.4.x | 2021.0.5 | 1.x | 8 |
3.0.0 | 3.1.x | 3.0.x | 2022.0.3 | 2.x | 17 |
Changes in 3.0.0:
- removed consumer configuration for messageDeletionPolicy: the default behaviour is now that Messages will be acknowledged when message processing is successful.
- renamed consumer configuration for maxNumberOfMessages to maxMessagesPerPoll to align with the naming in spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
- renamed consumer configuration for waitTimeout to pollTimeout to align with the naming in spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
- renamed consumer configuration for queueStopTimeout to listenerShutdownTimeout to align with the naming in spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
With the library in your dependencies you can configure your Spring Cloud Stream bindings as usual. The type name for
this binder is sqs
. The destination needs to match the queue name, the specific ARN will be looked up from the
available queue in the account.
You may also provide additional configuration options:
- Consumers
- maxMessagesPerPoll - Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1 and 10.
- visibilityTimeout - The duration in seconds that polled messages are hidden from subsequent poll requests after having been retrieved.
- pollTimeout - The duration in seconds that the system will wait for new messages to arrive when polling. Uses the Amazon SQS long polling feature. The value should be between 1 and 20.
- listenerShutdownTimeout - The number of milliseconds that the queue worker is given to gracefully finish its work on shutdown before interrupting the current thread. Default value is 10 seconds.
- snsFanout - Whether the incoming message has the SNS format and should be deserialized automatically. Defaults to true.
Example Configuration:
spring:
cloud:
stream:
sqs:
bindings:
someFunction-in-0:
consumer:
snsFanout: false
bindings:
someFunction-in-0:
destination: input-queue-name
someFunction-out-0:
destination: output-queue-name
You may also provide your own beans of SqsAsyncClient
to override those that are created
by spring-cloud-aws-autoconfigure.
To use FIFO SQS queues
you will need to provide a deduplication id and a group id.
With this binder you may set these using the message headers SqsHeaders.GROUP_ID
and SqsHeaders.DEDUPLICATION_ID
.
The example below shows how you could use a FIFO queue in real life.
Example Configuration:
spring:
cloud:
stream:
bindings:
someFunction-in-0:
destination: input-queue-name
someFunction-out-0:
destination: output-queue-name.fifo
class Application {
@Bean
public Message<String> someFunction(String input) {
return MessageBuilder.withPayload(input)
.setHeader(SqsHeaders.GROUP_ID, "my-application")
.setHeader(SqsHeaders.DEDUPLICATION_ID, UUID.randomUUID())
.build();
}
}
Consumers in the SQS binder support the Spring Cloud Stream concurrency
property.
By specifying a value you will launch concurrency
threads continuously polling for maxNumberOfMessages
each.
The threads will process all messages asynchronously, but each thread will wait for its current batch of messages to all
complete processing before retrieving new ones.
If your message processing is highly variable from message to message it is recommended to set a lower value
for maxNumberOfMessages
and a higher value for concurrency
.
Note that this will increase the amount of API calls done against SQS.
Example Configuration:
spring:
cloud:
stream:
sqs:
bindings:
someFunction-in-0:
consumer:
maxMessagesPerPoll: 5
bindings:
someFunction-in-0:
destination: input-queue-name
consumer:
concurrency: 10