Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus]: Batch delete #40398

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features Added

- Setting the v2 stack as the default for "Synchronous Receiver Client".
- Added the support for batch delete APIs in `ServiceBusReceiverClient` and `ServiceBusReceiverAsyncClient`.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public final class ServiceBusErrorSource extends ExpandableStringEnum<ServiceBus
/** Error while completing the message.*/
public static final ServiceBusErrorSource COMPLETE = fromString("COMPLETE", ServiceBusErrorSource.class);

/** Error while deleting messages. */
public static final ServiceBusErrorSource DELETE_MESSAGES = fromString("DELETE_MESSAGES", ServiceBusErrorSource.class);

/** Error while receiving the message(s).*/
public static final ServiceBusErrorSource RECEIVE = fromString("RECEIVE", ServiceBusErrorSource.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.DeleteMessagesOptions;
import com.azure.messaging.servicebus.models.PurgeMessagesOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.Disposable;
import reactor.core.Disposables;
Expand All @@ -40,6 +42,7 @@

import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand All @@ -52,6 +55,7 @@

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.Messages.INVALID_OPERATION_DISPOSED_RECEIVER;
Expand Down Expand Up @@ -330,6 +334,10 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
private static final Duration EXPIRED_RENEWAL_CLEANUP_INTERVAL = Duration.ofMinutes(2);
private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();
private static final String TRANSACTION_LINK_NAME = "coordinator";
/**
* The maximum number of messages to delete in a single batch. This cap is established and enforced by the service.
*/
static final int MAX_DELETE_MESSAGES_COUNT = 4000;
anuchandy marked this conversation as resolved.
Show resolved Hide resolved
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClient.class);

private final LockContainer<LockRenewalOperation> renewalContainer;
Expand Down Expand Up @@ -1460,6 +1468,146 @@ public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionCo

}

/**
* Deletes up to {@code messageCount} messages from the entity enqueued before {@link OffsetDateTime#now()}.
* The actual number of deleted messages may be less if there are fewer eligible messages in the entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by
* the Service Bus service.</p>
*
* @param messageCount the desired number of messages to delete.
* @return a {@link Mono} indicating the number of messages deleted.
*
* @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as
* determined by the Service Bus service.
*/
public Mono<Integer> deleteMessages(int messageCount) {
return deleteMessages(messageCount, new DeleteMessagesOptions(), receiverOptions.getSessionId());
}

/**
* Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less
* if there are fewer eligible messages in the entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by
* the Service Bus service.</p>
*
* @param messageCount the desired number of messages to delete.
* @param options options used to delete the messages.
* @return a {@link Mono} indicating the number of messages deleted.
*
* @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as
* determined by the Service Bus service.
* @throws NullPointerException if {@code options} is null.
*/
public Mono<Integer> deleteMessages(int messageCount, DeleteMessagesOptions options) {
return deleteMessages(messageCount, options, receiverOptions.getSessionId());
}

/**
* Attempts to purge all messages from an entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may take longer if there are a lot of messages to delete.
* </p>
* <p>
* The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC.
* </p>
*
* @return a {@link Mono} indicating the number of messages deleted.
*/
public Mono<Integer> purgeMessages() {
return purgeMessages(new PurgeMessagesOptions());
}

