Skip to content

Commit

Permalink
[CELEBORN-1412] celeborn.client.rpc.*.askTimeout should fallback to c…
Browse files Browse the repository at this point in the history
…eleborn.rpc.askTimeout

### What changes were proposed in this pull request?

`celeborn.client.rpc.*.askTimeout` should fallback to `celeborn.rpc.askTimeout`.

### Why are the changes needed?

The config option series `celeborn.client.rpc.*.askTimeout` should fallback to `celeborn.rpc.askTimeout` instead of `celeborn.<module>.io.connectionTimeout`, which including `celeborn.client.rpc.getReducerFileGroup.askTimeout`, `celeborn.client.rpc.registerShuffle.askTimeout` and `celeborn.client.rpc.requestPartition.askTimeout`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes apache#2492 from SteNicholas/CELEBORN-1412.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 1cd231f)
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
SteNicholas committed May 7, 2024
1 parent 4d0bd47 commit 15de4e5
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public Optional<PartitionLocation> revive(
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
// per partitionKey only serve single PartitionLocation in Client Cache.
PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,7 @@ private void submitRetryPushMergedData(
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
+ conf.clientRpcRequestPartitionLocationRpcAskTimeout()
.duration()
.toMillis()));
+ conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis()));
}
}

Expand All @@ -479,7 +477,7 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
() ->
lifecycleManagerRef.askSync(
RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions),
conf.clientRpcRegisterShuffleRpcAskTimeout(),
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}

Expand All @@ -502,7 +500,7 @@ public PartitionLocation registerMapPartitionTask(
lifecycleManagerRef.askSync(
RegisterMapPartitionTask$.MODULE$.apply(
shuffleId, numMappers, mapId, attemptId, partitionId),
conf.clientRpcRegisterShuffleRpcAskTimeout(),
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));

if (partitionLocationMap == null) {
Expand Down Expand Up @@ -538,7 +536,7 @@ public int getShuffleId(int appShuffleId, String appShuffleIdentifier, boolean i
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
conf.clientRpcRegisterShuffleRpcAskTimeout(),
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});
Expand All @@ -554,7 +552,7 @@ public boolean reportShuffleFetchFailure(int appShuffleId, int shuffleId) {
PbReportShuffleFetchFailureResponse pbReportShuffleFetchFailureResponse =
lifecycleManagerRef.askSync(
pbReportShuffleFetchFailure,
conf.clientRpcRegisterShuffleRpcAskTimeout(),
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbReportShuffleFetchFailureResponse.class));
return pbReportShuffleFetchFailureResponse.getSuccess();
}
Expand Down Expand Up @@ -744,7 +742,7 @@ Map<Integer, Integer> reviveBatch(
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
Revive$.MODULE$.apply(shuffleId, mapIds, requests),
conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));

