From a824d4a61403b16d174ef8059018068572da4d57 Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:03:04 +0000 Subject: [PATCH] Add tests to CaReconciler to verify trust rolling behaviour Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- .../operator/assembly/CaReconciler.java | 24 +- .../operator/assembly/CaReconcilerTest.java | 1211 ++++++++++++++--- 2 files changed, 1054 insertions(+), 181 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java index 3a6288465ea..0d072614461 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/CaReconciler.java @@ -75,7 +75,7 @@ public class CaReconciler { /* test */ final DeploymentOperator deploymentOperator; private final StrimziPodSetOperator strimziPodSetOperator; private final SecretOperator secretOperator; - private final PodOperator podOperator; + /* test */ final PodOperator podOperator; private final AdminClientProvider adminClientProvider; private final KafkaAgentClientProvider kafkaAgentClientProvider; private final ZookeeperLeaderFinder zookeeperLeaderFinder; @@ -548,8 +548,14 @@ Future rollingUpdateForNewCaKey() { } /* test */ Future rollKafkaBrokers(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { - return new KafkaRoller( - reconciliation, + return createKafkaRoller(nodes, coTlsPemIdentity).rollingRestart(pod -> { + LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons()); + return podRollReasons; + }); + } + + /* test */ KafkaRoller createKafkaRoller(Set nodes, TlsPemIdentity coTlsPemIdentity) { + return new KafkaRoller(reconciliation, vertx, podOperator, 1_000, @@ -563,15 +569,11 @@ Future rollingUpdateForNewCaKey() { null, null, false, - eventPublisher - ).rollingRestart(pod -> { - LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons()); - return podRollReasons; - }); + eventPublisher); } // Entity Operator, Kafka Exporter, and Cruise Control are only rolled when the cluster CA cert key is replaced - Future maybeRollDeploymentIfExists(String deploymentName, RestartReasons podRollReasons) { + private Future maybeRollDeploymentIfExists(String deploymentName, RestartReasons podRollReasons) { if (podRollReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED)) { return rollDeploymentIfExists(deploymentName, RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()); } else { @@ -587,7 +589,7 @@ Future maybeRollDeploymentIfExists(String deploymentName, RestartReasons p * * @return Succeeded future if it succeeded, failed otherwise. */ - Future rollDeploymentIfExists(String deploymentName, String reason) { + /* test */ Future rollDeploymentIfExists(String deploymentName, String reason) { return deploymentOperator.getAsync(reconciliation.namespace(), deploymentName) .compose(dep -> { if (dep != null) { @@ -603,7 +605,7 @@ Future rollDeploymentIfExists(String deploymentName, String reason) { * Remove older cluster CA certificates if present in the corresponding Secret after a renewal by replacing the * corresponding CA private key. */ - Future maybeRemoveOldClusterCaCertificates() { + /* test */ Future maybeRemoveOldClusterCaCertificates() { // if the new CA certificate is used to sign all server certificates if (isClusterCaFullyUsed) { LOGGER.debugCr(reconciliation, "Maybe there are old cluster CA certificates to remove"); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java index 7331b7ddda2..3c2eaac1c08 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/CaReconcilerTest.java @@ -4,13 +4,13 @@ */ package io.strimzi.operator.cluster.operator.assembly; -import io.fabric8.kubernetes.api.model.OwnerReference; -import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.api.model.SecretBuilder; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.kafka.model.common.CertificateAuthority; import io.strimzi.api.kafka.model.common.CertificateAuthorityBuilder; import io.strimzi.api.kafka.model.common.CertificateExpirationPolicy; @@ -19,6 +19,8 @@ import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.api.kafka.model.podset.StrimziPodSet; +import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder; import io.strimzi.certs.CertAndKey; import io.strimzi.certs.CertManager; import io.strimzi.certs.OpenSslCertManager; @@ -29,8 +31,10 @@ import io.strimzi.operator.cluster.model.AbstractModel; import io.strimzi.operator.cluster.model.ModelUtils; import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.cluster.model.PodSetUtils; import io.strimzi.operator.cluster.model.RestartReason; import io.strimzi.operator.cluster.model.RestartReasons; +import io.strimzi.operator.cluster.operator.resource.KafkaRoller; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator; @@ -76,11 +80,11 @@ import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static io.strimzi.operator.common.model.Ca.CA_CRT; @@ -94,10 +98,12 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @SuppressWarnings({"checkstyle:ClassFanOutComplexity"}) @@ -1308,14 +1314,6 @@ public void testClusterCASecretsWithoutOwnerReference(Vertx vertx, VertxTestCont .endSpec() .build(); - OwnerReference ownerReference = new OwnerReferenceBuilder() - .withKind(kafka.getKind()) - .withApiVersion(kafka.getApiVersion()) - .withName(kafka.getMetadata().getName()) - .withBlockOwnerDeletion(false) - .withController(false) - .build(); - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); SecretOperator secretOps = supplier.secretOperations; PodOperator podOps = supplier.podOperations; @@ -1374,14 +1372,6 @@ public void testClientsCASecretsWithoutOwnerReference(Vertx vertx, VertxTestCont .endSpec() .build(); - OwnerReference ownerReference = new OwnerReferenceBuilder() - .withKind(kafka.getKind()) - .withApiVersion(kafka.getApiVersion()) - .withName(kafka.getMetadata().getName()) - .withBlockOwnerDeletion(false) - .withController(false) - .build(); - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); SecretOperator secretOps = supplier.secretOperations; PodOperator podOps = supplier.podOperations; @@ -1429,220 +1419,1101 @@ public void testClientsCASecretsWithoutOwnerReference(Vertx vertx, VertxTestCont }))); } + ////////// + // Tests for trust rollout + ////////// + @Test - public void testClusterCAKeyNotTrusted(Vertx vertx, VertxTestContext context) { + public void testStrimziManagedClusterCaKeyReplaced(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); SecretOperator secretOps = supplier.secretOperations; - ArgumentCaptor clusterCaCert = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clusterCaKey = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clientsCaCert = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clientsCaKey = ArgumentCaptor.forClass(Secret.class); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaCertSecretName(NAME)), clusterCaCert.capture())).thenAnswer(i -> { - Secret s = clusterCaCert.getValue(); - s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1")); - return Future.succeededFuture(ReconcileResult.created(s)); - }); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> { - Secret s = clusterCaKey.getValue(); - s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1")); - return Future.succeededFuture(ReconcileResult.created(s)); - }); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + //Annotate Cluster CA key to force replacement + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(ResourceAnnotations.ANNO_STRIMZI_IO_FORCE_REPLACE, "true") + .endMetadata() + .build(); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); Map generationAnnotations = - Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0"); + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); PodOperator mockPodOps = supplier.podOperations; - when(mockPodOps.listAsync(any(), any(Labels.class))).thenAnswer(i -> { - List pods = new ArrayList<>(); - // adding a terminating Cruise Control pod to test that it's skipped during the key generation check - Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations); - ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z"); - pods.add(ccPod); - // adding Kafka pods with old CA cert and key generation - pods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); - return Future.succeededFuture(pods); - }); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); Checkpoint async = context.checkpoint(); - CaReconciler caReconciler = new CaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); - caReconciler - .reconcileCas(Clock.systemUTC()) - .compose(i -> caReconciler.verifyClusterCaFullyTrustedAndUsed()) + mockCaReconciler + .reconcile(Clock.systemUTC()) .onComplete(context.succeeding(c -> context.verify(() -> { - assertThat(caReconciler.isClusterCaNeedFullTrust, is(true)); + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, aMapWithSize(3)); + mockCaReconciler.deploymentRestartReasons.forEach((deploymentName, restartReason) -> + assertThat("Deployment restart reason for " + deploymentName, restartReason.equals(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()), is(true))); async.flag(); }))); } + // Strimzi Cluster CA key replaced in previous reconcile and some pods already rolled @Test - public void testRollingReasonsWithClusterCAKeyNotTrusted(Vertx vertx, VertxTestContext context) { - Kafka kafka = new KafkaBuilder(KAFKA) - .editSpec() - .withNewEntityOperator() - .endEntityOperator() - .withNewCruiseControl() - .endCruiseControl() - .withNewKafkaExporter() - .endKafkaExporter() + public void testStrimziManagedClusterCaKeyReplacedPreviously(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + // Edit Cluster CA key and cert to increment generation as though replacement happened in previous reconcile + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1") + .endMetadata() + .build(); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) .endSpec() .build(); + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, aMapWithSize(3)); + mockCaReconciler.deploymentRestartReasons.forEach((deploymentName, restartReason) -> + assertThat("Deployment restart reason for " + deploymentName, restartReason.equals(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()), is(true))); + async.flag(); + }))); + } + + @Test + public void testStrimziManagedClusterCaCertRenewed(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); SecretOperator secretOps = supplier.secretOperations; - ArgumentCaptor clusterCaCert = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clusterCaKey = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clientsCaCert = ArgumentCaptor.forClass(Secret.class); - ArgumentCaptor clientsCaKey = ArgumentCaptor.forClass(Secret.class); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaCertSecretName(NAME)), clusterCaCert.capture())).thenAnswer(i -> { - Secret s = clusterCaCert.getValue(); - s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1")); - return Future.succeededFuture(ReconcileResult.created(s)); - }); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(AbstractModel.clusterCaKeySecretName(NAME)), clusterCaKey.capture())).thenAnswer(i -> { - Secret s = clusterCaKey.getValue(); - s.getMetadata().setAnnotations(Map.of(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1")); - return Future.succeededFuture(ReconcileResult.created(s)); - }); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaCertificateSecretName(NAME)), clientsCaCert.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clientsCaKeySecretName(NAME)), clientsCaKey.capture())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.reconcile(any(), eq(NAMESPACE), eq(KafkaResources.clusterOperatorCertsSecretName(NAME)), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.created(i.getArgument(0)))); - when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + //Annotate Cluster CA cert to force renewal + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(ResourceAnnotations.ANNO_STRIMZI_IO_FORCE_RENEW, "true") + .endMetadata() + .build(); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); Map generationAnnotations = - Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0"); + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); - PodOperator mockPodOps = supplier.podOperations; - when(mockPodOps.listAsync(any(), any(Labels.class))).thenAnswer(i -> { - List pods = new ArrayList<>(); - // adding a terminating Cruise Control pod to test that it's skipped during the key generation check - Pod ccPod = podWithNameAndAnnotations("my-cluster-cruise-control", false, false, generationAnnotations); - ccPod.getMetadata().setDeletionTimestamp("2023-06-08T16:23:18Z"); - pods.add(ccPod); - // adding Kafka pods with old CA cert and key generation - pods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); - pods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); - return Future.succeededFuture(pods); - }); + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); Map deps = new HashMap<>(); deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); DeploymentOperator depsOperator = supplier.deploymentOperations; - when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1)))); + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); Checkpoint async = context.checkpoint(); - MockCaReconciler mockCaReconciler = new MockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); mockCaReconciler .reconcile(Clock.systemUTC()) .onComplete(context.succeeding(c -> context.verify(() -> { - assertThat(mockCaReconciler.isClusterCaNeedFullTrust, is(true)); - assertThat(mockCaReconciler.kPodRollReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true)); - assertThat(mockCaReconciler.deploymentRollReason.size() == 3, is(true)); - for (String reason: mockCaReconciler.deploymentRollReason) { - assertThat(reason.equals(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()), is(true)); - } + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, anEmptyMap()); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); async.flag(); }))); } - static class MockCaReconciler extends CaReconciler { - RestartReasons kPodRollReasons; - List deploymentRollReason = new ArrayList<>(); + @Test + public void testUserManagedClusterCaKeyReplaced(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - public MockCaReconciler(Reconciliation reconciliation, Kafka kafkaCr, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, Vertx vertx, CertManager certManager, PasswordGenerator passwordGenerator) { - super(reconciliation, kafkaCr, config, supplier, vertx, certManager, passwordGenerator); - } + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); - @Override - Future verifyClusterCaFullyTrustedAndUsed() { - // assuming the CA key is not trusted - this.isClusterCaNeedFullTrust = true; - this.isClusterCaFullyUsed = false; - return Future.succeededFuture(); - } + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + // Edit Cluster CA key and cert to increment generation as though user has replaced CA key + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1") + .endMetadata() + .build(); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); - @Override - Future> getKafkaReplicas() { - Set nodes = new HashSet<>(); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-0", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-1", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-brokers-2", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-3", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-4", true, false))); - nodes.add(ReconcilerUtils.nodeFromPod(podWithName("my-cluster-controllers-5", true, false))); - return Future.succeededFuture(nodes); - } + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); - @Override - Future rollKafkaBrokers(Set nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) { - this.kPodRollReasons = podRollReasons; - return Future.succeededFuture(); - } + // Kafka brokers Secret with old annotation + Secret kafkaBrokersSecret = kafkaBrokersSecretWithAnnotations(Map.of( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0")); - @Override - Future rollDeploymentIfExists(String deploymentName, String reason) { - return deploymentOperator.getAsync(reconciliation.namespace(), deploymentName) - .compose(dep -> { - if (dep != null) { - this.deploymentRollReason.add(reason); - } - return Future.succeededFuture(); - }); - } + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret, + kafkaBrokersSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); - @Override - Future maybeRemoveOldClusterCaCertificates() { - return Future.succeededFuture(); - } - } + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); - public static Pod podWithName(String name, boolean broker, boolean controller) { - return podWithNameAndAnnotations(name, broker, controller, Map.of()); - } + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); - public static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map annotations) { - return new PodBuilder() + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() .withNewMetadata() - .withName(name) - .withAnnotations(annotations) - .withLabels(Map.of( - Labels.STRIMZI_CLUSTER_LABEL, NAME, - Labels.STRIMZI_CONTROLLER_ROLE_LABEL, Boolean.toString(controller), - Labels.STRIMZI_BROKER_ROLE_LABEL, Boolean.toString(broker) - )) + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + // Disable CA generation + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editClusterCa() + .withGenerateCertificateAuthority(false) + .endClusterCa() + .endSpec() .build(); + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, aMapWithSize(3)); + mockCaReconciler.deploymentRestartReasons.forEach((deploymentName, restartReason) -> + assertThat("Deployment restart reason for " + deploymentName, restartReason.equals(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()), is(true))); + async.flag(); + }))); } - public static Deployment deploymentWithName(String name) { - return new DeploymentBuilder() + @Test + public void testUserManagedClusterCaCertRenewed(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + // Edit Cluster CA cert to increment generation as though user has renewed CA cert + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); + + // Kafka brokers Secret with old annotation + Secret kafkaBrokersSecret = kafkaBrokersSecretWithAnnotations(Map.of( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0")); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret, + kafkaBrokersSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() .withNewMetadata() - .withName(name) + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + // Disable CA generation + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editClusterCa() + .withGenerateCertificateAuthority(false) + .endClusterCa() + .endSpec() + .build(); + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + // When user is managing CA a cert renewal implies a key replacement + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, aMapWithSize(3)); + mockCaReconciler.deploymentRestartReasons.forEach((deploymentName, restartReason) -> + assertThat("Deployment restart reason for " + deploymentName, restartReason.equals(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED.getDefaultNote()), is(true))); + async.flag(); + }))); + } + + @Test + public void testStrimziManagedClientsCaKeyReplaced(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + //Annotate Clients CA key to force replacement + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(ResourceAnnotations.ANNO_STRIMZI_IO_FORCE_REPLACE, "true") + .endMetadata() + .build(); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLIENT_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); + async.flag(); + }))); + } + + // Strimzi Clients CA key replaced in previous reconcile and some pods already rolled + @Test + public void testStrimziManagedClientsCaKeyReplacedPreviously(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + // Edit Clients CA key and cert to increment generation as though replacement happened in previous reconcile + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1") + .endMetadata() + .build(); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + // We don't handle this currently and rely on the rolling update later in the reconcile loop for Kafka + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, anEmptyMap()); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); + async.flag(); + }))); + } + + @Test + public void testStrimziManagedClientsCaCertRenewed(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + //Annotate Clients CA cert to force renewal + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(ResourceAnnotations.ANNO_STRIMZI_IO_FORCE_RENEW, "true") + .endMetadata() + .build(); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, KAFKA, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, anEmptyMap()); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); + async.flag(); + }))); + } + + @Test + public void testUserManagedClientsCaKeyReplaced(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + // Edit Clients CA key and cert to increment generation as though user has replaced CA key + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_KEY_GENERATION, "1") + .endMetadata() + .build(); + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); + + // Kafka brokers Secret with old annotation + Secret kafkaBrokersSecret = kafkaBrokersSecretWithAnnotations(Map.of( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0")); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret, + kafkaBrokersSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + // Disable CA generation + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editClientsCa() + .withGenerateCertificateAuthority(false) + .endClientsCa() + .endSpec() + .build(); + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLIENT_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); + async.flag(); + }))); + } + + @Test + public void testUserManagedClientsCaCertRenewed(Vertx vertx, VertxTestContext context) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException { + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME); + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + CertificateAuthority certificateAuthority = new CertificateAuthorityBuilder() + .withValidityDays(100) + .withRenewalDays(10) + .withGenerateCertificateAuthority(true) + .build(); + + List clusterCaSecrets = initialClusterCaSecrets(certificateAuthority); + Secret initialClusterCaKeySecret = clusterCaSecrets.get(0); + Secret initialClusterCaCertSecret = clusterCaSecrets.get(1); + + List clientsCaSecrets = initialClientsCaSecrets(certificateAuthority); + Secret initialClientsCaKeySecret = clientsCaSecrets.get(0); + // Edit Clients CA cert to increment generation as though user has renewed CA cert + Secret initialClientsCaCertSecret = clientsCaSecrets.get(1).edit() + .editMetadata() + .addToAnnotations(Ca.ANNO_STRIMZI_IO_CA_CERT_GENERATION, "1") + .endMetadata() + .build(); + + // Kafka brokers Secret with old annotation + Secret kafkaBrokersSecret = kafkaBrokersSecretWithAnnotations(Map.of( + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0")); + + when(secretOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of( + initialClusterCaKeySecret, initialClusterCaCertSecret, + initialClientsCaKeySecret, initialClientsCaCertSecret, + kafkaBrokersSecret))); + when(secretOps.reconcile(any(), eq(NAMESPACE), any(), any(Secret.class))).thenReturn(Future.succeededFuture()); + + Map generationAnnotations = + Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, "0", + Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, "0"); + + // Kafka pods with old CA cert and key generation + List controllerPods = new ArrayList<>(); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-3", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-4", false, true, generationAnnotations)); + controllerPods.add(podWithNameAndAnnotations("my-cluster-controllers-5", false, true, generationAnnotations)); + List brokerPods = new ArrayList<>(); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-0", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-1", true, false, generationAnnotations)); + brokerPods.add(podWithNameAndAnnotations("my-cluster-brokers-2", true, false, generationAnnotations)); + List pods = new ArrayList<>(controllerPods); + pods.addAll(brokerPods); + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(pods)); + + StrimziPodSetOperator spsOps = supplier.strimziPodSetOperator; + when(spsOps.getAsync(eq(NAMESPACE), eq(KafkaResources.zookeeperComponentName(NAME)))).thenReturn(Future.succeededFuture()); + StrimziPodSet controllerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-controller") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(controllerPods)) + .endSpec() + .build(); + + StrimziPodSet brokerPodSet = new StrimziPodSetBuilder() + .withNewMetadata() + .withName(NAME + "-broker") + .endMetadata() + .withNewSpec() + .withPods(PodSetUtils.podsToMaps(brokerPods)) + .endSpec() + .build(); + + when(spsOps.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of(controllerPodSet, brokerPodSet))); + + Map deps = new HashMap<>(); + deps.put("my-cluster-entity-operator", deploymentWithName("my-cluster-entity-operator")); + deps.put("my-cluster-cruise-control", deploymentWithName("my-cluster-cruise-control")); + deps.put("my-cluster-kafka-exporter", deploymentWithName("my-cluster-kafka-exporter")); + DeploymentOperator depsOperator = supplier.deploymentOperations; + when(depsOperator.getAsync(any(), any())).thenAnswer(i -> Future.succeededFuture(deps.get(i.getArgument(1, String.class)))); + + Checkpoint async = context.checkpoint(); + + // Disable CA generation + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editClientsCa() + .withGenerateCertificateAuthority(false) + .endClientsCa() + .endSpec() + .build(); + NewMockCaReconciler mockCaReconciler = new NewMockCaReconciler(reconciliation, kafka, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build(), + supplier, vertx, CERT_MANAGER, PASSWORD_GENERATOR); + mockCaReconciler + .reconcile(Clock.systemUTC()) + .onComplete(context.succeeding(c -> context.verify(() -> { + // When user is managing CA a cert renewal implies a key replacement + assertThat("Kafka restart reasons", mockCaReconciler.kafkaRestartReasons, aMapWithSize(6)); + mockCaReconciler.kafkaRestartReasons.forEach((podName, restartReasons) -> { + assertThat("Restart reasons for pod " + podName, restartReasons.getReasons(), hasSize(1)); + assertThat("Restart reasons for pod " + podName, restartReasons.contains(RestartReason.CLIENT_CA_CERT_KEY_REPLACED), is(true)); + }); + assertThat("Deployment restart reasons", mockCaReconciler.deploymentRestartReasons, anEmptyMap()); + async.flag(); + }))); + } + + static class NewMockCaReconciler extends CaReconciler { + Map kafkaRestartReasons = new HashMap<>(); + Map deploymentRestartReasons = new HashMap<>(); + + public NewMockCaReconciler(Reconciliation reconciliation, Kafka kafkaCr, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, Vertx vertx, CertManager certManager, PasswordGenerator passwordGenerator) { + super(reconciliation, kafkaCr, config, supplier, vertx, certManager, passwordGenerator); + } + + @Override + KafkaRoller createKafkaRoller(Set nodes, TlsPemIdentity coTlsPemIdentity) { + KafkaRoller mockKafkaRoller = mock(KafkaRoller.class); + when(mockKafkaRoller.rollingRestart(any())).thenAnswer(i -> podOperator.listAsync(NAMESPACE, Labels.EMPTY) + .onSuccess(pods -> kafkaRestartReasons = pods.stream().collect(Collectors.toMap( + pod -> pod.getMetadata().getName(), + pod -> (RestartReasons) i.getArgument(0, Function.class).apply(pod)))) + .mapEmpty()); + return mockKafkaRoller; + } + + @Override + Future rollDeploymentIfExists(String deploymentName, String reason) { + return deploymentOperator.getAsync(reconciliation.namespace(), deploymentName) + .compose(dep -> { + if (dep != null) { + this.deploymentRestartReasons.put(deploymentName, reason); + } + return Future.succeededFuture(); + }); + } + } + + private static Pod podWithNameAndAnnotations(String name, boolean broker, boolean controller, Map annotations) { + return new PodBuilder() + .withNewMetadata() + .withName(name) + .withAnnotations(annotations) + .withLabels(Map.of( + Labels.STRIMZI_CLUSTER_LABEL, NAME, + Labels.STRIMZI_CONTROLLER_ROLE_LABEL, Boolean.toString(controller), + Labels.STRIMZI_BROKER_ROLE_LABEL, Boolean.toString(broker) + )) + .endMetadata() + .build(); + } + + private static Deployment deploymentWithName(String name) { + return new DeploymentBuilder() + .withNewMetadata() + .withName(name) + .endMetadata() + .build(); + } + + private static Secret kafkaBrokersSecretWithAnnotations(Map annotations) { + return new SecretBuilder() + .withNewMetadata() + .withName(KafkaResources.kafkaSecretName(NAME)) + .withAnnotations(annotations) .endMetadata() .build(); }