Skip to content

Commit

Permalink
Add stop_ongoing_execution flag to rebalance requests for full run
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge committed Oct 16, 2024
1 parent 4ffe9f1 commit a574286
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
if (Annotations.booleanAnnotation(kafkaRebalance, ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL, false)) {
LOGGER.infoCr(reconciliation, "Auto-approval set on the KafkaRebalance resource");
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true);
} else {
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
switch (rebalanceAnnotation) {
Expand All @@ -862,7 +862,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
return configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()).compose(loadmap -> Future.succeededFuture(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), StatusUtils.validate(reconciliation, kafkaRebalance)))));
case approve:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.approve);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true);
case refresh:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.refresh);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder);
Expand Down Expand Up @@ -1110,22 +1110,33 @@ private boolean isKafkaClusterReady(Kafka kafka) {
&& kafka.getStatus().getConditions().stream().anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
}

private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation,
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder,
boolean stopOngoingExecution) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, stopOngoingExecution);
}

private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation,
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, false);
}


private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation, String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun,
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder, String userTaskID) {
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder,
String userTaskID, boolean stopOngoingExecution) {

LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}]", dryrun);
LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}] [stop_ongoing_execution={}]", dryrun, stopOngoingExecution);
rebalanceOptionsBuilder.withVerboseResponse();
if (!dryrun) {
rebalanceOptionsBuilder.withFullRun();
}
if (stopOngoingExecution) {
rebalanceOptionsBuilder.withStopOngoingExecution();
}
// backward compatibility, no mode specified means "full"
KafkaRebalanceMode mode = Optional.ofNullable(kafkaRebalance.getSpec())
.map(spec -> spec.getMode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public abstract class AbstractRebalanceOptions {
private final boolean skipHardGoalCheck;
/** Sets whether the response should be JSON formatted or formatted for readability on the command line */
private final boolean json;
/** Sets whether to stop the ongoing execution (if any) and start executing the given request. */
private final boolean stopOngoingExecution;
/** A regular expression to specify topics that should not be considered for replica movement */
private final String excludedTopics;
/** The upper bound of ongoing replica movements going into/out of each broker */
Expand Down Expand Up @@ -67,6 +69,13 @@ public boolean isJson() {
return json;
}

/**
* @return True if stopping the ongoing execution (if any) and starting executing the given request. False otherwise.
*/
public boolean isStopOngoingExecution() {
return stopOngoingExecution;
}

/**
* @return Excludes topics
*/
Expand Down Expand Up @@ -108,6 +117,7 @@ public List<String> getReplicaMovementStrategies() {
this.verbose = builder.verbose;
this.skipHardGoalCheck = builder.skipHardGoalCheck;
this.json = builder.json;
this.stopOngoingExecution = builder.stopOngoingExecution;
this.excludedTopics = builder.excludedTopics;
this.concurrentPartitionMovementsPerBroker = builder.concurrentPartitionMovementsPerBroker;
this.concurrentLeaderMovements = builder.concurrentLeaderMovements;
Expand All @@ -127,6 +137,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
private boolean verbose;
private boolean skipHardGoalCheck;
private boolean json;
private boolean stopOngoingExecution;
private String excludedTopics;
private int concurrentPartitionMovementsPerBroker;
private int concurrentLeaderMovements;
Expand All @@ -138,6 +149,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
goals = null;
verbose = false;
skipHardGoalCheck = false;
stopOngoingExecution = false;
json = true;
excludedTopics = null;
concurrentPartitionMovementsPerBroker = 0;
Expand Down Expand Up @@ -183,6 +195,16 @@ public B withSkipHardGoalCheck() {
return self();
}

/**
* Stop the ongoing execution (if any) and start executing the given request
*
* @return Instance of this builder
*/
public B withStopOngoingExecution() {
this.stopOngoingExecution = true;
return self();
}

/**
* Set rebalance goals
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ private PathBuilder withAbstractRebalanceParameters(AbstractRebalanceOptions opt
if (options != null) {
PathBuilder builder = withParameter(CruiseControlParameters.DRY_RUN, String.valueOf(options.isDryRun()))
.withParameter(CruiseControlParameters.VERBOSE, String.valueOf(options.isVerbose()))
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()));
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()))
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, String.valueOf(options.isStopOngoingExecution()));

if (options.getExcludedTopics() != null) {
builder.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, options.getExcludedTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,74 @@ private void krNewToProposalReadyToRebalancingToReadyThenRefresh(VertxTestContex
}));
}

/**
* Tests the transition from 'New' to 'Ready'
* The rebalance proposal is auto approved and the resource moves to 'Rebalancing'.
* Then the Rebalancing KafkaRebalance is refreshed and a moved to 'ProposalReady' again.
* Then the ProposalReady KafkaRebalance moves to Rebalancing again and finally to 'Ready'
*
* 1. A new KafkaRebalance resource is created with auto-approval annotation set; it is in the 'New' state
* 2. The operator requests a rebalance proposal through the Cruise Control REST API
* 3. The rebalance proposal is ready on the first call
* 4. The KafkaRebalance resource transitions to the 'ProposalReady' state
* 5. The operator requests the rebalance operation through the Cruise Control REST API
* 6. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
* 7. The KafkaRebalance resource moves to the 'Rebalancing' state
* 8. The KafkaRebalance resource is annotated with 'strimzi.io/rebalance=refresh' while the rebalancing is still in progress
* 9. The operator stops polling the Cruise Control REST API and requests a stop execution
* 10. The operator requests a new rebalance proposal through the Cruise Control REST API
* 11. The KafkaRebalance resource transitions to the 'ProposalReady' state again
* 12. The operator requests the rebalance operation through the Cruise Control REST API
* 13. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
* 14. The KafkaRebalance resource moves to the 'Rebalancing' state again
* 15. The rebalance operation is done
* 16. The KafkaRebalance resource moves to the 'Ready' state
*/
@Test
public void krNewToProposalReadyToRebalancingToRefresh(VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kr = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, true);

// Set up the rebalance and user tasks endpoints with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REMOVE_BROKER);
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0);
cruiseControlServer.setupCCStopResponse();

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
crdCreateCruiseControlSecrets();

Checkpoint checkpoint = context.checkpoint();
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
// the resource moved from 'New' to 'ProposalReady' directly (no pending calls in the Mock server)
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady)))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from ProposalReady to Rebalancing on auto approval
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing);
}))
.compose(v -> {
// apply the "refresh" annotation to the resource in the Rebalancing state
annotate(client, namespace, kr.getMetadata().getName(), KafkaRebalanceAnnotation.refresh);
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v -> {
// the resource moved from Rebalancing to ProposalReady due to refresh annotation
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady);
}))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from ProposalReady to Rebalancing
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing);
}))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from Rebalancing to Ready
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Ready);
checkpoint.flag();
}));
}

