Skip to content

Commit

Permalink
[system test] make Recovery tests run only on ZK mode only
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick committed Sep 24, 2024
1 parent dd48ff1 commit 010b8d8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@
*/
package io.strimzi.systemtest.operators;

import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.PersistentVolume;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.storage.StorageClass;
import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpecBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.IsolatedTest;
import io.strimzi.systemtest.cli.KafkaCmdClient;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.resources.NamespaceManager;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.KafkaTopicResource;
import io.strimzi.systemtest.resources.operator.SetupClusterOperator;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.utils.ClientUtils;
Expand Down Expand Up @@ -78,27 +79,7 @@ void testTopicAvailable() {
KafkaTopicResource.kafkaTopicClient().inNamespace(testStorage.getNamespaceName()).resource(kafkaTopic).create();
}

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build()
)
);
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistent(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
.editSpec()
.editKafka()
.withNewPersistentClaimStorage()
Expand Down Expand Up @@ -138,7 +119,12 @@ void testTopicNotAvailable() {
LOGGER.info("Claim: {} has bounded Volume: {}", pvc.getMetadata().getName(), pv.getMetadata().getName());
}

String kafkaPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), testStorage.getBrokerComponentName()).get(0).getMetadata().getName();
final Map<String, String> kafkaLabels = Map.of(
Labels.STRIMZI_CLUSTER_LABEL, testStorage.getClusterName(),
Labels.STRIMZI_KIND_LABEL, Kafka.RESOURCE_KIND,
Labels.STRIMZI_BROKER_ROLE_LABEL, "true");
final String kafkaPodName = kubeClient().listPods(testStorage.getNamespaceName(), new LabelSelectorBuilder()
.withMatchLabels(kafkaLabels).build()).get(0).getMetadata().getName();

LOGGER.info("Currently present Topics inside Kafka: {}/{} are: {}", testStorage.getNamespaceName(), kafkaPodName,
KafkaCmdClient.listTopicsUsingPodCli(testStorage.getNamespaceName(), kafkaPodName, KafkaResources.plainBootstrapAddress(testStorage.getClusterName())));
Expand All @@ -153,13 +139,7 @@ void testTopicNotAvailable() {
recreateClusterOperator(testStorage.getNamespaceName());

LOGGER.info("Recreating Kafka cluster without Topic Operator");
resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3).build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build()
)
);
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistent(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
.editSpec()
.withNewEntityOperator()
.endEntityOperator()
Expand Down Expand Up @@ -196,27 +176,7 @@ private void prepareEnvironmentForRecovery(TestStorage testStorage) {
.createInstallation()
.runInstallation();

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build()
)
);
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistent(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
.editSpec()
.editKafka()
.withNewPersistentClaimStorage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.IsolatedTest;
import io.strimzi.systemtest.annotations.KRaftNotSupported;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
Expand Down Expand Up @@ -56,7 +51,7 @@ class RecoveryST extends AbstractST {
@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromKafkaStrimziPodSetDeletion() {
// kafka cluster already deployed
String kafkaName = StrimziPodSetResource.getBrokerComponentName(sharedClusterName);
String kafkaName = KafkaResources.kafkaComponentName(sharedClusterName);
String kafkaUid = StrimziPodSetUtils.getStrimziPodSetUID(Environment.TEST_SUITE_NAMESPACE, kafkaName);

kubeClient().getClient().apps().deployments().inNamespace(clusterOperator.getDeploymentNamespace()).withName(clusterOperator.getClusterOperatorName()).withTimeoutInMillis(600_000L).scale(0);
Expand All @@ -71,7 +66,6 @@ void testRecoveryFromKafkaStrimziPodSetDeletion() {
}

@IsolatedTest("We need for each test case its own Cluster Operator")
@KRaftNotSupported("Zookeeper is not supported by KRaft mode and is used in this test class")
void testRecoveryFromZookeeperStrimziPodSetDeletion() {
// kafka cluster already deployed
String zookeeperName = KafkaResources.zookeeperComponentName(sharedClusterName);
Expand Down Expand Up @@ -106,7 +100,6 @@ void testRecoveryFromKafkaServiceDeletion() {
}

@IsolatedTest("We need for each test case its own Cluster Operator")
@KRaftNotSupported("Zookeeper is not supported by KRaft mode and is used in this test class")
void testRecoveryFromZookeeperServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

Expand Down Expand Up @@ -143,7 +136,6 @@ void testRecoveryFromKafkaHeadlessServiceDeletion() {
}

@IsolatedTest("We need for each test case its own Cluster Operator")
@KRaftNotSupported("Zookeeper is not supported by KRaft mode and is used in this test class")
void testRecoveryFromZookeeperHeadlessServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

Expand All @@ -169,34 +161,24 @@ void testRecoveryFromZookeeperHeadlessServiceDeletion() {
*/
@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromImpossibleMemoryRequest() {
final String kafkaSsName = KafkaResource.getStrimziPodSetName(sharedClusterName, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName));
final LabelSelector brokerSelector = KafkaResource.getLabelSelector(sharedClusterName, kafkaSsName);
final String kafkaSsName = KafkaResources.kafkaComponentName(sharedClusterName);
final LabelSelector brokerSelector = KafkaResource.getLabelSelectorForAllKafkaPods(sharedClusterName);
final Map<String, Quantity> requests = new HashMap<>(1);

requests.put("memory", new Quantity("465458732Gi"));
final ResourceRequirements resourceReq = new ResourceRequirementsBuilder()
.withRequests(requests)
.build();

if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName),
knp -> knp.getSpec().setResources(resourceReq));
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));
}
KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));

