Skip to content

Commit

Permalink
ARTEMIS-5131 Add A Copy message button to console
Browse files Browse the repository at this point in the history
This exposes a copyMessage method that simply copies a message to a different queue so the new console can add a copy button
  • Loading branch information
andytaylor committed Oct 23, 2024
1 parent 02e0b14 commit a0d4cf8
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2744,4 +2744,12 @@ static void getAuthorizationFailureCount(Object source) {

@LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationFailureCount(String user, Object source);


static void copyMessage(Object source, Object... args) {
BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args));
}

@LogMessage(id = 601782, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO)
void copyMessage(String user, Object source, String args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transact
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;

@Operation(desc = "Send a copy of the message with given messageID to another queue)", impact = MBeanOperationInfo.ACTION)
boolean copyMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "targetQueue", desc = "The name of the queue to copy the messages to") String targetQueue) throws Exception;

/**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,31 @@ public boolean moveMessage(final long messageID,

}

public boolean copyMessage(final long messageID,
final String targetQueue) throws Exception {
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.copyMessage(queue, messageID, targetQueue);
}
checkStarted();

clearIO();
try {
Binding binding = server.getPostOffice().getBinding(SimpleString.of(targetQueue));

if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(targetQueue);
}

return queue.copyReference(messageID, binding.getAddress(), binding);
} finally {
blockOnIO();
}
}

}

@Override
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
return moveMessages(filterStr, otherQueueName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ int moveReferences(int flushLimit,
int messageCount,
Binding binding) throws Exception;


boolean copyReference(long messageID, SimpleString queue, Binding binding) throws Exception;

int retryMessages(Filter filter) throws Exception;

default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,25 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
});
}

public synchronized boolean copyReference(final long messageID,
final SimpleString toQueue,
final Binding binding) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
try {
copy(null, toQueue, binding, ref);
} catch (Exception e) {
throw e;
}
return true;
}
}
return false;
}
}

public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
Expand Down Expand Up @@ -3679,6 +3698,38 @@ private RoutingStatus move(final Transaction originalTX,
return routingStatus;
}

private RoutingStatus copy(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref) throws Exception {
Transaction tx;

if (originalTX != null) {
tx = originalTX;
} else {
// if no TX we create a new one to commit at the end
tx = new TransactionImpl(storageManager);
}

Message copyMessage = makeCopy(ref, false, false, address);

Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
}

RoutingStatus routingStatus;
{
RoutingContext context = new RoutingContextImpl(tx);
routingStatus = postOffice.route(copyMessage, context, false, false, binding);
}

if (originalTX == null) {
tx.commit();
}
return routingStatus;
}

@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,11 @@ public int moveReferences(int flushLimit,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void addRedistributor(long delay) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void forceDelivery() {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,60 @@ public void testMoveMessage() throws Exception {
session.deleteQueue(otherQueue);
}

@TestTemplate
public void testCopyMessage() throws Exception {
SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString();
SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString();
SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString();
SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString();
SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString();


session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable));
session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable));
session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable));
ClientProducer producer = session.createProducer(address);

// send 2 messages on queue
producer.send(session.createMessage(durable));
producer.send(session.createMessage(durable));

QueueControl queueControl = createManagementControl(address, queue);
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2);
assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 0, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long) messages[0].get("messageID");

boolean copied = queueControl.copyMessage(messageID, otherQueue.toString());
assertTrue(copied);

assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 1, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

messageID = (Long) messages[1].get("messageID");
copied = queueControl.copyMessage(messageID, otherQueue.toString());
assertTrue(copied);

assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 2, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

consumeMessages(2, session, queue);
consumeMessages(2, session, otherQueue);
consumeMessages(0, session, otherQueue2);

session.deleteQueue(queue);
session.deleteQueue(otherQueue);
session.deleteQueue(otherQueue2);
}

/**
* Moving message from another address to a single "child" queue of a multicast address
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ public int moveMessages(int flushLimit, String filter, String otherQueueName, bo
return (Integer) proxy.invokeOperation(Integer.class, "moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
}

@Override
public boolean copyMessage(long messageID, String targetQueue) throws Exception {
return (Boolean) proxy.invokeOperation("copyMessage", messageID, targetQueue);
}

@Override
public int moveMessages(final String filter,
final String otherQueueName,
Expand Down

0 comments on commit a0d4cf8

Please sign in to comment.