Skip to content

Commit

Permalink
return the current rebalance result if already done (#13488)
Browse files Browse the repository at this point in the history
  • Loading branch information
jadami10 authored Jul 8, 2024
1 parent c29efe7 commit 2efd5ca
Showing 1 changed file with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -677,14 +678,31 @@ public RebalanceResult rebalance(
if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
rebalanceConfig.setDryRun(false);
_executorService.submit(() -> {
Future<RebalanceResult> rebalanceResultFuture = _executorService.submit(() -> {
try {
_pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, true);
return _pinotHelixResourceManager.rebalanceTable(
tableNameWithType, rebalanceConfig, rebalanceJobId, true);
} catch (Throwable t) {
LOGGER.error("Caught exception/error while rebalancing table: {}", tableNameWithType, t);
String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType);
LOGGER.error(errorMsg, t);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null);
}
});
waitForJobIdToPersist(dryRunResult.getJobId(), tableNameWithType);
boolean isJobIdPersisted = waitForRebalanceToPersist(
dryRunResult.getJobId(), tableNameWithType, rebalanceResultFuture);

if (rebalanceResultFuture.isDone()) {
try {
return rebalanceResultFuture.get();
} catch (Throwable t) {
if (!isJobIdPersisted) {
// If the jobId is not persisted, we return the exception to indicate the rebalance failed.
// Otherwise, polling the job id return NOT_FOUND indefinitely.
throw new ControllerApplicationException(LOGGER, t.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
}
}
}

return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
Expand All @@ -699,17 +717,21 @@ public RebalanceResult rebalance(
}

/**
* Waits for jobId to be persisted using a retry policy.
* Waits for jobId to be persisted or the rebalance to complete using a retry policy.
* Tables with 100k+ segments take up to a few seconds for the jobId to persist. This ensures the jobId is present
* before returning the jobId to the caller, so they can correctly poll the jobId.
*/
public void waitForJobIdToPersist(String jobId, String tableNameWithType) {
public boolean waitForRebalanceToPersist(
String jobId, String tableNameWithType, Future<RebalanceResult> rebalanceResultFuture) {
try {
// This retry policy waits at most for 7.5s to 15s in total. This is chosen to cover typical delays for tables
// with many segments and avoid excessive HTTP request timeouts.
RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() -> getControllerJobMetadata(jobId) != null);
RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() ->
getControllerJobMetadata(jobId) != null || rebalanceResultFuture.isDone());
return true;
} catch (Exception e) {
LOGGER.warn("waiting for jobId not successful while rebalancing table: {}", tableNameWithType);
return false;
}
}

Expand Down

0 comments on commit 2efd5ca

Please sign in to comment.