Skip to content

Commit

Permalink
[CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeo…
Browse files Browse the repository at this point in the history
…ut configuration

This PR aims to supports separate timeout configuration at CommitHandler commitFiles RPC.

The default value of `celeborn.worker.commitFiles.timeout` is 120s, and the default value of Client's RPC is 60s.

No

GA

Closes apache#2488 from cxzl25/CELEBORN-1409.

Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
cxzl25 authored and SteNicholas committed May 6, 2024
1 parent f895343 commit 5125164
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ abstract class CommitHandler(
val sharedRpcPool: ThreadPoolExecutor) extends Logging {

private val pushReplicateEnabled = conf.clientPushReplicateEnabled
private val clientRpcCommitFilesAskTimeout = conf.clientRpcCommitFilesAskTimeout

private val commitEpoch = new AtomicLong()
private val totalWritten = new LongAdder
Expand Down Expand Up @@ -452,7 +453,7 @@ abstract class CommitHandler(
message.replicaIds)
}(ec)
} else {
worker.endpoint.ask[CommitFilesResponse](message)
worker.endpoint.ask[CommitFilesResponse](message, clientRpcCommitFilesAskTimeout)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)

def clientRpcCommitFilesAskTimeout: RpcTimeout =
new RpcTimeout(
get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)

// //////////////////////////////////////////////////////
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -4243,6 +4248,13 @@ object CelebornConf extends Logging {
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)

val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.commitFiles.askTimeout")
.categories("client")
.version("0.4.1")
.doc("Timeout for CommitHandler commit files.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.client.rpc.cache.size")
.withAlternative("celeborn.rpc.cache.size")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ license: |
| celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel |
| celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
| celeborn.client.rpc.commitFiles.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | false | Timeout for CommitHandler commit files. | 0.4.1 | |
| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 | |
| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | |
| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
Expand Down

0 comments on commit 5125164

Please sign in to comment.