diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index 745cc0770b0..f4edc0f2ff3 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -111,7 +111,7 @@ class CelebornShuffleReader[K, C]( fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition) } catch { case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(shuffleId, 0, ce) + handleFetchExceptions(handle.shuffleId, shuffleId, 0, ce) case e: Throwable => throw e } @@ -254,7 +254,7 @@ class CelebornShuffleReader[K, C]( if (exceptionRef.get() != null) { exceptionRef.get() match { case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(handle.shuffleId, partitionId, ce) + handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, ce) case e => throw e } } @@ -289,7 +289,7 @@ class CelebornShuffleReader[K, C]( iter } catch { case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) => - handleFetchExceptions(handle.shuffleId, partitionId, e) + handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, e) } } @@ -369,17 +369,21 @@ class CelebornShuffleReader[K, C]( } } - private def handleFetchExceptions(shuffleId: Int, partitionId: Int, ce: Throwable) = { + private def handleFetchExceptions( + appShuffleId: Int, + shuffleId: Int, + partitionId: Int, + ce: Throwable) = { if (throwsFetchFailure && - shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) { + shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) { logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce) throw new FetchFailedException( null, - handle.shuffleId, + appShuffleId, -1, -1, partitionId, - SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId, + SparkUtils.FETCH_FAILURE_ERROR_MSG + appShuffleId + "/" + shuffleId, ce) } else throw ce