diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 212721f6736..616671b2b94 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -853,7 +853,7 @@ private Future> onProposalReady(Re AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder) { if (Annotations.booleanAnnotation(kafkaRebalance, ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL, false)) { LOGGER.infoCr(reconciliation, "Auto-approval set on the KafkaRebalance resource"); - return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder); + return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true); } else { KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance); switch (rebalanceAnnotation) { @@ -862,7 +862,7 @@ private Future> onProposalReady(Re return configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()).compose(loadmap -> Future.succeededFuture(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), StatusUtils.validate(reconciliation, kafkaRebalance))))); case approve: LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.approve); - return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder); + return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true); case refresh: LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.refresh); return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder); @@ -1110,22 +1110,33 @@ private boolean isKafkaClusterReady(Kafka kafka) { && kafka.getStatus().getConditions().stream().anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True")); } + private Future> requestRebalance(Reconciliation reconciliation, + String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance, + boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder, + boolean stopOngoingExecution) { + return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, stopOngoingExecution); + } + private Future> requestRebalance(Reconciliation reconciliation, String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance, boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder) { - return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null); + return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, false); } private Future> requestRebalance(Reconciliation reconciliation, String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance, boolean dryrun, - AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder, String userTaskID) { + AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder, + String userTaskID, boolean stopOngoingExecution) { - LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}]", dryrun); + LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}] [stop_ongoing_execution={}]", dryrun, stopOngoingExecution); rebalanceOptionsBuilder.withVerboseResponse(); if (!dryrun) { rebalanceOptionsBuilder.withFullRun(); } + if (stopOngoingExecution) { + rebalanceOptionsBuilder.withStopOngoingExecution(); + } // backward compatibility, no mode specified means "full" KafkaRebalanceMode mode = Optional.ofNullable(kafkaRebalance.getSpec()) .map(spec -> spec.getMode()) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/AbstractRebalanceOptions.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/AbstractRebalanceOptions.java index 2b072670dcc..e2d408fd22b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/AbstractRebalanceOptions.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/AbstractRebalanceOptions.java @@ -21,6 +21,8 @@ public abstract class AbstractRebalanceOptions { private final boolean skipHardGoalCheck; /** Sets whether the response should be JSON formatted or formatted for readability on the command line */ private final boolean json; + /** Sets whether to stop the ongoing execution (if any) and start executing the given request. */ + private final boolean stopOngoingExecution; /** A regular expression to specify topics that should not be considered for replica movement */ private final String excludedTopics; /** The upper bound of ongoing replica movements going into/out of each broker */ @@ -67,6 +69,13 @@ public boolean isJson() { return json; } + /** + * @return True if stopping the ongoing execution (if any) and starting executing the given request. False otherwise. + */ + public boolean isStopOngoingExecution() { + return stopOngoingExecution; + } + /** * @return Excludes topics */ @@ -108,6 +117,7 @@ public List getReplicaMovementStrategies() { this.verbose = builder.verbose; this.skipHardGoalCheck = builder.skipHardGoalCheck; this.json = builder.json; + this.stopOngoingExecution = builder.stopOngoingExecution; this.excludedTopics = builder.excludedTopics; this.concurrentPartitionMovementsPerBroker = builder.concurrentPartitionMovementsPerBroker; this.concurrentLeaderMovements = builder.concurrentLeaderMovements; @@ -127,6 +137,7 @@ public abstract static class AbstractRebalanceOptionsBuilder + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady))) + .compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))) + .onComplete(context.succeeding(v -> { + // the resource moved from ProposalReady to Rebalancing on auto approval + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing); + })) + .compose(v -> { + // apply the "refresh" annotation to the resource in the Rebalancing state + annotate(client, namespace, kr.getMetadata().getName(), KafkaRebalanceAnnotation.refresh); + return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())); + }) + .onComplete(context.succeeding(v -> { + // the resource moved from Rebalancing to ProposalReady due to refresh annotation + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady); + })) + .compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))) + .onComplete(context.succeeding(v -> { + // the resource moved from ProposalReady to Rebalancing + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing); + })) + .compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))) + .onComplete(context.succeeding(v -> { + // the resource moved from Rebalancing to Ready + assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Ready); + checkpoint.flag(); + })); + } + /** * Tests the transition from 'New' to 'NotReady' due to "missing hard goals" error * diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/PathBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/PathBuilderTest.java index ebd28910d66..0a2fbe0e809 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/PathBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/PathBuilderTest.java @@ -34,6 +34,7 @@ private String getExpectedRebalanceString() { CruiseControlParameters.DRY_RUN + "=false&" + CruiseControlParameters.VERBOSE + "=true&" + CruiseControlParameters.SKIP_HARD_GOAL_CHECK + "=false&" + + CruiseControlParameters.STOP_ONGOING_EXECUTION + "=false&" + CruiseControlParameters.EXCLUDED_TOPICS + "=test-.*&" + CruiseControlParameters.GOALS + "="); @@ -72,6 +73,7 @@ public void testQueryStringList() { .withParameter(CruiseControlParameters.DRY_RUN, "false") .withParameter(CruiseControlParameters.VERBOSE, "true") .withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, "false") + .withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, "false") .withParameter(CruiseControlParameters.EXCLUDED_TOPICS, "test-.*") .withParameter(CruiseControlParameters.GOALS, GOALS) .withParameter(CruiseControlParameters.REBALANCE_DISK, "false") diff --git a/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlParameters.java b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlParameters.java index deaa1c7a49c..d9200d6b7db 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlParameters.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/model/cruisecontrol/CruiseControlParameters.java @@ -95,7 +95,12 @@ public enum CruiseControlParameters { /** * Skip rack awareness check */ - SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check"); + SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check"), + + /** + * Stop the ongoing execution (if any) and start executing the given request + */ + STOP_ONGOING_EXECUTION("stop_ongoing_execution"); private final String key;