Skip to content

Commit

Permalink
[CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead of sched…
Browse files Browse the repository at this point in the history
…uleAtFixedRate in threads pool of master and worker

### What changes were proposed in this pull request?

Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate` in thread pool of Celeborn Master and Worker.

### Why are the changes needed?

Follow up apache#1970.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Internal tests.

Closes apache#2048 from SteNicholas/CELEBORN-1032.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
SteNicholas authored and waitinfuture committed Oct 27, 2023
1 parent 5d3ae31 commit df40a28
Show file tree
Hide file tree
Showing 13 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private SendBufferPool(int capacity, long checkInterval, long timeout) {
pushTaskQueues = new LinkedList<>();

lastAquireTime = System.currentTimeMillis();
cleaner.scheduleAtFixedRate(
cleaner.scheduleWithFixedDelay(
() -> {
if (System.currentTimeMillis() - lastAquireTime > timeout) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
this.interval = conf.clientPushReviveInterval();
this.batchSize = conf.clientPushReviveBatchSize();

batchReviveRequestScheduler.scheduleAtFixedRate(
batchReviveRequestScheduler.scheduleWithFixedDelay(
() -> {
Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ApplicationHeartbeater(
private var appHeartbeat: ScheduledFuture[_] = _

def start(): Unit = {
appHeartbeat = appHeartbeatHandlerThread.scheduleAtFixedRate(
appHeartbeat = appHeartbeatHandlerThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ChangePartitionManager(
def start(): Unit = {
batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map {
// noinspection ConvertExpressionToSAM
_.scheduleAtFixedRate(
_.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
lifecycleManager.registerWorkerStatusListener(new ShutdownWorkerListener)

batchHandleCommitPartition = batchHandleCommitPartitionSchedulerThread.map {
_.scheduleAtFixedRate(
_.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
committedPartitionInfo.asScala.foreach { case (shuffleId, shuffleCommittedInfo) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

override def onStart(): Unit = {
// noinspection ConvertExpressionToSAM
checkForShuffleRemoval = forwardMessageThread.scheduleAtFixedRate(
checkForShuffleRemoval = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(RemoveExpiredShuffle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ReleasePartitionManager(
def start(): Unit = {
batchHandleReleasePartition = batchHandleReleasePartitionSchedulerThread.map {
// noinspection ConvertExpressionToSAM
_.scheduleAtFixedRate(
_.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (enableHeartbeat) {
heartbeatFuture =
ctx.executor()
.scheduleAtFixedRate(
.scheduleWithFixedDelay(
() -> {
logger.debug("send heartbeat");
ctx.writeAndFlush(new Heartbeat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging {
})
}

logExecutor.scheduleAtFixedRate(
logExecutor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (currentSnapShot.get() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private[celeborn] class Master(
conf.estimatedPartitionSizeForEstimationUpdateInterval
private val partitionSizeUpdateService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater")
partitionSizeUpdateService.scheduleAtFixedRate(
partitionSizeUpdateService.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
executeWithLeaderChecker(
Expand Down Expand Up @@ -199,7 +199,7 @@ private[celeborn] class Master(

// start threads to check timeout for workers and applications
override def onStart(): Unit = {
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ControlMessages.pbCheckForWorkerTimeout)
Expand All @@ -209,7 +209,7 @@ private[celeborn] class Master(
workerHeartbeatTimeoutMs,
TimeUnit.MILLISECONDS)

checkForApplicationTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
checkForApplicationTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForApplicationTimeOut)
Expand All @@ -219,7 +219,7 @@ private[celeborn] class Master(
appHeartbeatTimeoutMs / 2,
TimeUnit.MILLISECONDS)

checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerUnavailableInfoTimeout)
Expand All @@ -230,7 +230,7 @@ private[celeborn] class Master(
TimeUnit.MILLISECONDS)

if (hasHDFSStorage) {
checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForHDFSExpiredDirsTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,15 @@ private[celeborn] class Worker(
registerWithMaster()

// start heartbeat
sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate(
sendHeartbeatTask = forwardMessageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError { heartbeatToMaster() }
},
heartbeatInterval,
heartbeatInterval,
TimeUnit.MILLISECONDS)

checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate(
checkFastFailTask = forwardMessageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo, Long] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class LocalDeviceMonitor(
}

override def startCheck(): Unit = {
diskChecker.scheduleAtFixedRate(
diskChecker.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
logDebug("Device check start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
saveCommittedFileInfosExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"StorageManager-save-committed-fileinfo-thread")
saveCommittedFileInfosExecutor.scheduleAtFixedRate(
saveCommittedFileInfosExecutor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (!committedFileInfos.isEmpty) {
Expand Down Expand Up @@ -588,7 +588,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
private val storageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler")

storageScheduler.scheduleAtFixedRate(
storageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
Expand Down

0 comments on commit df40a28

Please sign in to comment.