Skip to content

Commit

Permalink
make tests KRaft compatible
Browse files Browse the repository at this point in the history
Signed-off-by: see-quick <[email protected]>
  • Loading branch information
see-quick committed Oct 11, 2024
1 parent 089052a commit d9b2a03
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import io.strimzi.systemtest.cli.KafkaCmdClient;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.resources.NamespaceManager;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.KafkaTopicResource;
import io.strimzi.systemtest.resources.operator.SetupClusterOperator;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.utils.ClientUtils;
Expand Down Expand Up @@ -79,18 +82,33 @@ void testTopicAvailable() {
KafkaTopicResource.kafkaTopicClient().inNamespace(testStorage.getNamespaceName()).resource(kafkaTopic).create();
}

resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getBrokerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getControllerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build())
);

resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentKRaft(testStorage.getNamespaceName(), testStorage.getClusterName(), 3)
.editSpec()
.editKafka()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.endPersistentClaimStorage()
.endKafka()
.editZookeeper()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.endPersistentClaimStorage()
.endZookeeper()
.endSpec()
.build());

Expand Down Expand Up @@ -138,8 +156,14 @@ void testTopicNotAvailable() {
LOGGER.info("Recreating Cluster Operator");
recreateClusterOperator(testStorage.getNamespaceName());

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getBrokerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3).build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getControllerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3).build())
);

LOGGER.info("Recreating Kafka cluster without Topic Operator");
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentKRaft(testStorage.getNamespaceName(), testStorage.getClusterName(), 3)
.editSpec()
.withNewEntityOperator()
.endEntityOperator()
Expand Down Expand Up @@ -176,20 +200,34 @@ private void prepareEnvironmentForRecovery(TestStorage testStorage) {
.createInstallation()
.runInstallation();

resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(testStorage.getNamespaceName(), testStorage.getClusterName(), 3, 3)
resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getBrokerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(testStorage.getNamespaceName(), KafkaNodePoolResource.getControllerPoolName(testStorage.getClusterName()), testStorage.getClusterName(), 3)
.editSpec()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endSpec()
.build())
);

resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentKRaft(testStorage.getNamespaceName(), testStorage.getClusterName(), 3)
.editSpec()
.editKafka()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endKafka()
.editZookeeper()
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withStorageClass(storageClassName)
.endPersistentClaimStorage()
.endZookeeper()
.endSpec()
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.IsolatedTest;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
Expand All @@ -44,14 +48,13 @@ class RecoveryST extends AbstractST {

static String sharedClusterName;
private static final int KAFKA_REPLICAS = 3;
private static final int ZOOKEEPER_REPLICAS = KAFKA_REPLICAS;

private static final Logger LOGGER = LogManager.getLogger(RecoveryST.class);

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromKafkaStrimziPodSetDeletion() {
// kafka cluster already deployed
String kafkaName = KafkaResources.kafkaComponentName(sharedClusterName);
String kafkaName = StrimziPodSetResource.getBrokerComponentName(sharedClusterName);
String kafkaUid = StrimziPodSetUtils.getStrimziPodSetUID(Environment.TEST_SUITE_NAMESPACE, kafkaName);

kubeClient().getClient().apps().deployments().inNamespace(clusterOperator.getDeploymentNamespace()).withName(clusterOperator.getClusterOperatorName()).withTimeoutInMillis(600_000L).scale(0);
Expand All @@ -65,23 +68,6 @@ void testRecoveryFromKafkaStrimziPodSetDeletion() {
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, kafkaName, KAFKA_REPLICAS);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromZookeeperStrimziPodSetDeletion() {
// kafka cluster already deployed
String zookeeperName = KafkaResources.zookeeperComponentName(sharedClusterName);
String zookeeperUid = StrimziPodSetUtils.getStrimziPodSetUID(Environment.TEST_SUITE_NAMESPACE, zookeeperName);

kubeClient().getClient().apps().deployments().inNamespace(clusterOperator.getDeploymentNamespace()).withName(clusterOperator.getClusterOperatorName()).withTimeoutInMillis(600_000L).scale(0);
StrimziPodSetUtils.deleteStrimziPodSet(Environment.TEST_SUITE_NAMESPACE, zookeeperName);

PodUtils.waitForPodsWithPrefixDeletion(zookeeperName);
kubeClient().getClient().apps().deployments().inNamespace(clusterOperator.getDeploymentNamespace()).withName(clusterOperator.getClusterOperatorName()).withTimeoutInMillis(600_000L).scale(1);

LOGGER.info("Waiting for recovery {}", zookeeperName);
StrimziPodSetUtils.waitForStrimziPodSetRecovery(Environment.TEST_SUITE_NAMESPACE, zookeeperName, zookeeperUid);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, zookeeperName, ZOOKEEPER_REPLICAS);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromKafkaServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
Expand All @@ -99,24 +85,6 @@ void testRecoveryFromKafkaServiceDeletion() {
verifyStabilityBySendingAndReceivingMessages(testStorage);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromZookeeperServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

// kafka cluster already deployed
LOGGER.info("Running deleteKafkaService with cluster {}", sharedClusterName);

String zookeeperServiceName = KafkaResources.zookeeperServiceName(sharedClusterName);
String zookeeperServiceUid = kubeClient().getServiceUid(zookeeperServiceName);

kubeClient().deleteService(zookeeperServiceName);

LOGGER.info("Waiting for creation {}", zookeeperServiceName);
ServiceUtils.waitForServiceRecovery(Environment.TEST_SUITE_NAMESPACE, zookeeperServiceName, zookeeperServiceUid);

verifyStabilityBySendingAndReceivingMessages(testStorage);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromKafkaHeadlessServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
Expand All @@ -135,24 +103,6 @@ void testRecoveryFromKafkaHeadlessServiceDeletion() {
verifyStabilityBySendingAndReceivingMessages(testStorage);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromZookeeperHeadlessServiceDeletion() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

// kafka cluster already deployed
LOGGER.info("Running deleteKafkaHeadlessService with cluster {}", sharedClusterName);

String zookeeperHeadlessServiceName = KafkaResources.zookeeperHeadlessServiceName(sharedClusterName);
String zookeeperHeadlessServiceUid = kubeClient().getServiceUid(zookeeperHeadlessServiceName);

kubeClient().deleteService(zookeeperHeadlessServiceName);

LOGGER.info("Waiting for creation {}", zookeeperHeadlessServiceName);
ServiceUtils.waitForServiceRecovery(Environment.TEST_SUITE_NAMESPACE, zookeeperHeadlessServiceName, zookeeperHeadlessServiceUid);

verifyStabilityBySendingAndReceivingMessages(testStorage);
}

/**
* We are deploying Kafka cluster with an impossible memory request, all 3 Kafka pods are `Pending`. After we
* check that Kafka pods are stable in `Pending` phase (for one minute), we change the memory request so that the pods are again schedulable
Expand All @@ -161,24 +111,24 @@ void testRecoveryFromZookeeperHeadlessServiceDeletion() {
*/
@IsolatedTest("We need for each test case its own Cluster Operator")
void testRecoveryFromImpossibleMemoryRequest() {
final String kafkaSsName = KafkaResources.kafkaComponentName(sharedClusterName);
final LabelSelector brokerSelector = KafkaResource.getLabelSelectorForAllKafkaPods(sharedClusterName);
final String kafkaSsName = KafkaResource.getStrimziPodSetName(sharedClusterName, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName));
final LabelSelector brokerSelector = KafkaResource.getLabelSelector(sharedClusterName, kafkaSsName);
final Map<String, Quantity> requests = new HashMap<>(1);

requests.put("memory", new Quantity("465458732Gi"));
final ResourceRequirements resourceReq = new ResourceRequirementsBuilder()
.withRequests(requests)
.build();

KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName), knp -> knp.getSpec().setResources(resourceReq));

PodUtils.waitForPendingPod(Environment.TEST_SUITE_NAMESPACE, kafkaSsName);
PodUtils.verifyThatPendingPodsAreStable(Environment.TEST_SUITE_NAMESPACE, kafkaSsName);

requests.put("memory", new Quantity("512Mi"));
resourceReq.setRequests(requests);

KafkaResource.replaceKafkaResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, k -> k.getSpec().getKafka().setResources(resourceReq));
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName), knp -> knp.getSpec().setResources(resourceReq));

