diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 9e2d5428d0d69..01c3d0e1e17c2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features Added - Setting the v2 stack as the default for "Synchronous Receiver Client". +- Added the support for batch delete APIs in `ServiceBusReceiverClient` and `ServiceBusReceiverAsyncClient`. ### Breaking Changes diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java index ccdb31ca2da9a..bc4673e3d75aa 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java @@ -16,6 +16,9 @@ public final class ServiceBusErrorSource extends ExpandableStringEnum renewalContainer; @@ -1460,6 +1468,146 @@ public Mono rollbackTransaction(ServiceBusTransactionContext transactionCo } + /** + * Deletes up to {@code messageCount} messages from the entity enqueued before {@link OffsetDateTime#now()}. + * The actual number of deleted messages may be less if there are fewer eligible messages in the entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by + * the Service Bus service.

+ * + * @param messageCount the desired number of messages to delete. + * @return a {@link Mono} indicating the number of messages deleted. + * + * @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as + * determined by the Service Bus service. + */ + public Mono deleteMessages(int messageCount) { + return deleteMessages(messageCount, new DeleteMessagesOptions(), receiverOptions.getSessionId()); + } + + /** + * Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less + * if there are fewer eligible messages in the entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by + * the Service Bus service.

+ * + * @param messageCount the desired number of messages to delete. + * @param options options used to delete the messages. + * @return a {@link Mono} indicating the number of messages deleted. + * + * @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as + * determined by the Service Bus service. + * @throws NullPointerException if {@code options} is null. + */ + public Mono deleteMessages(int messageCount, DeleteMessagesOptions options) { + return deleteMessages(messageCount, options, receiverOptions.getSessionId()); + } + + /** + * Attempts to purge all messages from an entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

+ * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be + * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete + * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, + * purge operation may take longer if there are a lot of messages to delete. + *

+ *

+ * The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC. + *

+ * + * @return a {@link Mono} indicating the number of messages deleted. + */ + public Mono purgeMessages() { + return purgeMessages(new PurgeMessagesOptions()); + } + + /** + * Attempts to purge all messages from an entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

+ * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be + * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete + * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, + * purge operation may take longer if there are a lot of messages to delete. + *

+ * + * @param options options used to purge the messages, application may specify a UTC timestamp in the options + * indicating only purge messages enqueued before that time, if such a cut of time is not specified then + * {@link OffsetDateTime#now()} UTC will be used. + * + * @return a {@link Mono} indicating the number of messages deleted. + * @throws NullPointerException if {@code options} is null. + */ + public Mono purgeMessages(PurgeMessagesOptions options) { + if (Objects.isNull(options)) { + return monoError(LOGGER, new NullPointerException("'options' cannot be null.")); + } + final DeleteMessagesOptions deleteMessagesOptions; + if (options.getBeforeEnqueueTimeUtc() != null) { + deleteMessagesOptions = new DeleteMessagesOptions() + .setBeforeEnqueueTimeUtc(options.getBeforeEnqueueTimeUtc()); + } else { + deleteMessagesOptions = new DeleteMessagesOptions() + .setBeforeEnqueueTimeUtc(OffsetDateTime.now(ZoneOffset.UTC)); + } + return deleteMessages(MAX_DELETE_MESSAGES_COUNT, deleteMessagesOptions, receiverOptions.getSessionId()) + .expand(deleted -> { + // The service currently has a known bug that should be fixed before GA, where the delete operation may + // not delete the requested batch size in a single call, even when there are enough messages to do so. + // This logic should check "deleted == MAX_DELETE_MESSAGES_COUNT". Until this is fixed, we'll need to + // loop if there were any messages deleted, which will cost an extra service call. + if (deleted > 0) { + return deleteMessages(MAX_DELETE_MESSAGES_COUNT, deleteMessagesOptions, receiverOptions.getSessionId()); + } else { + return Mono.empty(); + } + }) + .reduce(0, Integer::sum); + } + + /** + * Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less + * if there are fewer eligible messages in the entity. + * + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ * + * @param messageCount the desired number of messages to delete. + * @param options options used to delete the messages. + * @param sessionId Session id of the messages to delete from. {@code null} if there is no session. + * @return a {@link Mono} indicating the number of messages deleted. + * + * @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as + * determined by the Service Bus service. + */ + private Mono deleteMessages(int messageCount, DeleteMessagesOptions options, String sessionId) { + if (isDisposed.get()) { + return monoError(LOGGER, new IllegalStateException( + String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "deleteMessages"))); + } + if (Objects.isNull(options)) { + return monoError(LOGGER, new NullPointerException("'options' cannot be null.")); + } + if (messageCount <= 0 || messageCount > MAX_DELETE_MESSAGES_COUNT) { + final String message = "'messageCount' must be a positive number and less than " + MAX_DELETE_MESSAGES_COUNT; + return monoError(LOGGER, new IllegalArgumentException(message)); + } + final OffsetDateTime beforeEnqueueTimeUtc; + if (options.getBeforeEnqueueTimeUtc() != null) { + beforeEnqueueTimeUtc = options.getBeforeEnqueueTimeUtc(); + } else { + beforeEnqueueTimeUtc = OffsetDateTime.now(ZoneOffset.UTC); + } + final Mono deleteMessages = connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) + .flatMap(managementNode -> managementNode.deleteMessages(messageCount, beforeEnqueueTimeUtc, getLinkName(sessionId), sessionId)); + + return withRetry(deleteMessages, connectionCacheWrapper.getRetryOptions(), + "Deleting messages timed out.", false) + .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.DELETE_MESSAGES)); + } + /** * Disposes of the consumer by closing the underlying links to the service. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index cfe46babcda85..54aecb135d221 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -12,6 +12,8 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; +import com.azure.messaging.servicebus.models.DeleteMessagesOptions; +import com.azure.messaging.servicebus.models.PurgeMessagesOptions; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; @@ -843,6 +845,82 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext) asyncClient.rollbackTransaction(transactionContext).block(operationTimeout); } + /** + * Deletes up to {@code messageCount} messages from the entity enqueued before {@link OffsetDateTime#now()}. + * The actual number of deleted messages may be less if there are fewer eligible messages in the entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by + * the Service Bus service.

+ * + * @param messageCount the desired number of messages to delete. + * @return the number of messages deleted. + * + * @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as + * determined by the Service Bus service. + */ + public int deleteMessages(int messageCount) { + return deleteMessages(messageCount, new DeleteMessagesOptions()); + } + + /** + * Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less + * if there are fewer eligible messages in the entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by + * the Service Bus service.

+ * + * @param messageCount the desired number of messages to delete. + * @param options options used to delete the messages. + * @return the number of messages deleted. + * + * @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as + * determined by the Service Bus service. + * @throws NullPointerException if {@code options} is null. + */ + public int deleteMessages(int messageCount, DeleteMessagesOptions options) { + return asyncClient.deleteMessages(messageCount, options).block(); + } + + /** + * Attempts to purge all messages from an entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

+ * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be + * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete + * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, + * purge operation may take longer if there are a lot of messages to delete. + *

+ *

+ * The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC. + *

+ * + * @return the number of messages deleted. + */ + public int purgeMessages() { + return purgeMessages(new PurgeMessagesOptions()); + } + + /** + * Attempts to purge all messages from an entity. + *

If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.

+ *

+ * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be + * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete + * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, + * purge operation may take longer if there are a lot of messages to delete. + *

+ * + * @param options options used to purge the messages, application may specify a UTC timestamp in the options + * indicating only purge messages enqueued before that time, if such a cut of time is not specified then + * {@link OffsetDateTime#now()} UTC will be used. + * + * @return the number of messages deleted. + * @throws NullPointerException if {@code options} is null. + */ + public int purgeMessages(PurgeMessagesOptions options) { + return asyncClient.purgeMessages(options).block(); + } + /** * Disposes of the consumer by closing the underlying links to the service. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java index 8e88884d9ad00..2fff348aa7228 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java @@ -56,6 +56,7 @@ import static com.azure.core.util.FluxUtil.fluxError; import static com.azure.core.util.FluxUtil.monoError; import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_ADD_RULE; +import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_DELETE_MESSAGES; import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_RULES; import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_SESSION_STATE; import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_PEEK; @@ -533,6 +534,58 @@ public Flux listRules() { }); } + /** + * {@inheritDoc} + */ + @Override + public Mono deleteMessages(int messageCount, OffsetDateTime beforeEnqueueTimeUtc, + String associatedLinkName, String sessionId) { + return isAuthorized(OPERATION_DELETE_MESSAGES).then(createChannel.flatMap(channel -> { + final Message message = createManagementMessage(OPERATION_DELETE_MESSAGES, associatedLinkName); + final Map body = new HashMap<>(3); + body.put(ManagementConstants.MESSAGE_COUNT_KEY, messageCount); + body.put(ManagementConstants.ENQUEUED_TIME_UTC, Date.from(beforeEnqueueTimeUtc.toInstant())); + if (!CoreUtils.isNullOrEmpty(sessionId)) { + body.put(ManagementConstants.SESSION_ID, sessionId); + } + message.setBody(new AmqpValue(body)); + return sendWithVerify(channel, message, null); + })).flatMap(response -> { + AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(response); + if (statusCode == AmqpResponseCode.OK) { + if (!(response.getBody() instanceof AmqpValue)) { + return monoError(logger, new AmqpException(false, + String.format("Expected response body of type AmqpValue not found %s", response.getBody()), + getErrorContext())); + } + final AmqpValue body = (AmqpValue) response.getBody(); + if (!(body.getValue() instanceof Map)) { + return monoError(logger, new AmqpException(false, + String.format("Expected value of type Map not found in the response body %s", body.getValue()), + getErrorContext())); + } + @SuppressWarnings("unchecked") final Map map = (Map) body.getValue(); + if (!map.containsKey("message-count")) { + return monoError(logger, new AmqpException(false, + String.format("Expected message-count entry not found in the map %s", map), + getErrorContext())); + } + final int count = map.get("message-count"); + return Mono.just(count); + } else if (statusCode == AmqpResponseCode.NO_CONTENT) { + return Mono.just(0); + } else if (statusCode == AmqpResponseCode.NOT_FOUND) { + final String errorCondition = RequestResponseUtils.getErrorCondition(response); + final AmqpErrorCondition amqpErrorCondition = AmqpErrorCondition.fromString(errorCondition); + if (amqpErrorCondition == AmqpErrorCondition.MESSAGE_NOT_FOUND) { + return Mono.just(0); + } + } + return monoError(logger, + new IllegalStateException("Unexpected state when deleting messages.")); + }); + } + /** * {@inheritDoc} */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementConstants.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementConstants.java index 825dd13e8fcbd..be06578d5df83 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementConstants.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementConstants.java @@ -44,6 +44,7 @@ public class ManagementConstants { public static final String STATUS_CODE = "statusCode"; public static final String LEGACY_STATUS_CODE = "status-code"; public static final String RULES = "rules"; + public static final String ENQUEUED_TIME_UTC = "enqueued-time-utc"; // Operation name key. static final String MANAGEMENT_OPERATION_KEY = "operation"; @@ -61,6 +62,7 @@ public class ManagementConstants { static final String OPERATION_ADD_RULE = AmqpConstants.VENDOR + ":add-rule"; static final String OPERATION_REMOVE_RULE = AmqpConstants.VENDOR + ":remove-rule"; static final String OPERATION_GET_RULES = AmqpConstants.VENDOR + ":enumerate-rules"; + static final String OPERATION_DELETE_MESSAGES = AmqpConstants.VENDOR + ":batch-delete-messages"; static final String SERVER_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout"; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java index 1ec1f222cc5f0..6bff7a7ce65f8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java @@ -148,6 +148,20 @@ Mono updateDisposition(String lockToken, DispositionStatus dispositionStat */ Flux listRules(); + /** + * Deletes up to {@code messageCount} number of messages from the entity. Only messages that were added to the queue + * prior to {@code beforeEnqueueTimeUtc} will be deleted. The actual number of deleted messages may be less if there + * are fewer eligible messages in the entity. + * + * @param messageCount the desired number of messages to delete. + * @param beforeEnqueueTimeUtc representing the cutoff time for deletion. Only messages that were enqueued before this time will be deleted. + * @param associatedLinkName the name of the associated link. + * @param sessionId Identifier for the session. + * @return Mono the number of messages that were deleted. + */ + Mono deleteMessages(int messageCount, OffsetDateTime beforeEnqueueTimeUtc, + String associatedLinkName, String sessionId); + @Override void close(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/DeleteMessagesOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/DeleteMessagesOptions.java new file mode 100644 index 0000000000000..3c2a3b3abe5cb --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/DeleteMessagesOptions.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus.models; + +import java.time.OffsetDateTime; + +/** + * Options to specify when deleting a batch of messages. + */ +public final class DeleteMessagesOptions { + private OffsetDateTime beforeEnqueueTimeUtc; + + /** + * Creates a new instance of options to specify when deleting a batch of messages. + */ + public DeleteMessagesOptions() { + } + + /** + * Sets cutoff time for the deletion, only messages that were enqueued before this time will be deleted. If not set, + * then {@link OffsetDateTime#now()} will be assumed. + * + * @param beforeEnqueueTimeUtc the cutoff time for the deletion. + * + * @return The updated {@link DeleteMessagesOptions} object. + */ + public DeleteMessagesOptions setBeforeEnqueueTimeUtc(OffsetDateTime beforeEnqueueTimeUtc) { + this.beforeEnqueueTimeUtc = beforeEnqueueTimeUtc; + return this; + } + + /** + * Gets the cutoff time for the deletion, only messages that were enqueued before this time will be deleted. + * + * @return the cutoff time for the deletion. + */ + public OffsetDateTime getBeforeEnqueueTimeUtc() { + return beforeEnqueueTimeUtc; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/PurgeMessagesOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/PurgeMessagesOptions.java new file mode 100644 index 0000000000000..2c6404eed87ec --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/PurgeMessagesOptions.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus.models; + +import java.time.OffsetDateTime; + +/** + * Options to specify when purging messages. + */ +public final class PurgeMessagesOptions { + private OffsetDateTime beforeEnqueueTimeUtc; + + /** + * Creates a new instance of options to specify when purging messages. + */ + public PurgeMessagesOptions() { + } + + /** + * Sets cutoff time for the purging, only messages that were enqueued before this time will be purged. If not set, + * then {@link OffsetDateTime#now()} will be assumed. + * + * @param beforeEnqueueTimeUtc the cutoff time for the deletion. + * + * @return The updated {@link PurgeMessagesOptions} object. + */ + public PurgeMessagesOptions setBeforeEnqueueTimeUtc(OffsetDateTime beforeEnqueueTimeUtc) { + this.beforeEnqueueTimeUtc = beforeEnqueueTimeUtc; + return this; + } + + /** + * Gets the cutoff time for the purging, only messages that were enqueued before this time will be purged. + * + * @return the cutoff time for the purging. + */ + public OffsetDateTime getBeforeEnqueueTimeUtc() { + return beforeEnqueueTimeUtc; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index d06803c10c61c..4aa3a9cfc85b5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -19,6 +19,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; +import com.azure.messaging.servicebus.models.PurgeMessagesOptions; import com.azure.messaging.servicebus.models.SubQueue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; @@ -37,6 +38,7 @@ import java.nio.charset.Charset; import java.time.Duration; import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -76,6 +78,7 @@ public class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase { private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class); private static final AmqpRetryOptions DEFAULT_RETRY_OPTIONS = null; + private static final ServiceBusMessage END = new ServiceBusMessage(new byte[0]); private final boolean isSessionEnabled = false; private final ClientCreationOptions defaultClientCreationOptions = new ClientCreationOptions() .setMaxAutoLockRenewDuration(Duration.ofMinutes(5)); @@ -1464,6 +1467,31 @@ void autoComplete(MessagingEntityType entityType) { } } + @Test + void purgeMessages() { + setSenderAndReceiver(MessagingEntityType.QUEUE, TestUtils.USE_CASE_BATCH_DELETE, false); + + final int totalMessages = ServiceBusReceiverAsyncClient.MAX_DELETE_MESSAGES_COUNT * 2 + 100; // 8200 + final List messages = new ArrayList<>(totalMessages); + for (int i = 0; i < totalMessages; i++) { + messages.add(getMessage(UUID.randomUUID().toString(), false)); + } + + StepVerifier.create(sender.sendMessages(messages)) + .verifyComplete(); + + final PurgeMessagesOptions options = new PurgeMessagesOptions() + // 5-sec buffer to account for any clock skew. + .setBeforeEnqueueTimeUtc(OffsetDateTime.now(ZoneOffset.UTC).plusSeconds(5)); + + // since the service can delete upto 4000 messages in a single batch, the purge operation will make + // 3 batch delete API calls to delete the 8200 messages. + // + StepVerifier.create(receiver.purgeMessages(options)) + .assertNext(count -> assertEquals(totalMessages, count)) + .verifyComplete(); + } + /** * Asserts the length and values with in the map. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java index 0bb44625ed389..64bce497b0b15 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java @@ -89,6 +89,7 @@ public class TestUtils { static final int USE_CASE_MULTIPLE_SESSIONS2 = 30; static final int USE_CASE_MULTIPLE_SESSIONS3 = 31; static final int USE_CASE_AUTO_RENEW_RECEIVE = 32; + static final int USE_CASE_BATCH_DELETE = 33; static final Configuration GLOBAL_CONFIGURATION = Configuration.getGlobalConfiguration(); // An application property key to identify where in the stream this message was created. diff --git a/sdk/servicebus/test-resources.json b/sdk/servicebus/test-resources.json index c30d4ef9dfee3..4e10f28e064dd 100644 --- a/sdk/servicebus/test-resources.json +++ b/sdk/servicebus/test-resources.json @@ -59,7 +59,7 @@ "namespaceName": "[concat('sb-java-', parameters('baseName'))]", "queueName": "queue", "queueSessionName": "queue-session", - "numberOfInstances": 33, + "numberOfInstances": 34, "subscriptionName": "subscription", "subscriptionSessionName": "subscription-session", "serviceBusDataOwnerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419')]",