Skip to content

Commit

Permalink
FederationTests updates
Browse files Browse the repository at this point in the history
  • Loading branch information
michalxo committed Oct 9, 2024
1 parent 28b07eb commit ae07704
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,25 @@ public Object executeCommand() {
return executeCommand(Constants.DURATION_3_MINUTES);
}

public Object executeCommand(boolean disableOutput) {
return executeCommand(Constants.DURATION_3_MINUTES, disableOutput);
}

public Object executeCommand(long maxTimeout) {
return executeCommand(maxTimeout, false);
}

public Object executeCommand(long maxTimeout, boolean disableOutput) {
String cmdOutput;
String[] command = constructClientCommand();
cmdOutput = (String) deployableClient.getExecutor().executeCommand(maxTimeout, command);
if (artemisCommand.equals(ArtemisCommand.PERF_CLIENT)) {
LOGGER.debug("[PERF] Client detected, to see it's output use trace logging.");
LOGGER.trace(cmdOutput);
} else {
LOGGER.debug(cmdOutput);
if (!disableOutput) {
LOGGER.debug(cmdOutput);
}
}
return parseOutput(cmdOutput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class BundledClientOptions {
Boolean multicast = false;
Protocol protocol;
int timeout = 90;
public boolean disableOutput;

public BundledClientOptions withDeployableClient(DeployableClient deployableClient) {
this.deployableClient = deployableClient;
Expand Down Expand Up @@ -81,6 +82,11 @@ public BundledClientOptions withTimeout(int timeout) {
return this;
}

public BundledClientOptions withDisabledOutput(boolean disableOutput) {
this.disableOutput = disableOutput;
return this;
}

public DeployableClient getDeployableClient() {
return deployableClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract class BundledMessagingClient implements MessagingClient {
private int sentMessages = 0;
private Executor subscriberExecutor;
private int timeout;
private boolean disableOutput;


public BundledMessagingClient(BundledClientOptions options) {
Expand All @@ -50,6 +51,7 @@ public BundledMessagingClient(BundledClientOptions options) {
this.persistenceDisabled = options.persistenceDisabled;
this.isMulticast = options.multicast;
this.timeout = options.timeout;
this.disableOutput = options.disableOutput;
}

abstract String getProtocol();
Expand Down Expand Up @@ -163,7 +165,9 @@ public int receiveMessages(long duration) {
String cmdOutput;
String[] command = constructClientCommand(CONSUMER);
cmdOutput = (String) deployableClient.getExecutor().executeCommand(duration, command);
LOGGER.debug("[{}] {}", deployableClient.getContainerName(), cmdOutput);
if (!disableOutput) {
LOGGER.debug("[{}] {}", deployableClient.getContainerName(), cmdOutput);
}
return parseMessageCount(cmdOutput, CONSUMER);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,8 @@ public static MessagingClient createMessagingClient(ClientType clientType, Pod e
.withPersistenceDisabled(persistenceDisabled)
.withDestinationQueue(queue)
.withDestinationUrl(serviceUrl)
.withMulticast(multicast);
.withMulticast(multicast)
.withDisabledOutput(true);
if (clientType.equals(ClientType.BUNDLED_AMQP)) {
messagingClient = new BundledAmqpMessagingClient(options);
} else if (clientType.equals(ClientType.BUNDLED_CORE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ public Map<Integer, String> browseMessages(Pod brokerPod, String fqqn, String us

public Map<Integer, String> browseMessages(Pod brokerPod, String fqqn, int messageCount, String username, String password) {
String namespace = brokerPod.getMetadata().getNamespace();
LOGGER.debug("[{}] Browse messages in {} on {}", namespace, fqqn, brokerPod.getMetadata().getName());
Map<String, String> commandOptions = new HashMap<>(Map.of(
"destination", fqqn,
"url", "tcp://" + brokerPod.getMetadata().getName() + ":61616"
Expand All @@ -630,7 +629,7 @@ public Map<Integer, String> browseMessages(Pod brokerPod, String fqqn, int messa

DeployableClient<Deployment, Pod> deployableClient = new BundledClientDeployment(brokerPod.getMetadata().getNamespace(), brokerPod);
BundledArtemisClient browserClient = new BundledArtemisClient(deployableClient, ArtemisCommand.BROWSE_CLIENT, username, password, commandOptions);
Map<Integer, String> browsedMessages = (Map<Integer, String>) browserClient.executeCommand();
Map<Integer, String> browsedMessages = (Map<Integer, String>) browserClient.executeCommand(true);
LOGGER.debug("[{}] Browsed {} messages from {} on {}", namespace, browsedMessages.get(-1), fqqn, brokerPod.getMetadata().getName());
return browsedMessages;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.brokerqe.claire.federation;

import io.amq.broker.v1beta1.ActiveMQArtemis;
import io.amq.broker.v1beta1.ActiveMQArtemisBuilder;
import io.brokerqe.claire.ArtemisConstants;
import io.brokerqe.claire.ArtemisVersion;
Expand All @@ -15,6 +14,8 @@
import io.brokerqe.claire.clients.MessagingClient;
import io.brokerqe.claire.junit.TestValidSince;
import io.fabric8.kubernetes.api.model.Pod;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -32,6 +33,27 @@
@TestValidSince(ArtemisVersion.VERSION_2_33)
public class MirroringUnsecuredTests extends MirroringTests {

List<Pod> prodBrokerPods;
List<Pod> drBrokerPods;
Pod prodBrokerPod;
Pod drBrokerPod;
private int totalReceivedA = 0;
private int totalReceivedB = 0;
private int totalSentA = 0;
private int totalSentB = 0;

@BeforeEach
void resetCounters() {
totalReceivedA = 0;
totalReceivedB = 0;
totalSentA = 0;
totalSentB = 0;
prodBrokerPods = null;
drBrokerPods = null;
prodBrokerPod = null;
drBrokerPod = null;
}

void setupDeployment(int size) {
getClient().createSecretEncodedData(prodNamespace, LOGGER_SECRET_NAME, Map.of(ArtemisConstants.LOGGING_PROPERTIES_CONFIG_KEY, TestUtils.getFileContentAsBase64(DEBUG_LOG_FILE)), true);
getClient().createSecretEncodedData(drNamespace, LOGGER_SECRET_NAME, Map.of(ArtemisConstants.LOGGING_PROPERTIES_CONFIG_KEY, TestUtils.getFileContentAsBase64(DEBUG_LOG_FILE)), true);
Expand All @@ -58,15 +80,14 @@ void setupDeployment(int size) {
.editOrNewSpec()
.withNewDeploymentPlan()
.withSize(size)
.withClustered(true)
.withClustered(false) // federation + mirring does not need clustered
.withPersistenceEnabled(true)
.withMessageMigration(true)
.withManagementRBACEnabled(true)
.withRequireLogin(true)
.withJournalType("aio")
.withEnableMetricsPlugin(true)
.withJolokiaAgentEnabled(true)
.withClustered(true)
.editOrNewExtraMounts()
.withSecrets(LOGGER_SECRET_NAME)
.endExtraMounts()
Expand Down Expand Up @@ -137,15 +158,14 @@ void setupDeployment(int size) {
.editOrNewSpec()
.withNewDeploymentPlan()
.withSize(size)
.withClustered(true)
.withClustered(false) // federation + mirroring does not need clustered
.withPersistenceEnabled(true)
.withMessageMigration(true)
.withManagementRBACEnabled(true)
.withRequireLogin(true)
.withJournalType("aio")
.withEnableMetricsPlugin(true)
.withJolokiaAgentEnabled(true)
.withClustered(true)
.editOrNewExtraMounts()
.withSecrets(LOGGER_SECRET_NAME)
.endExtraMounts()
Expand Down Expand Up @@ -234,8 +254,9 @@ void simpleMirroringTest() {
prodBroker = ResourceManager.addToBrokerProperties(prodBroker, deployAddresses, false);

// make sure given addresses are present on DR broker
ActiveMQArtemis brk = ResourceManager.getArtemisClient().inNamespace(prodNamespace).resource(prodBroker).get();
brk.getStatus().getConditions().contains(ArtemisConstants.CONDITION_TYPE_BROKER_PROPERTIES_APPLIED);
// TODO: make use of CONDITION_TYPE_BROKER_PROPERTIES_APPLIED?
// ActiveMQArtemis brk = ResourceManager.getArtemisClient().inNamespace(prodNamespace).resource(prodBroker).get();
// assertTrue(brk.getStatus().getConditions().contains(ArtemisConstants.CONDITION_TYPE_BROKER_PROPERTIES_APPLIED));
checkMessageCount(prodNamespace, prodBrokerPod, "my-deletion1", 0, ADMIN, ADMIN_PASS);
checkMessageCount(drNamespace, drBrokerPod, "my-deletion1", 0, ADMIN, ADMIN_PASS);
checkMessageCount(drNamespace, drBrokerPod, "my-deletion2", 0, ADMIN, ADMIN_PASS);
Expand Down Expand Up @@ -338,6 +359,7 @@ void addressFilteringTest() {
}

@Test
@Disabled("ENTMQBR-9474")
void scaleUpDownTest() {
setupDeployment(1);
int scaleUpSize = 4;
Expand All @@ -347,73 +369,77 @@ void scaleUpDownTest() {
drBroker = doArtemisScale(drNamespace, drBroker, 1, scaleUpSize);

// Send few messages & do checks
int initialCountA = 2; // 200
int initialCountB = 1; // 50
int initialCountA = 200;
int initialCountB = 50;

int scaleupCountA = 150;
int scaleupCountB = 40;

List<Pod> prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName());
List<Pod> drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName());
prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName());
drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName());

// Send messages
int brokerCount = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName()).size();
for (int i = 0; i < brokerCount; i++) {
Pod prodBrokerPodI = prodBrokerPods.get(i);
Pod drBrokerPodI = drBrokerPods.get(i);

LOGGER.info("[{}] Send {} messages to {}", prodNamespace, initialCountA, prodBrokerPodI.getMetadata().getName());
LOGGER.info("[{}] Send {} messages to {}/{}", prodNamespace, initialCountA, prodBrokerPodI.getMetadata().getName(), addressA);
MessagingClient prodClientA = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPodI, allDefaultPort, addressA, initialCountA, ADMIN, ADMIN_PASS);
int sent0 = prodClientA.sendMessages();
assertThat("Sent different amount of messages than expected", sent0, equalTo(initialCountA));
int sent0A = prodClientA.sendMessages();
assertThat("Sent different amount of messages than expected", sent0A, equalTo(initialCountA));
totalSentA += sent0A;

LOGGER.info("[{}] Send {} messages to {}", prodNamespace, initialCountB, prodBrokerPodI.getMetadata().getName());
LOGGER.info("[{}] Send {} messages to {}/{}", prodNamespace, initialCountB, prodBrokerPodI.getMetadata().getName(), addressB);
MessagingClient prodClientB = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPodI, allDefaultPort, addressB, initialCountB, ADMIN, ADMIN_PASS);
int sent0B = prodClientB.sendMessages();
assertThat("Sent different amount of messages than expected", sent0B, equalTo(initialCountB));
totalSentB += sent0B;
}
Pod prodBrokerPod = prodBrokerPods.get(0);
Pod drBrokerPod = drBrokerPods.get(0);

checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize, ADMIN, ADMIN_PASS);
checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize, ADMIN, ADMIN_PASS);
//
// // Receive messages
// LOGGER.info("[{}] Receive {} messages from {}", drNamespace, initialCountA, prodBrokerPod.getMetadata().getName());
// MessagingClient prodClientA1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressA, initialCountA - scaleupCountA, ADMIN, ADMIN_PASS);
// int receivedA1 = prodClientA1.receiveMessages();
// assertThat("Received different amount of messages than expected", receivedA1, equalTo(initialCountA - scaleupCountA));
//// assertThat("Sent & received different amount of messages than expected", sent0, equalTo(received1));
//
// LOGGER.info("[{}] Receive {} messages from {}", drNamespace, initialCountA, prodBrokerPod.getMetadata().getName());
// MessagingClient prodClientB1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressB, initialCountB - scaleupCountB, ADMIN, ADMIN_PASS);
// int receivedB1 = prodClientB1.receiveMessages();
// assertThat("Received different amount of messages than expected", receivedB1, equalTo(initialCountB - scaleupCountB));
//
// getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
// getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
// checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize - 50, ADMIN, ADMIN_PASS);
// checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize - 10, ADMIN, ADMIN_PASS);

// TODO scaledown does not work properly? // Scale down to 2
getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
updateCheckDeployment(initialCountA * scaleUpSize, initialCountB * scaleUpSize);
receiveMessagesAB(prodBrokerPod, initialCountA - scaleupCountA, initialCountB - scaleupCountB);

// Scale down to 2
prodBroker = doArtemisScale(prodNamespace, prodBroker, scaleUpSize, scaleDownSize, true);
drBroker = doArtemisScale(drNamespace, drBroker, scaleUpSize, scaleDownSize, true);
// BUG: fails expected 750, got 400 messages. Message migration is not working with clustered -> false
updateCheckDeployment(initialCountA * scaleUpSize - 50, initialCountB * scaleUpSize - 10);
receiveMessagesAB(prodBrokerPod, 100, 30);

// Refresh variables
prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName());
drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName());
prodBrokerPod = prodBrokerPods.get(0);
drBrokerPod = drBrokerPods.get(0);
// Scale down to 1
prodBroker = doArtemisScale(prodNamespace, prodBroker, scaleDownSize, 1);
drBroker = doArtemisScale(drNamespace, drBroker, scaleDownSize, 1);
updateCheckDeployment(initialCountA * scaleUpSize, initialCountB * scaleUpSize);
receiveMessagesAB(prodBrokerPod, 50, 10);

getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
// waitForScaleDownDrainer(testNamespace, operator.getOperatorName(), brokerName, Constants.DURATION_2_MINUTES, initialSize, scaledDownSize);
// Send few messages & do checks
// checkClusteredMessageCount(prodBroker, drBroker, addressA, initialCountA * scaleUpSize, ADMIN, ADMIN_PASS);
// checkClusteredMessageCount(prodBroker, drBroker, addressB, initialCountB * scaleUpSize, ADMIN, ADMIN_PASS);
assertThat("Total sentA != totalReceivedA!", totalSentA, equalTo(totalReceivedA));
assertThat("Total sentB != totalReceivedB!", totalSentB, equalTo(totalReceivedB));

teardownDeployment(false);
}

private void updateCheckDeployment(int expectedA, int expectedB) {
prodBrokerPods = getClient().listPodsByPrefixName(prodNamespace, prodBroker.getMetadata().getName());
drBrokerPods = getClient().listPodsByPrefixName(drNamespace, drBroker.getMetadata().getName());
prodBrokerPod = prodBrokerPods.get(prodBrokerPods.size() - 1);
drBrokerPod = drBrokerPods.get(drBrokerPods.size() - 1);
Map<String, Map<String, String>> statsProd = getQueueStats(prodNamespace, prodBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
Map<String, Map<String, String>> statsDr = getQueueStats(drNamespace, drBrokerPod, null, addressABPrefix, ADMIN, ADMIN_PASS, true);
checkClusteredMessageCount(prodBroker, drBroker, addressA, expectedA, ADMIN, ADMIN_PASS);
checkClusteredMessageCount(prodBroker, drBroker, addressB, expectedB, ADMIN, ADMIN_PASS);
}

private void receiveMessagesAB(Pod prodBrokerPod, int receiveA, int receiveB) {
LOGGER.info("[{}] Receive {} messages from {}/{}", drNamespace, receiveA, prodBrokerPod.getMetadata().getName(), addressA);
MessagingClient prodClientA1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressA, receiveA, ADMIN, ADMIN_PASS);
int receivedA1 = prodClientA1.receiveMessages();
assertThat("Received different amount of messages than expected", receivedA1, equalTo(50));
totalReceivedA += receivedA1;

LOGGER.info("[{}] Receive {} messages from {}/{}", drNamespace, receiveB, prodBrokerPod.getMetadata().getName(), addressB);
MessagingClient prodClientB1 = ResourceManager.createMessagingClient(ClientType.BUNDLED_CORE, prodBrokerPod, allDefaultPort, addressB, receiveB, ADMIN, ADMIN_PASS);
int receivedB1 = prodClientB1.receiveMessages();
assertThat("Received different amount of messages than expected", receivedB1, equalTo(10));
totalReceivedB += receivedB1;
}
}

0 comments on commit ae07704

Please sign in to comment.