Skip to content

Commit

Permalink
Annotate StrimziPodSets before rolling during CA renewal (#10711)
Browse files Browse the repository at this point in the history
Signed-off-by: Katherine Stanley <[email protected]>
  • Loading branch information
katheris authored Oct 17, 2024
1 parent 23e5305 commit dcaa5e4
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* after scaling up, the operator triggers an auto-rebalancing to move some of the existing partitions to the newly added brokers.
* before scaling down, and if the brokers to be removed are hosting partitions, the operator triggers an auto-rebalancing to move these partitions off the brokers so that they are free to be removed.
* Strimzi Access Operator 0.1.0 added to the installation files and examples
* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again.

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
 * Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
 * in the PodSet. Annotations passed in override existing annotations with the same key; all other existing
 * annotations on the Pods are kept unchanged.
 *
 * @param strimziPodSet             Strimzi PodSet to patch
 * @param annotationsToBeUpdated    Annotations to merge with the existing annotations
 *
 * @return  Patched PodSet
 */
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotationsToBeUpdated) {
    List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
            .stream()
            .map(pod -> pod.edit()
                    .editMetadata()
                        // addToAnnotations merges the new annotations into the existing ones inside the
                        // builder. Unlike reading getAnnotations() and calling putAll on it, this does not
                        // mutate the live annotation map of the source Pod and does not NPE when a Pod has
                        // no annotations at all.
                        .addToAnnotations(annotationsToBeUpdated)
                    .endMetadata()
                    .build())
            .map(PodSetUtils::podToMap)
            .toList();

    return new StrimziPodSetBuilder(strimziPodSet)
            .editSpec()
                .withPods(newPods)
            .endSpec()
            .build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -55,11 +56,11 @@

import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation
Expand Down Expand Up @@ -395,8 +396,8 @@ Future<Void> rollingUpdateForNewCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> maybeRollZookeeper(replicas, podRollReasons, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(nodes -> rollKafkaBrokers(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> patchCaGenerationAndReturnNodes())
.compose(nodes -> rollKafka(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> maybeRollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(CruiseControlResources.componentName(reconciliation.name()), podRollReasons));
Expand Down Expand Up @@ -527,27 +528,40 @@ Future<Void> rollingUpdateForNewCaKey() {
}
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/* test */ Future<Set<NodeRef>> patchCaGenerationAndReturnNodes() {
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
Set<NodeRef> nodes = new LinkedHashSet<>();

if (podSets != null) {
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()),
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()),
Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(clientsCa.caCertGeneration())
)))
.toList();
return strimziPodSetOperator.batchReconcile(
reconciliation,
reconciliation.namespace(),
updatedPodSets,
selectorLabels
).map(i -> updatedPodSets.stream()
.flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream())
.collect(Collectors.toSet()));
} else {
return Future.succeededFuture(Set.of());
}

return Future.succeededFuture(nodes);
});

}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
/* test */ Future<Void> rollKafka(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
return new KafkaRoller(
reconciliation,
vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() {
assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12"));
}

@Test
public void testPatchPodAnnotations() {
    // Two Pods that both start out with the same three annotations
    Map<String, String> initialAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3");

    Pod pod0 = new PodBuilder()
            .withNewMetadata()
                .withName("pod-0")
                .withNamespace(NAMESPACE)
                .withAnnotations(initialAnnotations)
            .endMetadata()
            .build();
    Pod pod1 = new PodBuilder()
            .withNewMetadata()
                .withName("pod-1")
                .withNamespace(NAMESPACE)
                .withAnnotations(initialAnnotations)
            .endMetadata()
            .build();

    StrimziPodSet podSet = new StrimziPodSetBuilder()
            .withNewMetadata()
                .withName("my-sps")
                .withNamespace(NAMESPACE)
            .endMetadata()
            .withNewSpec()
                .withPods(PodSetUtils.podsToMaps(List.of(pod0, pod1)))
            .endSpec()
            .build();

    // Patching should override anno-2, add anno-4, and leave the other annotations untouched on both Pods
    StrimziPodSet patched = WorkloadUtils.patchAnnotations(podSet, Map.of("anno-2", "value-2a", "anno-4", "value-4"));
    List<Pod> resultPods = PodSetUtils.podSetToPods(patched);

    Map<String, String> expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4");
    assertThat(resultPods.size(), is(2));
    assertThat(resultPods.get(0).getMetadata().getAnnotations(), is(expectedAnnotations));
    assertThat(resultPods.get(1).getMetadata().getAnnotations(), is(expectedAnnotations));
}

//////////////////////////////////////////////////
// Stateful Pod tests
//////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit dcaa5e4

Please sign in to comment.