Skip to content

Commit

Permalink
Handle the failure due to reaching the servlet capacity when getting …
Browse files Browse the repository at this point in the history
…user tasks (#10768)

Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge authored Nov 6, 2024
1 parent 76b49aa commit 9a9a6df
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlUserTasksResponse;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions;
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveDisksOptions;
Expand Down Expand Up @@ -810,12 +810,20 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onPendingProposal(
return p.future();
}

private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse,
private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlUserTasksResponse cruiseControlResponse,
Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> p, String sessionId,
Set<Condition> conditions, KafkaRebalance kafkaRebalance,
ConfigMapOperator configMapOperator, boolean dryRun,
String host, CruiseControlApi apiClient,
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
if (cruiseControlResponse.isMaxActiveUserTasksReached()) {
LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that Cruise Control can run concurrently has been reached, therefore will retry getting user tasks in the next reconciliation. " +
"If this occurs often, consider increasing the value for max.active.user.tasks in the Cruise Control configuration.");
configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName())
.onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions))));
return;
}

if (cruiseControlResponse.getJson().isEmpty()) {
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public interface CruiseControlApi {
* This is used to retrieve the task's current state.
* @return A future for the state of the specified task.
*/
Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID);
Future<CruiseControlUserTasksResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID);

/**
* Issue a stop command to the Cruise Control server. This will halt any task (e.g. a rebalance) which is currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public Future<CruiseControlRebalanceResponse> removeDisks(Reconciliation reconci

@Override
@SuppressWarnings("deprecation")
public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) {
public Future<CruiseControlUserTasksResponse> getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) {

PathBuilder pathBuilder = new PathBuilder(CruiseControlEndpoints.USER_TASKS)
.withParameter(CruiseControlParameters.JSON, "true")
Expand Down Expand Up @@ -365,7 +365,7 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
// This may happen if:
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
result.complete(new CruiseControlResponse(userTaskID, statusJson));
result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson));
} else {
JsonObject jsonUserTask = userTasks.getJsonObject(0);
String taskStatusStr = jsonUserTask.getString(STATUS_KEY);
Expand Down Expand Up @@ -409,7 +409,7 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
default:
throw new IllegalStateException("Unexpected user task status: " + taskStatus);
}
result.complete(new CruiseControlResponse(userTaskID, statusJson));
result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson));
}
});
} else if (response.result().statusCode() == 500) {
Expand All @@ -423,8 +423,15 @@ public Future<CruiseControlResponse> getUserTaskStatus(Reconciliation reconcilia
} else {
errorString = json.toString();
}
result.fail(new CruiseControlRestException(
"Error for request: " + host + ":" + port + path + ". Server returned: " + errorString));
if (errorString.matches(".*" + "There are already \\d+ active user tasks, which has reached the servlet capacity." + ".*")) {
LOGGER.debugCr(reconciliation, errorString);
CruiseControlUserTasksResponse ccResponse = new CruiseControlUserTasksResponse(userTaskID, json);
ccResponse.setMaxActiveUserTasksReached(true);
result.complete(ccResponse);
} else {
result.fail(new CruiseControlRestException(
"Error for request: " + host + ":" + port + path + ". Server returned: " + errorString));
}
});
} else {
result.fail(new CruiseControlRestException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.resource.cruisecontrol;

import io.vertx.core.json.JsonObject;

/**
* Response to user tasks request
*/
public class CruiseControlUserTasksResponse extends CruiseControlResponse {
private boolean isMaxActiveUserTasksReached;

/**
* Constructor
*
* @param userTaskId User task ID
* @param json JSON data
*/
CruiseControlUserTasksResponse(String userTaskId, JsonObject json) {
super(userTaskId, json);
// The maximum number of active user tasks that can run concurrently has reached
// Sourced from the error message that contains "reached the servlet capacity" from the Cruise Control response
this.isMaxActiveUserTasksReached = false;
}

/**
* @return True If the maximum number of active user tasks that can run concurrently has reached
*/
public boolean isMaxActiveUserTasksReached() {
return isMaxActiveUserTasksReached;
}

protected void setMaxActiveUserTasksReached(boolean maxActiveUserTasksReached) {
this.isMaxActiveUserTasksReached = maxActiveUserTasksReached;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(VertxExtension.class)
Expand Down Expand Up @@ -1298,6 +1299,50 @@ private void krRebalancingCompletedWithError(Vertx vertx, VertxTestContext conte
.onComplete(result -> checkOptimizationResults(result, context, true));
}

@Test
public void testGetUserTasksReturnError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);
cruiseControlServer.setupCCUserTasksToReturnError(500, "Some error");

CruiseControlApi client = new CruiseControlApiImpl(vertx, HTTP_DEFAULT_IDLE_TIMEOUT_SECONDS, MockCruiseControl.CC_SECRET, MockCruiseControl.CC_API_SECRET, true, true);
KafkaRebalanceAssemblyOperator kcrao = new KafkaRebalanceAssemblyOperator(vertx, ResourceUtils.supplierWithMocks(true), ResourceUtils.dummyClusterOperatorConfig(), cruiseControlPort) {
@Override
public String cruiseControlHost(String clusterName, String clusterNamespace) {
return HOST;
}
};

Reconciliation recon = new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, CLUSTER_NAMESPACE, RESOURCE_NAME);