PodUtils.waitForPendingPod(Environment.TEST_SUITE_NAMESPACE, kafkaSsName);
PodUtils.verifyThatPendingPodsAreStable(Environment.TEST_SUITE_NAMESPACE, kafkaSsName);

requests.put("memory", new Quantity("512Mi"));
resourceReq.setRequests(requests);

if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName),
knp -> knp.getSpec().setResources(resourceReq));
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));
}
KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));

RollingUpdateUtils.waitForComponentAndPodsReady(Environment.TEST_SUITE_NAMESPACE, brokerSelector, KAFKA_REPLICAS);
KafkaUtils.waitForKafkaReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName);
Expand All @@ -210,10 +192,10 @@ private void verifyStabilityBySendingAndReceivingMessages(TestStorage testStorag

@IsolatedTest
void testRecoveryFromKafkaAndZookeeperPodDeletion() {
final String kafkaStrimziPodSet = StrimziPodSetResource.getBrokerComponentName(sharedClusterName);
final String kafkaStrimziPodSet = KafkaResources.kafkaComponentName(sharedClusterName);
final String zkName = KafkaResources.zookeeperComponentName(sharedClusterName);

final LabelSelector brokerSelector = KafkaResource.getLabelSelector(sharedClusterName, kafkaStrimziPodSet);
final LabelSelector brokerSelector = KafkaResource.getLabelSelectorForAllKafkaPods(sharedClusterName);
final LabelSelector controllerSelector = KafkaResource.getLabelSelector(sharedClusterName, zkName);

LOGGER.info("Deleting most of the Kafka pods");
Expand Down Expand Up @@ -241,13 +223,7 @@ void setup() {

sharedClusterName = generateRandomNameOfKafka("recovery-cluster");

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName), sharedClusterName, 3).build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getControllerPoolName(sharedClusterName), sharedClusterName, 3).build()
)
);
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistent(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KAFKA_REPLICAS).build());
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KAFKA_REPLICAS, 3).build());
resourceManager.createResourceWithWait(KafkaBridgeTemplates.kafkaBridge(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KafkaResources.plainBootstrapAddress(sharedClusterName), 1).build());
}
}

0 comments on commit 010b8d8

Please sign in to comment.