for (int i = 0; i < response.getEndedMapIdCount(); i++) {
Expand Down Expand Up @@ -991,7 +989,7 @@ public void onSuccess(ByteBuffer response) {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
+ conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis();
pushDataRetryPool.submit(
Expand Down Expand Up @@ -1077,9 +1075,7 @@ public void onFailure(Throwable e) {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
+ conf.clientRpcRequestPartitionLocationRpcAskTimeout()
.duration()
.toMillis();
+ conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis();
pushDataRetryPool.submit(
() ->
submitRetryPushData(
Expand Down Expand Up @@ -1385,7 +1381,7 @@ public void onSuccess(ByteBuffer response) {
requests,
remainReviveTimes,
System.currentTimeMillis()
+ conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
Expand Down Expand Up @@ -1465,7 +1461,7 @@ public void onFailure(Throwable e) {
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
+ conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else {
Expand Down Expand Up @@ -1583,7 +1579,7 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)
GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));

switch (response.status()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ShuffleClientHelper extends Logging {
shuffleLocs: ConcurrentHashMap[Integer, PartitionLocation]): Unit = {
endpointRef.ask[PbChangeLocationResponse](
req,
conf.clientRpcRequestPartitionLocationRpcAskTimeout).onComplete {
conf.clientRpcRequestPartitionLocationAskTimeout).onComplete {
case Success(resp) =>
val partitionInfo = resp.getPartitionInfo(0)
val respStatus = Utils.toStatusCode(partitionInfo.getStatus)
Expand Down
39 changes: 18 additions & 21 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -782,20 +782,20 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_RESERVE_SLOTS_RPC_TIMEOUT).milli,
CLIENT_RESERVE_SLOTS_RPC_TIMEOUT.key)

def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout =
def clientRpcRegisterShuffleAskTimeout: RpcTimeout =
new RpcTimeout(
get(CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
get(CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT).milli,
CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT.key)

def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout =
def clientRpcRequestPartitionLocationAskTimeout: RpcTimeout =
new RpcTimeout(
get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT).milli,
CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT.key)

def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout =
def clientRpcGetReducerFileGroupAskTimeout: RpcTimeout =
new RpcTimeout(
get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT).milli,
CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT.key)

def clientRpcCommitFilesAskTimeout: RpcTimeout =
new RpcTimeout(
Expand Down Expand Up @@ -3767,7 +3767,7 @@ object CelebornConf extends Logging {
.doc("Timeout for LifecycleManager request reserve slots.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
val CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.registerShuffle.askTimeout")
.withAlternative("celeborn.rpc.registerShuffle.askTimeout")
.categories("client")
Expand All @@ -3776,29 +3776,26 @@ object CelebornConf extends Logging {
s"During this process, there are two times for retry opportunities for requesting slots, " +
s"one request for establishing a connection with Worker and " +
s"`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " +
s"User can customize this value according to your setting. " +
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
s"User can customize this value according to your setting.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
val CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.requestPartition.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. " +
s"During this process, there are `${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " +
s"User can customize this value according to your setting. " +
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
s"User can customize this value according to your setting.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
val CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.getReducerFileGroup.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during getting reducer file group information. " +
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " +
s"and 1 times for releasing slots request. User can customize this value according to your setting. " +
s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
s"and 1 times for releasing slots request. User can customize this value according to your setting.")
.fallbackConf(RPC_ASK_TIMEOUT)

val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.commitFiles.askTimeout")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(conf.networkTimeout.duration.toMillis == 20000L)
assert(conf.networkIoConnectionTimeoutMs("data") == 20000L)
assert(conf.clientPushStageEndTimeout == 20000L)
assert(conf.clientRpcRegisterShuffleRpcAskTimeout.duration.toMillis == 20000L)
assert(conf.clientRpcRequestPartitionLocationRpcAskTimeout.duration.toMillis == 20000L)
assert(conf.clientRpcGetReducerFileGroupRpcAskTimeout.duration.toMillis == 20000L)
assert(conf.clientRpcRegisterShuffleAskTimeout.duration.toMillis == 1000L)
assert(conf.clientRpcRequestPartitionLocationAskTimeout.duration.toMillis == 1000L)
assert(conf.clientRpcGetReducerFileGroupAskTimeout.duration.toMillis == 1000L)
assert(conf.networkConnectTimeout.duration.toMillis == 2000L)
assert(conf.networkIoConnectTimeoutMs("data") == 2000L)
}
Expand Down
6 changes: 3 additions & 3 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ license: |
| celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
| celeborn.client.rpc.commitFiles.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | Timeout for CommitHandler commit files. | 0.4.1 | |
| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 | |
| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. | 0.2.0 | |
| celeborn.client.rpc.maxRetries | 3 | Max RPC retry times in LifecycleManager. | 0.3.2 | |
| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
| celeborn.client.rpc.requestPartition.askTimeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.2.0 | |
| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
| celeborn.client.rpc.requestPartition.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.2.0 | |
| celeborn.client.rpc.reserveSlots.askTimeout | &lt;value of celeborn.rpc.askTimeout&gt; | Timeout for LifecycleManager request reserve slots. | 0.3.0 | |
| celeborn.client.rpc.shared.threads | 16 | Number of shared rpc threads in LifecycleManager. | 0.3.2 | |
| celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval |
Expand Down
2 changes: 2 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ license: |
- Since 0.4.1, Celeborn master adds a limit to the estimated partition size used for computing worker slots.
This size is now constrained within the range specified by `celeborn.master.estimatedPartitionSize.minSize` and `celeborn.master.estimatedPartitionSize.maxSize`.

- Since 0.4.1, Celeborn changed the fallback configuration of `celeborn.client.rpc.getReducerFileGroup.askTimeout`, `celeborn.client.rpc.registerShuffle.askTimeout` and `celeborn.client.rpc.requestPartition.askTimeout` from `celeborn.<module>.io.connectionTimeout` to `celeborn.rpc.askTimeout`.

## Upgrading from 0.3 to 0.4

- Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions below 0.3.0.
Expand Down

0 comments on commit 15de4e5

Please sign in to comment.