kcrao.computeNextStatus(recon, HOST, client, kcRebalance, KafkaRebalanceState.Rebalancing, null)
.onComplete(mapAndStatusAsyncResult -> {
assertTrue(mapAndStatusAsyncResult.failed());
context.completeNow();
});
}

@Test
public void testGetUserTasksForRebalancingReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);
cruiseControlServer.setupCCUserTasksToReturnError(500, "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'.");

checkTransition(vertx, context,
KafkaRebalanceState.Rebalancing, KafkaRebalanceState.Rebalancing,
kcRebalance)
.onComplete(result -> checkOptimizationResults(result, context, true));
}

@Test
public void testGetUserTasksForProposalReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.PendingProposal, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);
cruiseControlServer.setupCCUserTasksToReturnError(500, "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'.");

checkTransition(vertx, context,
KafkaRebalanceState.PendingProposal, KafkaRebalanceState.PendingProposal,
kcRebalance)
.onComplete(result -> checkOptimizationResults(result, context, true));
}

/**
* Tests the transition from 'Stopped' to 'PendingProposal' when refresh
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public void testMockCCServerPendingCallsOverride(Vertx vertx, VertxTestContext c

cruiseControlServer.setupCCUserTasksResponseNoGoals(0, pendingCalls1);

Future<CruiseControlResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);
Future<CruiseControlUserTasksResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);

for (int i = 1; i <= pendingCalls1; i++) {
statusFuture = statusFuture.compose(response -> {
Expand Down Expand Up @@ -611,7 +611,7 @@ private void runTest(Vertx vertx, VertxTestContext context, String userTaskID, i

CruiseControlApi client = cruiseControlClientProvider(vertx);

Future<CruiseControlResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);
Future<CruiseControlUserTasksResponse> statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID);

// One interaction is always expected at the end of the test, hence the +1
Checkpoint expectedInteractions = context.checkpoint(pendingCalls + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,25 @@ public void setupCCUserTasksCompletedWithError() throws IOException, URISyntaxEx
.withDelay(TimeUnit.SECONDS, 0));
}

public void setupCCUserTasksToReturnError(int statusCode, String errorMessage) throws IOException, URISyntaxException {
// This simulates asking for the status of a task that has Complete with error and fetch_completed_task=true
JsonBody errorJson = new JsonBody("{\"errorMessage\":\"" + errorMessage + "\"}");
server
.when(
request()
.withMethod("GET")
.withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true"))
.withQueryStringParameter(Parameter.param(CruiseControlParameters.FETCH_COMPLETE.toString(), "true"))
.withPath(CruiseControlEndpoints.USER_TASKS.toString())
.withHeader(AUTH_HEADER)
.withSecure(true))
.respond(
response()
.withBody(errorJson)
.withStatusCode(statusCode)
.withDelay(TimeUnit.SECONDS, 0));
}

/**
* Setup response when user task is not found
*/
Expand Down

0 comments on commit 9a9a6df

Please sign in to comment.