From dc5219216331be1ab0b949f681542452a9c465b1 Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 6 May 2024 17:42:52 +0800 Subject: [PATCH] [CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeout configuration ### What changes were proposed in this pull request? This PR aims to supports separate timeout configuration at CommitHandler commitFiles RPC. ### Why are the changes needed? The default value of `celeborn.worker.commitFiles.timeout` is 120s, and the default value of Client's RPC is 60s. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA Closes #2488 from cxzl25/CELEBORN-1409. Authored-by: sychen Signed-off-by: SteNicholas --- .../celeborn/client/commit/CommitHandler.scala | 3 ++- .../org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++ docs/configuration/client.md | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index af4bd99f469..8532599bdc8 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -68,6 +68,7 @@ abstract class CommitHandler( val sharedRpcPool: ThreadPoolExecutor) extends Logging { private val pushReplicateEnabled = conf.clientPushReplicateEnabled + private val clientRpcCommitFilesAskTimeout = conf.clientRpcCommitFilesAskTimeout private val commitEpoch = new AtomicLong() private val totalWritten = new LongAdder @@ -452,7 +453,7 @@ abstract class CommitHandler( message.replicaIds) }(ec) } else { - worker.endpoint.ask[CommitFilesResponse](message) + worker.endpoint.ask[CommitFilesResponse](message, clientRpcCommitFilesAskTimeout) } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index ec680fd537c..edc6f25c3c5 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -924,6 +924,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli, CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key) + def clientRpcCommitFilesAskTimeout: RpcTimeout = + new RpcTimeout( + get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli, + CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key) + // ////////////////////////////////////////////////////// // Shuffle Client Fetch // // ////////////////////////////////////////////////////// @@ -4243,6 +4248,13 @@ object CelebornConf extends Logging { s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.") .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT) + val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.client.rpc.commitFiles.askTimeout") + .categories("client") + .version("0.4.1") + .doc("Timeout for CommitHandler commit files.") + .fallbackConf(RPC_ASK_TIMEOUT) + val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] = buildConf("celeborn.client.rpc.cache.size") .withAlternative("celeborn.rpc.cache.size") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 64611d2111b..7ad3dbc5be5 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -74,6 +74,7 @@ license: | | celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel | | celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime | | celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size | +| celeborn.client.rpc.commitFiles.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. | 0.4.1 | | | celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | false | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.2.0 | | | celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | | | celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |