Skip to content

Commit

Permalink
Code review feedbacks: Improving the java-doc around purge API, remov…
Browse files Browse the repository at this point in the history
…ing the timeout duration for synchronous deleteMessages API (similar to purgeMessages), updating test to use sendMessages(Iterable) API in Sender-Client.
  • Loading branch information
anuchandy committed Jun 10, 2024
1 parent 1f36e93 commit f46a871
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
Expand Down Expand Up @@ -1512,7 +1511,10 @@ public Mono<Integer> deleteMessages(int messageCount, DeleteMessagesOptions opti
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}.
* purge operation may take longer if there are a lot of messages to delete.
* </p>
* <p>
* The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC.
* </p>
*
* @return a {@link Mono} indicating the number of messages deleted.
Expand All @@ -1528,10 +1530,12 @@ public Mono<Integer> purgeMessages() {
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}.
* purge operation may take longer if there are a lot of messages to delete.
* </p>
*
* @param options options used to purge the messages.
* @param options options used to purge the messages, application may specify a UTC timestamp in the options
* indicating only purge messages enqueued before that time, if such a cut of time is not specified then
* {@link OffsetDateTime#now()} UTC will be used.
*
* @return a {@link Mono} indicating the number of messages deleted.
* @throws NullPointerException if {@code options} is null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -879,7 +878,7 @@ public int deleteMessages(int messageCount) {
* @throws NullPointerException if {@code options} is null.
*/
public int deleteMessages(int messageCount, DeleteMessagesOptions options) {
return asyncClient.deleteMessages(messageCount, options).block(operationTimeout);
return asyncClient.deleteMessages(messageCount, options).block();
}

/**
Expand All @@ -889,7 +888,10 @@ public int deleteMessages(int messageCount, DeleteMessagesOptions options) {
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}.
* purge operation may take longer if there are a lot of messages to delete.
* </p>
* <p>
* The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC.
* </p>
*
* @return the number of messages deleted.
Expand All @@ -905,10 +907,12 @@ public int purgeMessages() {
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may exceed the configured {@link AmqpRetryOptions#getTryTimeout()}.
* purge operation may take longer if there are a lot of messages to delete.
* </p>
*
* @param options options used to purge the messages.
* @param options options used to purge the messages, application may specify a UTC timestamp in the options
* indicating only purge messages enqueued before that time, if such a cut of time is not specified then
* {@link OffsetDateTime#now()} UTC will be used.
*
* @return the number of messages deleted.
* @throws NullPointerException if {@code options} is null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1478,7 +1477,7 @@ void purgeMessages() {
messages.add(getMessage(UUID.randomUUID().toString(), false));
}

StepVerifier.create(sendMessages(messages))
StepVerifier.create(sender.sendMessages(messages))
.verifyComplete();

final PurgeMessagesOptions options = new PurgeMessagesOptions()
Expand Down Expand Up @@ -1553,41 +1552,6 @@ private Mono<Void> sendMessage(ServiceBusMessage message) {
});
}

private Mono<Void> sendMessages(List<ServiceBusMessage> messages) {
final Iterator<ServiceBusMessage> messagesItr = messages.iterator();
if (messagesItr.hasNext()) {
return sendNextBatch(messagesItr.next(), messagesItr);
} else {
return Mono.empty();
}
}

private Mono<Void> sendNextBatch(ServiceBusMessage first, Iterator<ServiceBusMessage> messagesItr) {
return sender.createMessageBatch().flatMap(batch -> {
ServiceBusMessage next = first;
do {
if (!batch.tryAddMessage(next)) {
if (next == first) {
return Mono.error(
new IllegalArgumentException("The event " + first + " is too big to send even in a Batch."));
}
return sender.sendMessages(batch).then(Mono.just(next));
}
if (messagesItr.hasNext()) {
next = messagesItr.next();
} else {
return sender.sendMessages(batch).then(Mono.just(END));
}
} while (true);
}).flatMap(missed -> {
if (missed == END) {
return Mono.empty();
} else {
return sendNextBatch(missed, messagesItr);
}
});
}

/**
* Class represents various options while creating receiver/sender client.
*/
Expand Down

0 comments on commit f46a871

Please sign in to comment.