/**
* Attempts to purge all messages from an entity.
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may take longer if there are a lot of messages to delete.
* </p>
*
* @param options options used to purge the messages, application may specify a UTC timestamp in the options
* indicating only purge messages enqueued before that time, if such a cut of time is not specified then
* {@link OffsetDateTime#now()} UTC will be used.
*
* @return a {@link Mono} indicating the number of messages deleted.
* @throws NullPointerException if {@code options} is null.
*/
public Mono<Integer> purgeMessages(PurgeMessagesOptions options) {
if (Objects.isNull(options)) {
return monoError(LOGGER, new NullPointerException("'options' cannot be null."));
}
final DeleteMessagesOptions deleteMessagesOptions;
if (options.getBeforeEnqueueTimeUtc() != null) {
deleteMessagesOptions = new DeleteMessagesOptions()
.setBeforeEnqueueTimeUtc(options.getBeforeEnqueueTimeUtc());
} else {
deleteMessagesOptions = new DeleteMessagesOptions()
.setBeforeEnqueueTimeUtc(OffsetDateTime.now(ZoneOffset.UTC));
}
return deleteMessages(MAX_DELETE_MESSAGES_COUNT, deleteMessagesOptions, receiverOptions.getSessionId())
.expand(deleted -> {
// The service currently has a known bug that should be fixed before GA, where the delete operation may
// not delete the requested batch size in a single call, even when there are enough messages to do so.
// This logic should check "deleted == MAX_DELETE_MESSAGES_COUNT". Until this is fixed, we'll need to
// loop if there were any messages deleted, which will cost an extra service call.
if (deleted > 0) {
return deleteMessages(MAX_DELETE_MESSAGES_COUNT, deleteMessagesOptions, receiverOptions.getSessionId());
} else {
return Mono.empty();
}
})
.reduce(0, Integer::sum);
}

/**
* Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less
* if there are fewer eligible messages in the entity.
*
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
*
* @param messageCount the desired number of messages to delete.
* @param options options used to delete the messages.
* @param sessionId Session id of the messages to delete from. {@code null} if there is no session.
* @return a {@link Mono} indicating the number of messages deleted.
*
* @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as
* determined by the Service Bus service.
*/
private Mono<Integer> deleteMessages(int messageCount, DeleteMessagesOptions options, String sessionId) {
if (isDisposed.get()) {
return monoError(LOGGER, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "deleteMessages")));
}
if (Objects.isNull(options)) {
return monoError(LOGGER, new NullPointerException("'options' cannot be null."));
}
if (messageCount <= 0 || messageCount > MAX_DELETE_MESSAGES_COUNT) {
final String message = "'messageCount' must be a positive number and less than " + MAX_DELETE_MESSAGES_COUNT;
return monoError(LOGGER, new IllegalArgumentException(message));
}
final OffsetDateTime beforeEnqueueTimeUtc;
if (options.getBeforeEnqueueTimeUtc() != null) {
beforeEnqueueTimeUtc = options.getBeforeEnqueueTimeUtc();
} else {
beforeEnqueueTimeUtc = OffsetDateTime.now(ZoneOffset.UTC);
}
final Mono<Integer> deleteMessages = connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(managementNode -> managementNode.deleteMessages(messageCount, beforeEnqueueTimeUtc, getLinkName(sessionId), sessionId));

return withRetry(deleteMessages, connectionCacheWrapper.getRetryOptions(),
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
"Deleting messages timed out.", false)
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.DELETE_MESSAGES));
}

/**
* Disposes of the consumer by closing the underlying links to the service.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.DeleteMessagesOptions;
import com.azure.messaging.servicebus.models.PurgeMessagesOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -843,6 +845,82 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
asyncClient.rollbackTransaction(transactionContext).block(operationTimeout);
}

/**
* Deletes up to {@code messageCount} messages from the entity enqueued before {@link OffsetDateTime#now()}.
* The actual number of deleted messages may be less if there are fewer eligible messages in the entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by
* the Service Bus service.</p>
*
* @param messageCount the desired number of messages to delete.
* @return the number of messages deleted.
*
* @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as
* determined by the Service Bus service.
*/
public int deleteMessages(int messageCount) {
return deleteMessages(messageCount, new DeleteMessagesOptions());
}

/**
* Deletes up to {@code messageCount} messages from the entity. The actual number of deleted messages may be less
* if there are fewer eligible messages in the entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>You can delete a maximum of 4000 messages in a single API call, this is the current limit determined by
* the Service Bus service.</p>
*
* @param messageCount the desired number of messages to delete.
* @param options options used to delete the messages.
* @return the number of messages deleted.
*
* @throws IllegalArgumentException when the {@code messageCount} is less than 1 or exceeds the maximum allowed, as
* determined by the Service Bus service.
* @throws NullPointerException if {@code options} is null.
*/
public int deleteMessages(int messageCount, DeleteMessagesOptions options) {
return asyncClient.deleteMessages(messageCount, options).block();
}

