Skip to content

Commit

Permalink
[CELEBORN-1421] Refine code in master to reduce unnecessary sync to g…
Browse files Browse the repository at this point in the history
…et workers/lostworkers/shutdownWorkers

### What changes were proposed in this pull request?

1. Use ConcurrentSet to replace ArrayList for workers.
2. Remove unnecessary sync and snapshot when get workers/lostworkers/shutdownWorkers

### Why are the changes needed?

1. Reduce unnecessary sync to get workers/lostworkers/shutdownWorkers.
2. Somewhere in the Master, directly using statusSystem.workers(ArrayList) is not safe, potentially leading to concurrent modification issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA

Closes apache#2507 from RexXiong/CELEBORN-1421.

Authored-by: Shuang <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
RexXiong authored and SteNicholas committed May 17, 2024
1 parent 9908035 commit 8a10a2d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ object PbSerDeUtils {
manuallyExcludedWorkers: java.util.Set[WorkerInfo],
workerLostEvent: java.util.Set[WorkerInfo],
appHeartbeatTime: java.util.Map[String, java.lang.Long],
workers: java.util.List[WorkerInfo],
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -66,7 +65,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
// Metadata for master service
public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final ArrayList<WorkerInfo> workers = new ArrayList<>();
public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();

public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos =
JavaUtils.newConcurrentHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,6 @@ private[celeborn] class Master(
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()

// States
private def workersSnapShot: util.List[WorkerInfo] =
statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.workers))
private def lostWorkersSnapshot: ConcurrentHashMap[WorkerInfo, java.lang.Long] =
statusSystem.workers.synchronized(JavaUtils.newConcurrentHashMap(statusSystem.lostWorkers))
private def shutdownWorkerSnapshot: util.List[WorkerInfo] =
statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))

private def diskReserveSize = conf.workerDiskReserveSize
private def diskReserveRatio = conf.workerDiskReserveRatio

Expand Down Expand Up @@ -565,7 +557,7 @@ private[celeborn] class Master(
return
}

workersSnapShot.asScala.foreach { worker =>
statusSystem.workers.asScala.foreach { worker =>
if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
&& !statusSystem.workerLostEvents.contains(worker)) {
logWarning(s"Worker ${worker.readableAddress()} timeout! Trigger WorkerLost event.")
Expand All @@ -588,7 +580,7 @@ private[celeborn] class Master(
return
}

val unavailableInfoTimeoutWorkers = lostWorkersSnapshot.asScala.filter {
val unavailableInfoTimeoutWorkers = statusSystem.lostWorkers.asScala.filter {
case (_, lostTime) => currentTime - lostTime > workerUnavailableInfoExpireTimeoutMs
}.keySet.toList.asJava

Expand Down Expand Up @@ -638,7 +630,7 @@ private[celeborn] class Master(
workerStatus: WorkerStatus,
requestId: String): Unit = {
val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort)
val registered = workersSnapShot.asScala.contains(targetWorker)
val registered = statusSystem.workers.asScala.contains(targetWorker)
if (!registered) {
logWarning(s"Received heartbeat from unknown worker " +
s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
Expand Down Expand Up @@ -708,7 +700,7 @@ private[celeborn] class Master(
-1,
new util.HashMap[String, DiskInfo](),
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
val worker: WorkerInfo = workersSnapShot
val worker: WorkerInfo = statusSystem.workers
.asScala
.find(_ == targetWorker)
.orNull
Expand Down Expand Up @@ -745,7 +737,7 @@ private[celeborn] class Master(
internalPort,
disks,
userResourceConsumption)
if (workersSnapShot.contains(workerToRegister)) {
if (statusSystem.workers.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
// TODO: remove `WorkerRemove` because we have improve register logic to cover `WorkerRemove`
Expand Down Expand Up @@ -1015,14 +1007,14 @@ private[celeborn] class Master(
System.currentTimeMillis(),
requestId)
// unknown workers will retain in needCheckedWorkerList
needCheckedWorkerList.removeAll(workersSnapShot)
needCheckedWorkerList.removeAll(statusSystem.workers)
if (shouldResponse) {
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
new util.ArrayList(
(statusSystem.excludedWorkers.asScala ++ statusSystem.manuallyExcludedWorkers.asScala).asJava),
needCheckedWorkerList,
shutdownWorkerSnapshot))
new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers)))
} else {
context.reply(OneWayMessageResponse)
}
Expand Down Expand Up @@ -1124,9 +1116,9 @@ private[celeborn] class Master(

private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): util.List[WorkerInfo] = {
workersSnapShot.asScala.filter { w =>
statusSystem.workers.asScala.filter { w =>
statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
}.asJava
}.toList.asJava
}

private def handleRequestForApplicationMeta(
Expand Down Expand Up @@ -1157,7 +1149,7 @@ private[celeborn] class Master(
}

private def getWorkers: String = {
workersSnapShot.asScala.mkString("\n")
statusSystem.workers.asScala.mkString("\n")
}

override def handleWorkerEvent(workerEventType: String, workers: String): String = {
Expand Down Expand Up @@ -1199,7 +1191,7 @@ private[celeborn] class Master(
override def getLostWorkers: String = {
val sb = new StringBuilder
sb.append("======================= Lost Workers in Master ========================\n")
lostWorkersSnapshot.asScala.toSeq.sortBy(_._2).foreach { case (worker, time) =>
statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2).foreach { case (worker, time) =>
sb.append(s"${worker.toUniqueId().padTo(50, " ").mkString}${Utils.formatTimestamp(time)}\n")
}
sb.toString()
Expand All @@ -1208,7 +1200,7 @@ private[celeborn] class Master(
override def getShutdownWorkers: String = {
val sb = new StringBuilder
sb.append("===================== Shutdown Workers in Master ======================\n")
shutdownWorkerSnapshot.asScala.foreach { worker =>
statusSystem.shutdownWorkers.asScala.foreach { worker =>
sb.append(s"${worker.toUniqueId()}\n")
}
sb.toString()
Expand Down Expand Up @@ -1281,7 +1273,7 @@ private[celeborn] class Master(
s"Failed to Exclude workers add ${workersToAdd.mkString(",")} and remove ${workersToRemove.mkString(",")}.\n")
}
val unknownExcludedWorkers =
(workersToAdd ++ workersToRemove).filter(!workersSnapShot.contains(_))
(workersToAdd ++ workersToRemove).filter(!statusSystem.workers.contains(_))
if (unknownExcludedWorkers.nonEmpty) {
sb.append(
s"Unknown worker ${unknownExcludedWorkers.mkString(",")}. Workers in Master:\n$getWorkers.")
Expand Down

0 comments on commit 8a10a2d

Please sign in to comment.