Skip to content

Commit

Permalink
[CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeo…
Browse files Browse the repository at this point in the history
…ut configuration

### What changes were proposed in this pull request?
This PR aims to supports separate timeout configuration at CommitHandler commitFiles RPC.

### Why are the changes needed?
The default value of `celeborn.worker.commitFiles.timeout` is 120s, and the default value of Client's RPC is 60s.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes apache#2488 from cxzl25/CELEBORN-1409.

Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
cxzl25 authored and SteNicholas committed May 6, 2024
1 parent f895343 commit dc52192
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ abstract class CommitHandler(
val sharedRpcPool: ThreadPoolExecutor) extends Logging {

private val pushReplicateEnabled = conf.clientPushReplicateEnabled
private val clientRpcCommitFilesAskTimeout = conf.clientRpcCommitFilesAskTimeout

private val commitEpoch = new AtomicLong()
private val totalWritten = new LongAdder
Expand Down Expand Up @@ -452,7 +453,7 @@ abstract class CommitHandler(
message.replicaIds)
}(ec)
} else {
worker.endpoint.ask[CommitFilesResponse](message)
worker.endpoint.ask[CommitFilesResponse](message, clientRpcCommitFilesAskTimeout)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)

def clientRpcCommitFilesAskTimeout: RpcTimeout =
new RpcTimeout(
get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)

// //////////////////////////////////////////////////////
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -4243,6 +4248,13 @@ object CelebornConf extends Logging {
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)

val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.commitFiles.askTimeout")
.categories("client")
.version("0.4.1")
.doc("Timeout for CommitHandler commit files.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.client.rpc.cache.size")
.withAlternative("celeborn.rpc.cache.size")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ license: |
| celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel |
| celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
| celeborn.client.rpc.commitFiles.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | false | Timeout for CommitHandler commit files. | 0.4.1 | |
| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 | |
| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | |
| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
Expand Down

0 comments on commit dc52192

Please sign in to comment.