/**
* Attempts to purge all messages from an entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may take longer if there are a lot of messages to delete.
* </p>
* <p>
* The api will purge all the messages enqueued before {@link OffsetDateTime#now()} UTC.
* </p>
*
* @return the number of messages deleted.
*/
public int purgeMessages() {
return purgeMessages(new PurgeMessagesOptions());
}

/**
* Attempts to purge all messages from an entity.
* <p>If the lock for a message is held by a receiver, it will be respected and the message will not be deleted.</p>
* <p>
* This method may invoke multiple service requests to delete all messages. Because multiple service requests may be
* made, the possibility of partial success exists, in such scenario, the method will stop attempting to delete
* additional messages and throw the exception that was encountered. Also, due to the multiple service requests,
* purge operation may take longer if there are a lot of messages to delete.
* </p>
*
* @param options options used to purge the messages, application may specify a UTC timestamp in the options
* indicating only purge messages enqueued before that time, if such a cut of time is not specified then
* {@link OffsetDateTime#now()} UTC will be used.
*
* @return the number of messages deleted.
* @throws NullPointerException if {@code options} is null.
*/
public int purgeMessages(PurgeMessagesOptions options) {
return asyncClient.purgeMessages(options).block();
}

/**
* Disposes of the consumer by closing the underlying links to the service.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_ADD_RULE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_DELETE_MESSAGES;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_RULES;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_SESSION_STATE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_PEEK;
Expand Down Expand Up @@ -533,6 +534,58 @@ public Flux<RuleProperties> listRules() {
});
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Integer> deleteMessages(int messageCount, OffsetDateTime beforeEnqueueTimeUtc,
String associatedLinkName, String sessionId) {
return isAuthorized(OPERATION_DELETE_MESSAGES).then(createChannel.flatMap(channel -> {
final Message message = createManagementMessage(OPERATION_DELETE_MESSAGES, associatedLinkName);
final Map<String, Object> body = new HashMap<>(3);
body.put(ManagementConstants.MESSAGE_COUNT_KEY, messageCount);
body.put(ManagementConstants.ENQUEUED_TIME_UTC, Date.from(beforeEnqueueTimeUtc.toInstant()));
if (!CoreUtils.isNullOrEmpty(sessionId)) {
body.put(ManagementConstants.SESSION_ID, sessionId);
}
message.setBody(new AmqpValue(body));
return sendWithVerify(channel, message, null);
})).flatMap(response -> {
AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(response);
if (statusCode == AmqpResponseCode.OK) {
if (!(response.getBody() instanceof AmqpValue)) {
return monoError(logger, new AmqpException(false,
String.format("Expected response body of type AmqpValue not found %s", response.getBody()),
getErrorContext()));
}
final AmqpValue body = (AmqpValue) response.getBody();
if (!(body.getValue() instanceof Map)) {
return monoError(logger, new AmqpException(false,
String.format("Expected value of type Map not found in the response body %s", body.getValue()),
getErrorContext()));
}
@SuppressWarnings("unchecked") final Map<String, Integer> map = (Map<String, Integer>) body.getValue();
if (!map.containsKey("message-count")) {
return monoError(logger, new AmqpException(false,
String.format("Expected message-count entry not found in the map %s", map),
getErrorContext()));
}
final int count = map.get("message-count");
return Mono.just(count);
} else if (statusCode == AmqpResponseCode.NO_CONTENT) {
return Mono.just(0);
} else if (statusCode == AmqpResponseCode.NOT_FOUND) {
final String errorCondition = RequestResponseUtils.getErrorCondition(response);
final AmqpErrorCondition amqpErrorCondition = AmqpErrorCondition.fromString(errorCondition);
if (amqpErrorCondition == AmqpErrorCondition.MESSAGE_NOT_FOUND) {
return Mono.just(0);
}
}
return monoError(logger,
new IllegalStateException("Unexpected state when deleting messages."));
});
}

/**
* {@inheritDoc}
*/
Expand Down
Loading