/**
* Tests the transition from 'New' to 'NotReady' due to "missing hard goals" error
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private String getExpectedRebalanceString() {
CruiseControlParameters.DRY_RUN + "=false&" +
CruiseControlParameters.VERBOSE + "=true&" +
CruiseControlParameters.SKIP_HARD_GOAL_CHECK + "=false&" +
CruiseControlParameters.STOP_ONGOING_EXECUTION + "=false&" +
CruiseControlParameters.EXCLUDED_TOPICS + "=test-.*&" +
CruiseControlParameters.GOALS + "=");

Expand Down Expand Up @@ -72,6 +73,7 @@ public void testQueryStringList() {
.withParameter(CruiseControlParameters.DRY_RUN, "false")
.withParameter(CruiseControlParameters.VERBOSE, "true")
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, "false")
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, "false")
.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, "test-.*")
.withParameter(CruiseControlParameters.GOALS, GOALS)
.withParameter(CruiseControlParameters.REBALANCE_DISK, "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ public enum CruiseControlParameters {
/**
* Skip rack awareness check
*/
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check");
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check"),

/**
* Stop the ongoing execution (if any) and start executing the given request
*/
STOP_ONGOING_EXECUTION("stop_ongoing_execution");

private final String key;

Expand Down

0 comments on commit a574286

Please sign in to comment.