Skip to content

Commit

Permalink
[CELEBORN-1663][FOLLOWUP] Only register appShuffleDeterminate if stag…
Browse files Browse the repository at this point in the history
…e using celeborn for shuffle

Making the same changes for Spark2 codebase

Followup for apache#2832

NA

Existing UTs

Closes apache#2837 from s0nskar/fix_register_spark2.

Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 1e77f01)
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
s0nskar authored and SteNicholas committed Oct 22, 2024
1 parent 8b7b181 commit f089a0b
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,17 @@ public <K, V, C> ShuffleHandle registerShuffle(
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appUniqueId);

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());

if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
logger.warn("Fallback to SortShuffleManager!");
sortShuffleIds.add(shuffleId);
return sortShuffleManager().registerShuffle(shuffleId, numMaps, dependency);
} else {
lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
!DeterministicLevel.INDETERMINATE()
.equals(dependency.rdd().getOutputDeterministicLevel()));

return new CelebornShuffleHandle<>(
appUniqueId,
lifecycleManager.getHost(),
Expand Down

0 comments on commit f089a0b

Please sign in to comment.