Skip to content

Commit

Permalink
[fix] distinguish sorting-file from sort-tasks waiting to be submitted.
Browse files Browse the repository at this point in the history
  • Loading branch information
Z1Wu committed Dec 30, 2024
1 parent a572380 commit 149dc32
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ public PartitionFilesSorter(
});
}

public int getSortingCount() {
public int getPendingSortTaskCount() {
return shuffleSortTaskDeque.size();
}

public int getSortingCount() {
return sortingShuffleFiles.values().stream().map(Set::size).reduce(Integer::sum).orElse(0);
}

public int getSortedCount() {
return sortedFileCount.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
memoryManager.getSortMemoryCounter.get()
}
workerSource.addGauge(WorkerSource.PENDING_SORT_TASKS) { () =>
partitionsSorter.getPendingSortTaskCount
}
workerSource.addGauge(WorkerSource.SORTING_FILES) { () =>
partitionsSorter.getSortingCount
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ object WorkerSource {
val SORT_TIME = "SortTime"
val SORT_MEMORY = "SortMemory"
val SORTING_FILES = "SortingFiles"
val PENDING_SORT_TASKS = "PendingSortTasks"
val SORTED_FILES = "SortedFiles"
val SORTED_FILE_SIZE = "SortedFileSize"
val DISK_BUFFER = "DiskBuffer"
Expand Down

0 comments on commit 149dc32

Please sign in to comment.