RollingUpdateUtils.waitForComponentAndPodsReady(Environment.TEST_SUITE_NAMESPACE, brokerSelector, KAFKA_REPLICAS);
KafkaUtils.waitForKafkaReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName);
Expand All @@ -191,25 +141,16 @@ private void verifyStabilityBySendingAndReceivingMessages(TestStorage testStorag
}

@IsolatedTest
void testRecoveryFromKafkaAndZookeeperPodDeletion() {
final String kafkaStrimziPodSet = KafkaResources.kafkaComponentName(sharedClusterName);
final String zkName = KafkaResources.zookeeperComponentName(sharedClusterName);
void testRecoveryFromKafkaPodDeletion() {
final String kafkaSPsName = KafkaResource.getStrimziPodSetName(sharedClusterName, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName));

final LabelSelector brokerSelector = KafkaResource.getLabelSelectorForAllKafkaPods(sharedClusterName);
final LabelSelector controllerSelector = KafkaResource.getLabelSelector(sharedClusterName, zkName);

LOGGER.info("Deleting most of the Kafka pods");
LOGGER.info("Deleting most of the Kafka broker pods");
List<Pod> kafkaPodList = kubeClient().listPods(brokerSelector);
kafkaPodList.subList(0, kafkaPodList.size() - 1).forEach(pod -> kubeClient().deletePod(pod));

if (!Environment.isKRaftModeEnabled()) {
LOGGER.info("Deleting most of the Zookeeper pods");
List<Pod> zkPodList = kubeClient().listPods(controllerSelector);
zkPodList.subList(0, zkPodList.size() - 1).forEach(pod -> kubeClient().deletePod(pod));
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, zkName, ZOOKEEPER_REPLICAS);
}

StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, kafkaStrimziPodSet, KAFKA_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, kafkaSPsName, KAFKA_REPLICAS);
KafkaUtils.waitForKafkaReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName);
}

Expand All @@ -223,7 +164,13 @@ void setup() {

sharedClusterName = generateRandomNameOfKafka("recovery-cluster");

resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentWithoutNodePools(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KAFKA_REPLICAS, 3).build());
resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPoolPersistentStorage(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getBrokerPoolName(sharedClusterName), sharedClusterName, 3).build(),
KafkaNodePoolTemplates.controllerPoolPersistentStorage(Environment.TEST_SUITE_NAMESPACE, KafkaNodePoolResource.getControllerPoolName(sharedClusterName), sharedClusterName, 3).build()
)
);
resourceManager.createResourceWithWait(KafkaTemplates.kafkaPersistentKRaft(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KAFKA_REPLICAS).build());
resourceManager.createResourceWithWait(KafkaBridgeTemplates.kafkaBridge(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, KafkaResources.plainBootstrapAddress(sharedClusterName), 1).build());
}
}

0 comments on commit d9b2a03

Please sign in to comment.