diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 5074770a78b6d..560e69e12aa4e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -3,7 +3,6 @@ package com.azure.messaging.servicebus; -import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.AmqpSession; import com.azure.core.amqp.AmqpTransaction; @@ -1512,7 +1511,10 @@ public Mono deleteMessages(int messageCount, DeleteMessagesOptions opti * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, - * purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}. + * purge operation may take longer if there are a lot of messages to delete. + *

+ *

+ * The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC. *

* * @return a {@link Mono} indicating the number of messages deleted. @@ -1528,10 +1530,12 @@ public Mono purgeMessages() { * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, - * purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}. + * purge operation may take longer if there are a lot of messages to delete. *

* - * @param options options used to purge the messages. + * @param options options used to purge the messages, application may specify a UTC timestamp in the options + * indicating only purge messages enqueued before that time, if such a cut of time is not specified then + * {@link OffsetDateTime#now()} UTC will be used. * * @return a {@link Mono} indicating the number of messages deleted. * @throws NullPointerException if {@code options} is null. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index b73312dd0944f..54aecb135d221 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -3,7 +3,6 @@ package com.azure.messaging.servicebus; -import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.annotation.ServiceClient; import com.azure.core.util.IterableStream; import com.azure.core.util.logging.ClientLogger; @@ -879,7 +878,7 @@ public int deleteMessages(int messageCount) { * @throws NullPointerException if {@code options} is null. */ public int deleteMessages(int messageCount, DeleteMessagesOptions options) { - return asyncClient.deleteMessages(messageCount, options).block(operationTimeout); + return asyncClient.deleteMessages(messageCount, options).block(); } /** @@ -889,7 +888,10 @@ public int deleteMessages(int messageCount, DeleteMessagesOptions options) { * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, - * purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}. + * purge operation may take longer if there are a lot of messages to delete. + *

+ *

+ * The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC. *

* * @return the number of messages deleted. @@ -905,10 +907,12 @@ public int purgeMessages() { * This method may invoke multiple service requests to delete all messages. Because multiple service requests may be * made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete * additional messages and throw the exception that was encountered. Also, due to the multiple service requests, - * purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}. + * purge operation may take longer if there are a lot of messages to delete. *

* - * @param options options used to purge the messages. + * @param options options used to purge the messages, application may specify a UTC timestamp in the options + * indicating only purge messages enqueued before that time, if such a cut of time is not specified then + * {@link OffsetDateTime#now()} UTC will be used. * * @return the number of messages deleted. * @throws NullPointerException if {@code options} is null. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index f78c8f4c1c646..72b6c656d5ebb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -44,7 +44,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -1476,7 +1475,7 @@ void purgeMessages() { messages.add(getMessage(UUID.randomUUID().toString(), false)); } - StepVerifier.create(sendMessages(messages)) + StepVerifier.create(sender.sendMessages(messages)) .verifyComplete(); final PurgeMessagesOptions options = new PurgeMessagesOptions() @@ -1551,41 +1550,6 @@ private Mono sendMessage(ServiceBusMessage message) { }); } - private Mono sendMessages(List messages) { - final Iterator messagesItr = messages.iterator(); - if (messagesItr.hasNext()) { - return sendNextBatch(messagesItr.next(), messagesItr); - } else { - return Mono.empty(); - } - } - - private Mono sendNextBatch(ServiceBusMessage first, Iterator messagesItr) { - return sender.createMessageBatch().flatMap(batch -> { - ServiceBusMessage next = first; - do { - if (!batch.tryAddMessage(next)) { - if (next == first) { - return Mono.error( - new IllegalArgumentException("The event " + first + " is too big to send even in a Batch.")); - } - return sender.sendMessages(batch).then(Mono.just(next)); - } - if (messagesItr.hasNext()) { - next = messagesItr.next(); - } else { - return sender.sendMessages(batch).then(Mono.just(END)); - } - } while (true); - }).flatMap(missed -> { - if (missed == END) { - return Mono.empty(); - } else { - return sendNextBatch(missed, messagesItr); - } - }); - } - /** * Class represents various options while creating receiver/sender client. */