Commit

basic methods testing for both spark2 and spark3
turboFei committed Dec 28, 2024
1 parent 1f252e7 commit fbca79a
Showing 1 changed file with 52 additions and 0 deletions.
@@ -105,4 +105,56 @@ class SparkUtilsSuite extends AnyFunSuite
      }
    }
  }

test("getTaskSetManager/getTaskAttempts test") {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
val sparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
.getOrCreate()

    try {
      val sc = sparkSession.sparkContext
      // Run a slow shuffle job in the background so its tasks are still active
      // while the assertions below inspect the scheduler state.
      val jobThread = new Thread {
        override def run(): Unit = {
          try {
            sc.parallelize(1 to 100, 2)
              .repartition(1)
              .mapPartitions { iter =>
                Thread.sleep(3000)
                iter
              }.collect()
          } catch {
            case _: InterruptedException =>
          }
        }
      }
      jobThread.start()

      val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
      eventually(timeout(3.seconds), interval(100.milliseconds)) {
        // While the job is running, task 0 should have exactly one attempt and
        // no other attempt running or already successful.
        val taskId = 0
        val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, taskId)
        assert(taskSetManager != null)
        assert(SparkUtils.getTaskAttempts(taskSetManager, taskId)._2.size() == 1)
        assert(!SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId))
        assert(SparkUtils.reportedStageShuffleFetchFailureTaskIds.size() == 1)
      }

      sparkSession.sparkContext.cancelAllJobs()

      jobThread.interrupt()

      // After the job is cancelled, the recorded fetch failure task ids should be cleared.
      eventually(timeout(3.seconds), interval(100.milliseconds)) {
        assert(SparkUtils.reportedStageShuffleFetchFailureTaskIds.size() == 0)
      }
    } finally {
      sparkSession.stop()
    }
  }
}
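
For context, a minimal sketch (not part of this commit) of the decision these helpers support: before reporting a shuffle fetch failure for a task, look up its TaskSetManager and skip reporting when another attempt of the same task is already running or has succeeded. The method name shouldReportFetchFailure and its wiring are hypothetical; only the SparkUtils calls appear in the test above, and the imports assume SparkUtils lives alongside TestCelebornShuffleManager in org.apache.spark.shuffle.celeborn.

// Hypothetical sketch, not from this commit: chains the SparkUtils helpers
// exercised above, assuming a live TaskSchedulerImpl and a task id from a running job.
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.shuffle.celeborn.SparkUtils

object FetchFailureCheckSketch {
  def shouldReportFetchFailure(taskScheduler: TaskSchedulerImpl, taskId: Long): Boolean = {
    val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, taskId)
    if (taskSetManager == null) {
      // The task set is already gone; nothing to report against.
      false
    } else {
      // Do not report when another attempt of this task is running or has succeeded.
      !SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId)
    }
  }
}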
