diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index e6d2f5b49ae2..dbe67fe673f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -47,6 +47,7 @@
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -677,14 +678,31 @@ public RebalanceResult rebalance(
         if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
           // If dry-run succeeded, run rebalance asynchronously
           rebalanceConfig.setDryRun(false);
-          _executorService.submit(() -> {
+          Future<RebalanceResult> rebalanceResultFuture = _executorService.submit(() -> {
             try {
-              _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, true);
+              return _pinotHelixResourceManager.rebalanceTable(
+                  tableNameWithType, rebalanceConfig, rebalanceJobId, true);
             } catch (Throwable t) {
-              LOGGER.error("Caught exception/error while rebalancing table: {}", tableNameWithType, t);
+              String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType);
+              LOGGER.error(errorMsg, t);
+              return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null);
             }
           });
-          waitForJobIdToPersist(dryRunResult.getJobId(), tableNameWithType);
+          boolean isJobIdPersisted = waitForRebalanceToPersist(
+              dryRunResult.getJobId(), tableNameWithType, rebalanceResultFuture);
+
+          if (rebalanceResultFuture.isDone()) {
+            try {
+              return rebalanceResultFuture.get();
+            } catch (Throwable t) {
+              if (!isJobIdPersisted) {
+                // If the jobId is not persisted, we return the exception to indicate the rebalance failed.
+                // Otherwise, polling the job id return NOT_FOUND indefinitely.
+                throw new ControllerApplicationException(LOGGER, t.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
+              }
+            }
+          }
+
           return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
               dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment());
@@ -699,17 +717,21 @@ public RebalanceResult rebalance(
   }

   /**
-   * Waits for jobId to be persisted using a retry policy.
+   * Waits for jobId to be persisted or the rebalance to complete using a retry policy.
    * Tables with 100k+ segments take up to a few seconds for the jobId to persist. This ensures the jobId is present
    * before returning the jobId to the caller, so they can correctly poll the jobId.
    */
-  public void waitForJobIdToPersist(String jobId, String tableNameWithType) {
+  public boolean waitForRebalanceToPersist(
+      String jobId, String tableNameWithType, Future<RebalanceResult> rebalanceResultFuture) {
     try {
       // This retry policy waits at most for 7.5s to 15s in total. This is chosen to cover typical delays for tables
       // with many segments and avoid excessive HTTP request timeouts.
-      RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() -> getControllerJobMetadata(jobId) != null);
+      RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() ->
+          getControllerJobMetadata(jobId) != null || rebalanceResultFuture.isDone());
+      return true;
     } catch (Exception e) {
       LOGGER.warn("waiting for jobId not successful while rebalancing table: {}", tableNameWithType);
+      return false;
     }
   }
 