Skip to content

Commit

Permalink
[CELEBORN-1663] Only register appShuffleDeterminate if stage using ce…
Browse files Browse the repository at this point in the history
…leborn for shuffle

Only register appShuffleDeterminate if stage using celeborn for shuffle

Currently we are passing stage info to lifecyclemanager, eventhough it is not required.

NA

Existing UTs

Closes apache#2832 from s0nskar/fix_register.

Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 813b45f)
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
s0nskar authored and RexXiong committed Oct 22, 2024
1 parent bce6166 commit 8b7b181
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,6 @@ public <K, V, C> ShuffleHandle registerShuffle(
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager();

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());

if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
Expand All @@ -178,6 +174,11 @@ public <K, V, C> ShuffleHandle registerShuffle(
sortShuffleIds.add(shuffleId);
return sortShuffleManager().registerShuffle(shuffleId, dependency);
} else {
lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
!DeterministicLevel.INDETERMINATE()
.equals(dependency.rdd().getOutputDeterministicLevel()));

return new CelebornShuffleHandle<>(
appUniqueId,
lifecycleManager.getHost(),
Expand Down

0 comments on commit 8b7b181

Please sign in to comment.