From a928c40a8cba422e6dc9cff55f124c86d9b6c114 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 10 Dec 2024 11:13:41 +0800 Subject: [PATCH 01/24] retry when send Rpc LifecycleManager RpcTimeout --- .../apache/celeborn/client/ShuffleClient.java | 4 +- .../celeborn/client/ShuffleClientImpl.java | 71 ++++++++++++------- .../apache/celeborn/common/CelebornConf.scala | 20 ++++++ 3 files changed, 68 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index efa9641f671..d1d824503c2 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -67,7 +67,7 @@ public static ShuffleClient get( String driverHost, int port, CelebornConf conf, - UserIdentifier userIdentifier) { + UserIdentifier userIdentifier) throws CelebornIOException { return ShuffleClient.get(appUniqueId, driverHost, port, conf, userIdentifier, null); } @@ -77,7 +77,7 @@ public static ShuffleClient get( int port, CelebornConf conf, UserIdentifier userIdentifier, - byte[] extension) { + byte[] extension) throws CelebornIOException { if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index e50e6093c35..b5631ebb016 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -20,10 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import scala.Tuple2; import scala.reflect.ClassTag$; @@ -63,6 +60,7 @@ import org.apache.celeborn.common.rpc.RpcAddress; import org.apache.celeborn.common.rpc.RpcEndpointRef; import org.apache.celeborn.common.rpc.RpcEnv; +import org.apache.celeborn.common.rpc.RpcTimeoutException; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.*; import org.apache.celeborn.common.write.DataBatches; @@ -81,6 +79,8 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; + private final long mapEndRetryWaitMs; + private final long loadFileGroupRetryWaitMs; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -179,6 +179,8 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u this.userIdentifier = userIdentifier; registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); + mapEndRetryWaitMs = conf.clientMapEndRetryWaitMs(); + loadFileGroupRetryWaitMs = conf.clientLoadFileGroupRetryWaitMs(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -1701,18 +1703,40 @@ private void mapEndInternal( final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); PushState pushState = getPushState(mapKey); - try { - limitZeroInFlight(mapKey, pushState); - - MapperEndResponse response = - lifecycleManagerRef.askSync( - new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), - ClassTag$.MODULE$.apply(MapperEndResponse.class)); - if (response.status() != StatusCode.SUCCESS) { - throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); + limitZeroInFlight(mapKey, pushState); + int numRetries = 3; + while (numRetries > 0) { + numRetries--; + try { + MapperEndResponse response = + lifecycleManagerRef.askSync( + new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), + ClassTag$.MODULE$.apply(MapperEndResponse.class)); + if (response.status() != StatusCode.SUCCESS) { + throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); + } else { + break; + } + } catch (Exception e) { + if (e instanceof RpcTimeoutException && numRetries > 0) { + logger.warn( + "MapperEnd for shuffleId {} attemptId {} Rpc timeout, left retries: {}", + shuffleId, + attemptId, + numRetries); + } else { + logger.error( + "MapperEnd for shuffleId {} attemptId {} failed: {}", shuffleId, attemptId, e); + throw e; + } + } finally { + pushStates.remove(mapKey); + } + try { + TimeUnit.MILLISECONDS.sleep(mapEndRetryWaitMs); + } catch (InterruptedException e) { + break; } - } finally { - pushStates.remove(mapKey); } } @@ -1741,23 +1765,17 @@ public boolean cleanupShuffle(int shuffleId) { protected Tuple2 loadFileGroupInternal( int shuffleId, boolean isSegmentGranularityVisible) { - { long getReducerFileGroupStartTime = System.nanoTime(); String exceptionMsg = null; + if (lifecycleManagerRef != null) { try { - if (lifecycleManagerRef == null) { - exceptionMsg = "Driver endpoint is null!"; - logger.warn(exceptionMsg); - } else { GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); - GetReducerFileGroupResponse response = lifecycleManagerRef.askSync( getReducerFileGroup, conf.clientRpcGetReducerFileGroupAskTimeout(), ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)); - switch (response.status()) { case SUCCESS: logger.info( @@ -1781,6 +1799,7 @@ protected Tuple2 loadFileGroupInternal( response.fileGroup(), response.attempts(), response.partitionIds()), null); case STAGE_END_TIME_OUT: + break; case SHUFFLE_DATA_LOST: exceptionMsg = String.format( @@ -1790,16 +1809,18 @@ protected Tuple2 loadFileGroupInternal( break; default: // fall out } - } - } catch (Exception e) { + } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e); exceptionMsg = e.getMessage(); } - return Tuple2.apply(null, exceptionMsg); + } else { + exceptionMsg = "Driver endpoint is null!"; + logger.warn(exceptionMsg); } + return Tuple2.apply(null, exceptionMsg); } @Override diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 1efab142e82..59348ae7bec 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -901,6 +901,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS) def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES) def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT) + def clientMapEndRetryWaitMs: Long = get(CLIENT_MAP_END_RETRY_WAIT) + def clientLoadFileGroupRetryWaitMs: Long = get(CLIENT_LOAD_FILE_GROUP_RETRY_WAIT) def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED) def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES) def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT) @@ -4884,6 +4886,24 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") + val CLIENT_LOAD_FILE_GROUP_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.client.loadFileGroup.retryWait") + .withAlternative("celeborn.shuffle.loadFileGroup.retryWait") + .categories("client") + .version("0.6.0") + .doc("Wait time before next retry if loadFileGroup failed.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("3s") + + val CLIENT_MAP_END_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.client.mapEnd.retryWait") + .withAlternative("celeborn.shuffle.mapEnd.retryWait") + .categories("client") + .version("0.6.0") + .doc("Wait time before next retry if mapEnd failed.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("3s") + val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] = buildConf("celeborn.client.reserveSlots.maxRetries") .withAlternative("celeborn.slots.reserve.maxRetries") From 591ad34b0ff8c3ce937dac000180a23107299f73 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 11:36:43 +0800 Subject: [PATCH 02/24] retry in ont function --- .../celeborn/client/ShuffleClientImpl.java | 96 +++++++++++-------- 1 file changed, 55 insertions(+), 41 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index b5631ebb016..92a0673c07f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -669,7 +669,7 @@ private ConcurrentHashMap registerShuffleInternal( StatusCode lastFailedStatusCode = null; while (numRetries > 0) { try { - PbRegisterShuffleResponse response = callable.call(); + PbRegisterShuffleResponse response = callLifecycleManagerWithRetry(callable); StatusCode respStatus = Utils.toStatusCode(response.getStatus()); if (StatusCode.SUCCESS.equals(respStatus)) { ConcurrentHashMap result = JavaUtils.newConcurrentHashMap(); @@ -1702,41 +1702,19 @@ private void mapEndInternal( throws IOException { final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); PushState pushState = getPushState(mapKey); - - limitZeroInFlight(mapKey, pushState); - int numRetries = 3; - while (numRetries > 0) { - numRetries--; - try { - MapperEndResponse response = - lifecycleManagerRef.askSync( - new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), - ClassTag$.MODULE$.apply(MapperEndResponse.class)); - if (response.status() != StatusCode.SUCCESS) { - throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); - } else { - break; - } - } catch (Exception e) { - if (e instanceof RpcTimeoutException && numRetries > 0) { - logger.warn( - "MapperEnd for shuffleId {} attemptId {} Rpc timeout, left retries: {}", - shuffleId, - attemptId, - numRetries); - } else { - logger.error( - "MapperEnd for shuffleId {} attemptId {} failed: {}", shuffleId, attemptId, e); - throw e; - } - } finally { - pushStates.remove(mapKey); - } - try { - TimeUnit.MILLISECONDS.sleep(mapEndRetryWaitMs); - } catch (InterruptedException e) { - break; + try { + limitZeroInFlight(mapKey, pushState); + MapperEndResponse response = + callLifecycleManagerWithRetry( + () -> + lifecycleManagerRef.askSync( + new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), + ClassTag$.MODULE$.apply(MapperEndResponse.class))); + if (response.status() != StatusCode.SUCCESS) { + throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); } + } finally { + pushStates.remove(mapKey); } } @@ -1772,10 +1750,12 @@ protected Tuple2 loadFileGroupInternal( GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); GetReducerFileGroupResponse response = - lifecycleManagerRef.askSync( - getReducerFileGroup, - conf.clientRpcGetReducerFileGroupAskTimeout(), - ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)); + callLifecycleManagerWithRetry( + () -> + lifecycleManagerRef.askSync( + getReducerFileGroup, + conf.clientRpcGetReducerFileGroupAskTimeout(), + ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class))); switch (response.status()) { case SUCCESS: logger.info( @@ -1949,11 +1929,45 @@ public void shutdown() { @Override public void setupLifecycleManagerRef(String host, int port) { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); - lifecycleManagerRef = - rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP); + lifecycleManagerRef = + callLifecycleManagerWithRetry( + () -> + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP)); initDataClientFactoryIfNeeded(); } + public T callLifecycleManagerWithRetry(Callable callable) { + return callLifecycleManagerWithRetry(callable, 3); + } + + public T callLifecycleManagerWithRetry(Callable callable, int numRetries) { + T result; + while (numRetries > 0) { + numRetries--; + try { + result = callable.call(); + return result; + } catch (Exception error) { + if (error instanceof RpcTimeoutException && numRetries > 0) { + logger.warn( + "RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries); + try { + Random random = new Random(); + long retryWaitMs = random.nextInt(500); + TimeUnit.MILLISECONDS.sleep(retryWaitMs); + } catch (InterruptedException e) { + break; + } + } else { + logger.error("Exception raised while calling LifecycleManager"); + break; + } + } + } + return null; + } + @Override public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) { lifecycleManagerRef = endpointRef; From b636fbb9b7fb92146a6eb17a99fd43db0346dbf7 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 12:09:03 +0800 Subject: [PATCH 03/24] reformat --- .../apache/celeborn/client/ShuffleClient.java | 4 +- .../celeborn/client/ShuffleClientImpl.java | 153 +++++++++--------- 2 files changed, 77 insertions(+), 80 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index d1d824503c2..efa9641f671 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -67,7 +67,7 @@ public static ShuffleClient get( String driverHost, int port, CelebornConf conf, - UserIdentifier userIdentifier) throws CelebornIOException { + UserIdentifier userIdentifier) { return ShuffleClient.get(appUniqueId, driverHost, port, conf, userIdentifier, null); } @@ -77,7 +77,7 @@ public static ShuffleClient get( int port, CelebornConf conf, UserIdentifier userIdentifier, - byte[] extension) throws CelebornIOException { + byte[] extension) { if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 92a0673c07f..6b25a52e4a1 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -1743,53 +1743,50 @@ public boolean cleanupShuffle(int shuffleId) { protected Tuple2 loadFileGroupInternal( int shuffleId, boolean isSegmentGranularityVisible) { - long getReducerFileGroupStartTime = System.nanoTime(); - String exceptionMsg = null; - if (lifecycleManagerRef != null) { + long getReducerFileGroupStartTime = System.nanoTime(); + String exceptionMsg = null; + if (lifecycleManagerRef != null) { try { - GetReducerFileGroup getReducerFileGroup = - new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); - GetReducerFileGroupResponse response = - callLifecycleManagerWithRetry( - () -> - lifecycleManagerRef.askSync( - getReducerFileGroup, - conf.clientRpcGetReducerFileGroupAskTimeout(), - ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class))); - switch (response.status()) { - case SUCCESS: - logger.info( - "Shuffle {} request reducer file group success using {} ms, result partition size {}.", - shuffleId, - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime), - response.fileGroup().size()); - return Tuple2.apply( - new ReduceFileGroups( - response.fileGroup(), response.attempts(), response.partitionIds()), - null); - case SHUFFLE_NOT_REGISTERED: - logger.warn( - "Request {} return {} for {}.", - getReducerFileGroup, - response.status(), - shuffleId); - // return empty result - return Tuple2.apply( - new ReduceFileGroups( - response.fileGroup(), response.attempts(), response.partitionIds()), - null); - case STAGE_END_TIME_OUT: - break; - case SHUFFLE_DATA_LOST: - exceptionMsg = - String.format( - "Request %s return %s for %s.", - getReducerFileGroup, response.status(), shuffleId); - logger.warn(exceptionMsg); - break; - default: // fall out - } - } catch (Exception e) { + GetReducerFileGroup getReducerFileGroup = + new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); + GetReducerFileGroupResponse response = + callLifecycleManagerWithRetry( + () -> + lifecycleManagerRef.askSync( + getReducerFileGroup, + conf.clientRpcGetReducerFileGroupAskTimeout(), + ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class))); + switch (response.status()) { + case SUCCESS: + logger.info( + "Shuffle {} request reducer file group success using {} ms, result partition size {}.", + shuffleId, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime), + response.fileGroup().size()); + return Tuple2.apply( + new ReduceFileGroups( + response.fileGroup(), response.attempts(), response.partitionIds()), + null); + case SHUFFLE_NOT_REGISTERED: + logger.warn( + "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId); + // return empty result + return Tuple2.apply( + new ReduceFileGroups( + response.fileGroup(), response.attempts(), response.partitionIds()), + null); + case STAGE_END_TIME_OUT: + break; + case SHUFFLE_DATA_LOST: + exceptionMsg = + String.format( + "Request %s return %s for %s.", + getReducerFileGroup, response.status(), shuffleId); + logger.warn(exceptionMsg); + break; + default: // fall out + } + } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } @@ -1929,44 +1926,44 @@ public void shutdown() { @Override public void setupLifecycleManagerRef(String host, int port) { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); - lifecycleManagerRef = - callLifecycleManagerWithRetry( - () -> - rpcEnv.setupEndpointRef( - new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP)); + lifecycleManagerRef = + callLifecycleManagerWithRetry( + () -> + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP)); initDataClientFactoryIfNeeded(); } - public T callLifecycleManagerWithRetry(Callable callable) { - return callLifecycleManagerWithRetry(callable, 3); - } + public T callLifecycleManagerWithRetry(Callable callable) { + return callLifecycleManagerWithRetry(callable, 3); + } - public T callLifecycleManagerWithRetry(Callable callable, int numRetries) { - T result; - while (numRetries > 0) { - numRetries--; - try { - result = callable.call(); - return result; - } catch (Exception error) { - if (error instanceof RpcTimeoutException && numRetries > 0) { - logger.warn( - "RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries); - try { - Random random = new Random(); - long retryWaitMs = random.nextInt(500); - TimeUnit.MILLISECONDS.sleep(retryWaitMs); - } catch (InterruptedException e) { - break; - } - } else { - logger.error("Exception raised while calling LifecycleManager"); - break; - } - } + public T callLifecycleManagerWithRetry(Callable callable, int numRetries) { + T result; + while (numRetries > 0) { + numRetries--; + try { + result = callable.call(); + return result; + } catch (Exception error) { + if (error instanceof RpcTimeoutException && numRetries > 0) { + logger.warn( + "RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries); + try { + Random random = new Random(); + long retryWaitMs = random.nextInt(500); + TimeUnit.MILLISECONDS.sleep(retryWaitMs); + } catch (InterruptedException e) { + break; + } + } else { + logger.error("Exception raised while calling LifecycleManager"); + break; } - return null; + } } + return null; + } @Override public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) { From b63c2320f02ccee14dd0806e606d12c3cf98f2d1 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 12:27:24 +0800 Subject: [PATCH 04/24] config change --- .../celeborn/client/ShuffleClientImpl.java | 9 ++++---- .../apache/celeborn/common/CelebornConf.scala | 22 +++++-------------- docs/configuration/client.md | 1 + docs/configuration/network.md | 2 +- 4 files changed, 12 insertions(+), 22 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 6b25a52e4a1..b68cc0db98e 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -79,8 +79,7 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; - private final long mapEndRetryWaitMs; - private final long loadFileGroupRetryWaitMs; + private final long lifecycleManagerRpcTimeoutRetryWaitMs; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -179,8 +178,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u this.userIdentifier = userIdentifier; registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); - mapEndRetryWaitMs = conf.clientMapEndRetryWaitMs(); - loadFileGroupRetryWaitMs = conf.clientLoadFileGroupRetryWaitMs(); + lifecycleManagerRpcTimeoutRetryWaitMs = conf.clientCallLifecycleManagerRetryWaitMs(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -1951,7 +1949,8 @@ public T callLifecycleManagerWithRetry(Callable callable, int numRetries) "RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries); try { Random random = new Random(); - long retryWaitMs = random.nextInt(500); + int waitTimeBound = (int) lifecycleManagerRpcTimeoutRetryWaitMs; + long retryWaitMs = random.nextInt(waitTimeBound); TimeUnit.MILLISECONDS.sleep(retryWaitMs); } catch (InterruptedException e) { break; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 59348ae7bec..82a6d7d6823 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -901,8 +901,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS) def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES) def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT) - def clientMapEndRetryWaitMs: Long = get(CLIENT_MAP_END_RETRY_WAIT) - def clientLoadFileGroupRetryWaitMs: Long = get(CLIENT_LOAD_FILE_GROUP_RETRY_WAIT) + def clientCallLifecycleManagerRetryWaitMs: Long = get(CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT) def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED) def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES) def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT) @@ -4886,23 +4885,14 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") - val CLIENT_LOAD_FILE_GROUP_RETRY_WAIT: ConfigEntry[Long] = - buildConf("celeborn.client.loadFileGroup.retryWait") - .withAlternative("celeborn.shuffle.loadFileGroup.retryWait") + val CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.client.callLifecycleManager.retryWait") + .withAlternative("celeborn.shuffle.callLifecycleManager.retryWait") .categories("client") .version("0.6.0") - .doc("Wait time before next retry if loadFileGroup failed.") + .doc("Wait time before next retry if call LifecycleManager failed.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("3s") - - val CLIENT_MAP_END_RETRY_WAIT: ConfigEntry[Long] = - buildConf("celeborn.client.mapEnd.retryWait") - .withAlternative("celeborn.shuffle.mapEnd.retryWait") - .categories("client") - .version("0.6.0") - .doc("Wait time before next retry if mapEnd failed.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("3s") + .createWithDefaultString("1s") val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] = buildConf("celeborn.client.reserveSlots.maxRetries") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index f035713eb90..82a7f9ba08c 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -22,6 +22,7 @@ license: | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | | celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | | +| celeborn.client.callLifecycleManager.retryWait | 1s | false | Wait time before next retry if call LifecycleManager failed. | 0.6.0 | celeborn.shuffle.callLifecycleManager.retryWait | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | | | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | | | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | | diff --git a/docs/configuration/network.md b/docs/configuration/network.md index f690d205e25..1f1f987012a 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | From d968c7761e233a6e1908ab099a6e7c7cf1475db9 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 14:05:40 +0800 Subject: [PATCH 05/24] config error --- docs/configuration/network.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 1f1f987012a..f690d205e25 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | From 4b983742472bb1ee4ad66ae79cf32c4cf14ee4fc Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 14:14:54 +0800 Subject: [PATCH 06/24] delete break --- .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index b68cc0db98e..85ae7f20778 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -1774,7 +1774,6 @@ protected Tuple2 loadFileGroupInternal( response.fileGroup(), response.attempts(), response.partitionIds()), null); case STAGE_END_TIME_OUT: - break; case SHUFFLE_DATA_LOST: exceptionMsg = String.format( From 42687e7e1d964d4bf87c54158914af08a9cfc8e2 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:11:41 +0800 Subject: [PATCH 07/24] add throws --- .../apache/celeborn/client/ShuffleClient.java | 8 +-- .../celeborn/client/ShuffleClientImpl.java | 53 ++++++++++++------- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index efa9641f671..7c31050b958 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -67,7 +67,8 @@ public static ShuffleClient get( String driverHost, int port, CelebornConf conf, - UserIdentifier userIdentifier) { + UserIdentifier userIdentifier) + throws CelebornIOException { return ShuffleClient.get(appUniqueId, driverHost, port, conf, userIdentifier, null); } @@ -77,7 +78,8 @@ public static ShuffleClient get( int port, CelebornConf conf, UserIdentifier userIdentifier, - byte[] extension) { + byte[] extension) + throws CelebornIOException { if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { @@ -137,7 +139,7 @@ public static void printReadStats(Logger logger) { String.format("%.2f", (localReadCount * 1.0d / totalReadCount) * 100)); } - public abstract void setupLifecycleManagerRef(String host, int port); + public abstract void setupLifecycleManagerRef(String host, int port) throws CelebornIOException; public abstract void setupLifecycleManagerRef(RpcEndpointRef endpointRef); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 85ae7f20778..6e421528a32 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -667,7 +667,8 @@ private ConcurrentHashMap registerShuffleInternal( StatusCode lastFailedStatusCode = null; while (numRetries > 0) { try { - PbRegisterShuffleResponse response = callLifecycleManagerWithRetry(callable); + PbRegisterShuffleResponse response = + callLifecycleManagerWithTimeoutRetry(callable, "registerShuffle"); StatusCode respStatus = Utils.toStatusCode(response.getStatus()); if (StatusCode.SUCCESS.equals(respStatus)) { ConcurrentHashMap result = JavaUtils.newConcurrentHashMap(); @@ -1703,14 +1704,17 @@ private void mapEndInternal( try { limitZeroInFlight(mapKey, pushState); MapperEndResponse response = - callLifecycleManagerWithRetry( + callLifecycleManagerWithTimeoutRetry( () -> lifecycleManagerRef.askSync( new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), - ClassTag$.MODULE$.apply(MapperEndResponse.class))); + ClassTag$.MODULE$.apply(MapperEndResponse.class)), + "mapperEnd"); if (response.status() != StatusCode.SUCCESS) { throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); } + } catch (Exception e) { + throw new CelebornIOException("MapperEnd failed!", e); } finally { pushStates.remove(mapKey); } @@ -1748,12 +1752,13 @@ protected Tuple2 loadFileGroupInternal( GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); GetReducerFileGroupResponse response = - callLifecycleManagerWithRetry( + callLifecycleManagerWithTimeoutRetry( () -> lifecycleManagerRef.askSync( getReducerFileGroup, conf.clientRpcGetReducerFileGroupAskTimeout(), - ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class))); + ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)), + "getReducerFileGroup"); switch (response.status()) { case SUCCESS: logger.info( @@ -1921,22 +1926,26 @@ public void shutdown() { } @Override - public void setupLifecycleManagerRef(String host, int port) { + public void setupLifecycleManagerRef(String host, int port) throws CelebornIOException { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); - lifecycleManagerRef = - callLifecycleManagerWithRetry( - () -> - rpcEnv.setupEndpointRef( - new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP)); + try { + lifecycleManagerRef = + callLifecycleManagerWithTimeoutRetry( + () -> + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP), + "setupLifecycleManagerRef"); + } catch (Exception e) { + logger.error("setupLifecycleManagerRef failed, host = {}, port = {}", host, port); + throw new CelebornIOException("setupLifecycleManagerRef failed", e); + } initDataClientFactoryIfNeeded(); } - public T callLifecycleManagerWithRetry(Callable callable) { - return callLifecycleManagerWithRetry(callable, 3); - } - - public T callLifecycleManagerWithRetry(Callable callable, int numRetries) { + public T callLifecycleManagerWithTimeoutRetry(Callable callable, String name) + throws Exception { T result; + int numRetries = 3; while (numRetries > 0) { numRetries--; try { @@ -1945,18 +1954,24 @@ public T callLifecycleManagerWithRetry(Callable callable, int numRetries) } catch (Exception error) { if (error instanceof RpcTimeoutException && numRetries > 0) { logger.warn( - "RpcTimeout while calling LifecycleManager, left retry times: {}", numRetries); + "RpcTimeout while {} calling LifecycleManager, left retry times: {}", + name, + numRetries); try { Random random = new Random(); int waitTimeBound = (int) lifecycleManagerRpcTimeoutRetryWaitMs; long retryWaitMs = random.nextInt(waitTimeBound); TimeUnit.MILLISECONDS.sleep(retryWaitMs); } catch (InterruptedException e) { + logger.warn("retry wait interrupted", e); break; } } else { - logger.error("Exception raised while calling LifecycleManager"); - break; + logger.error( + "Exception raised while {} calling LifecycleManager, tried {} times", + name, + 3 - numRetries); + throw error; } } } From 0a4b8b76c3bb1a4332c3af01cdb5b29d6c1e757b Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:13:56 +0800 Subject: [PATCH 08/24] no need init wait time --- .../java/org/apache/celeborn/client/ShuffleClientImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 6e421528a32..9d2b0564796 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -79,7 +79,6 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; - private final long lifecycleManagerRpcTimeoutRetryWaitMs; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -178,7 +177,6 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u this.userIdentifier = userIdentifier; registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); - lifecycleManagerRpcTimeoutRetryWaitMs = conf.clientCallLifecycleManagerRetryWaitMs(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -1959,7 +1957,7 @@ public T callLifecycleManagerWithTimeoutRetry(Callable callable, String n numRetries); try { Random random = new Random(); - int waitTimeBound = (int) lifecycleManagerRpcTimeoutRetryWaitMs; + int waitTimeBound = (int) conf.clientCallLifecycleManagerRetryWaitMs(); long retryWaitMs = random.nextInt(waitTimeBound); TimeUnit.MILLISECONDS.sleep(retryWaitMs); } catch (InterruptedException e) { From 0364bd36e00513d28ff9964f0c9159ef16fa7fd0 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:24:45 +0800 Subject: [PATCH 09/24] add exception for flink sc --- .../plugin/flink/readclient/FlinkShuffleClientImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index 5602d1aac1c..9c5aeefe395 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -92,7 +92,7 @@ public static FlinkShuffleClientImpl get( long driverTimestamp, CelebornConf conf, UserIdentifier userIdentifier) - throws DriverChangedException { + throws CelebornIOException { return get( appUniqueId, driverHost, @@ -111,7 +111,7 @@ public static FlinkShuffleClientImpl get( CelebornConf conf, UserIdentifier userIdentifier, int bufferSizeBytes) - throws DriverChangedException { + throws CelebornIOException { if (null == _instance || !initialized || _instance.driverTimestamp < driverTimestamp) { synchronized (FlinkShuffleClientImpl.class) { if (null == _instance) { @@ -169,7 +169,7 @@ public FlinkShuffleClientImpl( long driverTimestamp, CelebornConf conf, UserIdentifier userIdentifier, - int bufferSizeBytes) { + int bufferSizeBytes) throws CelebornIOException { super(appUniqueId, conf, userIdentifier); this.bufferSizeBytes = bufferSizeBytes; String module = TransportModuleConstants.DATA_MODULE; @@ -190,7 +190,7 @@ private void initializeTransportClientFactory() { } @Override - public void setupLifecycleManagerRef(String host, int port) { + public void setupLifecycleManagerRef(String host, int port) throws CelebornIOException { super.setupLifecycleManagerRef(host, port); initializeTransportClientFactory(); } From 09997485ee6b06deb3ca18b96482ee9d76fcb39f Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:27:09 +0800 Subject: [PATCH 10/24] reformat --- .../plugin/flink/readclient/FlinkShuffleClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index 9c5aeefe395..82697ef8d74 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -169,7 +169,8 @@ public FlinkShuffleClientImpl( long driverTimestamp, CelebornConf conf, UserIdentifier userIdentifier, - int bufferSizeBytes) throws CelebornIOException { + int bufferSizeBytes) + throws CelebornIOException { super(appUniqueId, conf, userIdentifier); this.bufferSizeBytes = bufferSizeBytes; String module = TransportModuleConstants.DATA_MODULE; From dce8f131984681eb57b688149da3ae666c9dc256 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:35:03 +0800 Subject: [PATCH 11/24] exception change --- .../plugin/flink/RemoteShuffleInputGateDelegation.java | 4 ++-- .../apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java | 4 ++-- .../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++-- .../plugin/flink/tiered/CelebornTierProducerAgent.java | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index 95ad204959c..cebcdf5c0af 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.DriverChangedException; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.exception.PartitionUnRetryAbleException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; @@ -165,7 +165,7 @@ public RemoteShuffleInputGateDelegation( shuffleResource.getLifecycleManagerTimestamp(), celebornConf, new UserIdentifier("default", "default")); - } catch (DriverChangedException e) { + } catch (CelebornIOException e) { throw new RuntimeException(e.getMessage()); } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java index f695af14d74..c3d516b35df 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.DriverChangedException; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; @@ -201,7 +201,7 @@ FlinkShuffleClientImpl getShuffleClient() { lifecycleManagerTimestamp, celebornConf, userIdentifier); - } catch (DriverChangedException e) { + } catch (CelebornIOException e) { // would generate a new attempt to retry output gate throw new RuntimeException(e.getMessage()); } diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java index 8d06ba77c09..bbf46d7fd8a 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.DriverChangedException; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.exception.PartitionUnRetryAbleException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.RemoteShuffleResource; @@ -332,7 +332,7 @@ private void initShuffleClient(TierShuffleDescriptorImpl remoteShuffleDescriptor conf, new UserIdentifier("default", "default"), bufferSizeBytes); - } catch (DriverChangedException e) { + } catch (CelebornIOException e) { throw new RuntimeException(e.getMessage()); } } diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java index fc2c149820e..420b383853a 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.DriverChangedException; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; @@ -487,7 +487,7 @@ FlinkShuffleClientImpl getShuffleClient() { celebornConf, null, bufferSizeBytes); - } catch (DriverChangedException e) { + } catch (CelebornIOException e) { // would generate a new attempt to retry output gate throw new RuntimeException(e.getMessage()); } From 99154e8e35d74af1793476f831d8cbd5c9df6535 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 19 Dec 2024 19:44:03 +0800 Subject: [PATCH 12/24] mr exception --- .../task/reduce/CelebornShuffleConsumer.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java index 778e243faec..d8a4498740f 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java @@ -29,6 +29,7 @@ import org.apache.celeborn.client.read.CelebornInputStream; import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; +import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.reflect.DynConstructors; import org.apache.celeborn.reflect.DynMethods; @@ -73,14 +74,18 @@ public void init(Context context) { int lmPort = Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LM_PORT)); logger.info("Reducer initialized with celeborn {} {} {}", appId, lmHost, lmPort); CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf); - shuffleClient = - ShuffleClient.get( - appId, - lmHost, - lmPort, - celebornConf, - new UserIdentifier( - celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName())); + try { + shuffleClient = + ShuffleClient.get( + appId, + lmHost, + lmPort, + celebornConf, + new UserIdentifier( + celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName())); + } catch (CelebornIOException e) { + reportException(e); + } this.merger = new MergeManagerImpl<>( reduceId, From 5bcf9fcf9bf5e8d35c5a19b3170e9647fee6e5f3 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 14:01:32 +0800 Subject: [PATCH 13/24] change Exception to runtimeException --- .../RemoteShuffleInputGateDelegation.java | 4 +-- .../plugin/flink/RemoteShuffleOutputGate.java | 4 +-- .../readclient/FlinkShuffleClientImpl.java | 9 +++--- .../tiered/CelebornTierProducerAgent.java | 4 +-- .../task/reduce/CelebornShuffleConsumer.java | 21 ++++++-------- .../apache/celeborn/client/ShuffleClient.java | 8 ++---- .../celeborn/client/ShuffleClientImpl.java | 28 ++++++++----------- 7 files changed, 32 insertions(+), 46 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java index cebcdf5c0af..95ad204959c 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.DriverChangedException; import org.apache.celeborn.common.exception.PartitionUnRetryAbleException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; @@ -165,7 +165,7 @@ public RemoteShuffleInputGateDelegation( shuffleResource.getLifecycleManagerTimestamp(), celebornConf, new UserIdentifier("default", "default")); - } catch (CelebornIOException e) { + } catch (DriverChangedException e) { throw new RuntimeException(e.getMessage()); } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java index c3d516b35df..f695af14d74 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.DriverChangedException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; @@ -201,7 +201,7 @@ FlinkShuffleClientImpl getShuffleClient() { lifecycleManagerTimestamp, celebornConf, userIdentifier); - } catch (CelebornIOException e) { + } catch (DriverChangedException e) { // would generate a new attempt to retry output gate throw new RuntimeException(e.getMessage()); } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java index 82697ef8d74..5602d1aac1c 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java @@ -92,7 +92,7 @@ public static FlinkShuffleClientImpl get( long driverTimestamp, CelebornConf conf, UserIdentifier userIdentifier) - throws CelebornIOException { + throws DriverChangedException { return get( appUniqueId, driverHost, @@ -111,7 +111,7 @@ public static FlinkShuffleClientImpl get( CelebornConf conf, UserIdentifier userIdentifier, int bufferSizeBytes) - throws CelebornIOException { + throws DriverChangedException { if (null == _instance || !initialized || _instance.driverTimestamp < driverTimestamp) { synchronized (FlinkShuffleClientImpl.class) { if (null == _instance) { @@ -169,8 +169,7 @@ public FlinkShuffleClientImpl( long driverTimestamp, CelebornConf conf, UserIdentifier userIdentifier, - int bufferSizeBytes) - throws CelebornIOException { + int bufferSizeBytes) { super(appUniqueId, conf, userIdentifier); this.bufferSizeBytes = bufferSizeBytes; String module = TransportModuleConstants.DATA_MODULE; @@ -191,7 +190,7 @@ private void initializeTransportClientFactory() { } @Override - public void setupLifecycleManagerRef(String host, int port) throws CelebornIOException { + public void setupLifecycleManagerRef(String host, int port) { super.setupLifecycleManagerRef(host, port); initializeTransportClientFactory(); } diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java index 420b383853a..fc2c149820e 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.DriverChangedException; import org.apache.celeborn.common.protocol.PartitionLocation; import org.apache.celeborn.plugin.flink.buffer.BufferHeader; import org.apache.celeborn.plugin.flink.buffer.BufferPacker; @@ -487,7 +487,7 @@ FlinkShuffleClientImpl getShuffleClient() { celebornConf, null, bufferSizeBytes); - } catch (CelebornIOException e) { + } catch (DriverChangedException e) { // would generate a new attempt to retry output gate throw new RuntimeException(e.getMessage()); } diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java index d8a4498740f..778e243faec 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java @@ -29,7 +29,6 @@ import org.apache.celeborn.client.read.CelebornInputStream; import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.reflect.DynConstructors; import org.apache.celeborn.reflect.DynMethods; @@ -74,18 +73,14 @@ public void init(Context context) { int lmPort = Integer.parseInt(celebornJobConf.get(HadoopUtils.MR_CELEBORN_LM_PORT)); logger.info("Reducer initialized with celeborn {} {} {}", appId, lmHost, lmPort); CelebornConf celebornConf = HadoopUtils.fromYarnConf(mrJobConf); - try { - shuffleClient = - ShuffleClient.get( - appId, - lmHost, - lmPort, - celebornConf, - new UserIdentifier( - celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName())); - } catch (CelebornIOException e) { - reportException(e); - } + shuffleClient = + ShuffleClient.get( + appId, + lmHost, + lmPort, + celebornConf, + new UserIdentifier( + celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName())); this.merger = new MergeManagerImpl<>( reduceId, diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index 7c31050b958..efa9641f671 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -67,8 +67,7 @@ public static ShuffleClient get( String driverHost, int port, CelebornConf conf, - UserIdentifier userIdentifier) - throws CelebornIOException { + UserIdentifier userIdentifier) { return ShuffleClient.get(appUniqueId, driverHost, port, conf, userIdentifier, null); } @@ -78,8 +77,7 @@ public static ShuffleClient get( int port, CelebornConf conf, UserIdentifier userIdentifier, - byte[] extension) - throws CelebornIOException { + byte[] extension) { if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { @@ -139,7 +137,7 @@ public static void printReadStats(Logger logger) { String.format("%.2f", (localReadCount * 1.0d / totalReadCount) * 100)); } - public abstract void setupLifecycleManagerRef(String host, int port) throws CelebornIOException; + public abstract void setupLifecycleManagerRef(String host, int port); public abstract void setupLifecycleManagerRef(RpcEndpointRef endpointRef); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 9d2b0564796..69e86411e38 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -40,6 +40,7 @@ import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.CelebornRuntimeException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.network.TransportContext; import org.apache.celeborn.common.network.buffer.NettyManagedBuffer; @@ -1711,8 +1712,6 @@ private void mapEndInternal( if (response.status() != StatusCode.SUCCESS) { throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); } - } catch (Exception e) { - throw new CelebornIOException("MapperEnd failed!", e); } finally { pushStates.remove(mapKey); } @@ -1924,24 +1923,18 @@ public void shutdown() { } @Override - public void setupLifecycleManagerRef(String host, int port) throws CelebornIOException { + public void setupLifecycleManagerRef(String host, int port) { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); - try { - lifecycleManagerRef = - callLifecycleManagerWithTimeoutRetry( - () -> - rpcEnv.setupEndpointRef( - new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP), - "setupLifecycleManagerRef"); - } catch (Exception e) { - logger.error("setupLifecycleManagerRef failed, host = {}, port = {}", host, port); - throw new CelebornIOException("setupLifecycleManagerRef failed", e); - } + lifecycleManagerRef = + callLifecycleManagerWithTimeoutRetry( + () -> + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP), + "setupLifecycleManagerRef"); initDataClientFactoryIfNeeded(); } - public T callLifecycleManagerWithTimeoutRetry(Callable callable, String name) - throws Exception { + public T callLifecycleManagerWithTimeoutRetry(Callable callable, String name) { T result; int numRetries = 3; while (numRetries > 0) { @@ -1969,7 +1962,8 @@ public T callLifecycleManagerWithTimeoutRetry(Callable callable, String n "Exception raised while {} calling LifecycleManager, tried {} times", name, 3 - numRetries); - throw error; + throw new CelebornRuntimeException( + "Exception raised while calling LifecycleManager", error); } } } From f8cd55580f7fbd2d04ff9fb50ce392652a7167e0 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 14:04:58 +0800 Subject: [PATCH 14/24] revert exception --- .../plugin/flink/tiered/CelebornTierConsumerAgent.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java index bbf46d7fd8a..8d06ba77c09 100644 --- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java +++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; -import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.DriverChangedException; import org.apache.celeborn.common.exception.PartitionUnRetryAbleException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.plugin.flink.RemoteShuffleResource; @@ -332,7 +332,7 @@ private void initShuffleClient(TierShuffleDescriptorImpl remoteShuffleDescriptor conf, new UserIdentifier("default", "default"), bufferSizeBytes); - } catch (CelebornIOException e) { + } catch (DriverChangedException e) { throw new RuntimeException(e.getMessage()); } } From bc6237ab8a863fb02a7272a7a8edd0f21129e474 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 18:10:41 +0800 Subject: [PATCH 15/24] move retry into rpc --- .../celeborn/client/ShuffleClientImpl.java | 160 +++++++----------- .../apache/celeborn/common/CelebornConf.scala | 21 ++- .../celeborn/common/rpc/RpcEndpointRef.scala | 60 +++++++ .../apache/celeborn/common/rpc/RpcEnv.scala | 38 +++++ docs/configuration/client.md | 2 +- docs/configuration/network.md | 1 + 6 files changed, 175 insertions(+), 107 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 69e86411e38..c1f187b1357 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -40,7 +40,6 @@ import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.CelebornIOException; -import org.apache.celeborn.common.exception.CelebornRuntimeException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.network.TransportContext; import org.apache.celeborn.common.network.buffer.NettyManagedBuffer; @@ -61,7 +60,6 @@ import org.apache.celeborn.common.rpc.RpcAddress; import org.apache.celeborn.common.rpc.RpcEndpointRef; import org.apache.celeborn.common.rpc.RpcEnv; -import org.apache.celeborn.common.rpc.RpcTimeoutException; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.*; import org.apache.celeborn.common.write.DataBatches; @@ -80,6 +78,7 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; + private final int callLifecycleManagerMaxRetry; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -178,6 +177,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u this.userIdentifier = userIdentifier; registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); + callLifecycleManagerMaxRetry = conf.clientCallLifecycleManagerMaxRetry(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -533,6 +533,7 @@ private ConcurrentHashMap registerShuffle( lifecycleManagerRef.askSync( RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions), conf.clientRpcRegisterShuffleAskTimeout(), + callLifecycleManagerMaxRetry, ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class))); } @@ -666,8 +667,7 @@ private ConcurrentHashMap registerShuffleInternal( StatusCode lastFailedStatusCode = null; while (numRetries > 0) { try { - PbRegisterShuffleResponse response = - callLifecycleManagerWithTimeoutRetry(callable, "registerShuffle"); + PbRegisterShuffleResponse response = callable.call(); StatusCode respStatus = Utils.toStatusCode(response.getStatus()); if (StatusCode.SUCCESS.equals(respStatus)) { ConcurrentHashMap result = JavaUtils.newConcurrentHashMap(); @@ -1703,12 +1703,10 @@ private void mapEndInternal( try { limitZeroInFlight(mapKey, pushState); MapperEndResponse response = - callLifecycleManagerWithTimeoutRetry( - () -> - lifecycleManagerRef.askSync( - new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), - ClassTag$.MODULE$.apply(MapperEndResponse.class)), - "mapperEnd"); + lifecycleManagerRef.askSync( + new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), + callLifecycleManagerMaxRetry, + ClassTag$.MODULE$.apply(MapperEndResponse.class)); if (response.status() != StatusCode.SUCCESS) { throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); } @@ -1744,57 +1742,56 @@ protected Tuple2 loadFileGroupInternal( int shuffleId, boolean isSegmentGranularityVisible) { long getReducerFileGroupStartTime = System.nanoTime(); String exceptionMsg = null; - if (lifecycleManagerRef != null) { - try { - GetReducerFileGroup getReducerFileGroup = - new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); - GetReducerFileGroupResponse response = - callLifecycleManagerWithTimeoutRetry( - () -> - lifecycleManagerRef.askSync( - getReducerFileGroup, - conf.clientRpcGetReducerFileGroupAskTimeout(), - ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)), - "getReducerFileGroup"); - switch (response.status()) { - case SUCCESS: - logger.info( - "Shuffle {} request reducer file group success using {} ms, result partition size {}.", - shuffleId, - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime), - response.fileGroup().size()); - return Tuple2.apply( - new ReduceFileGroups( - response.fileGroup(), response.attempts(), response.partitionIds()), - null); - case SHUFFLE_NOT_REGISTERED: - logger.warn( - "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId); - // return empty result - return Tuple2.apply( - new ReduceFileGroups( - response.fileGroup(), response.attempts(), response.partitionIds()), - null); - case STAGE_END_TIME_OUT: - case SHUFFLE_DATA_LOST: - exceptionMsg = - String.format( - "Request %s return %s for %s.", - getReducerFileGroup, response.status(), shuffleId); - logger.warn(exceptionMsg); - break; - default: // fall out - } - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e); - exceptionMsg = e.getMessage(); - } - } else { + if (lifecycleManagerRef == null) { exceptionMsg = "Driver endpoint is null!"; logger.warn(exceptionMsg); + return Tuple2.apply(null, exceptionMsg); + } + try { + GetReducerFileGroup getReducerFileGroup = + new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible); + + GetReducerFileGroupResponse response = + lifecycleManagerRef.askSync( + getReducerFileGroup, + conf.clientRpcGetReducerFileGroupAskTimeout(), + callLifecycleManagerMaxRetry, + ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)); + switch (response.status()) { + case SUCCESS: + logger.info( + "Shuffle {} request reducer file group success using {} ms, result partition size {}.", + shuffleId, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime), + response.fileGroup().size()); + return Tuple2.apply( + new ReduceFileGroups( + response.fileGroup(), response.attempts(), response.partitionIds()), + null); + case SHUFFLE_NOT_REGISTERED: + logger.warn( + "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId); + // return empty result + return Tuple2.apply( + new ReduceFileGroups( + response.fileGroup(), response.attempts(), response.partitionIds()), + null); + case STAGE_END_TIME_OUT: + case SHUFFLE_DATA_LOST: + exceptionMsg = + String.format( + "Request %s return %s for %s.", + getReducerFileGroup, response.status(), shuffleId); + logger.warn(exceptionMsg); + break; + default: // fall out + } + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e); + exceptionMsg = e.getMessage(); } return Tuple2.apply(null, exceptionMsg); } @@ -1926,50 +1923,13 @@ public void shutdown() { public void setupLifecycleManagerRef(String host, int port) { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); lifecycleManagerRef = - callLifecycleManagerWithTimeoutRetry( - () -> - rpcEnv.setupEndpointRef( - new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP), - "setupLifecycleManagerRef"); + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), + RpcNameConstants.LIFECYCLE_MANAGER_EP, + callLifecycleManagerMaxRetry); initDataClientFactoryIfNeeded(); } - public T callLifecycleManagerWithTimeoutRetry(Callable callable, String name) { - T result; - int numRetries = 3; - while (numRetries > 0) { - numRetries--; - try { - result = callable.call(); - return result; - } catch (Exception error) { - if (error instanceof RpcTimeoutException && numRetries > 0) { - logger.warn( - "RpcTimeout while {} calling LifecycleManager, left retry times: {}", - name, - numRetries); - try { - Random random = new Random(); - int waitTimeBound = (int) conf.clientCallLifecycleManagerRetryWaitMs(); - long retryWaitMs = random.nextInt(waitTimeBound); - TimeUnit.MILLISECONDS.sleep(retryWaitMs); - } catch (InterruptedException e) { - logger.warn("retry wait interrupted", e); - break; - } - } else { - logger.error( - "Exception raised while {} calling LifecycleManager, tried {} times", - name, - 3 - numRetries); - throw new CelebornRuntimeException( - "Exception raised while calling LifecycleManager", error); - } - } - } - return null; - } - @Override public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) { lifecycleManagerRef = endpointRef; diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 82a6d7d6823..4ad81e4f588 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -520,6 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) def rpcAskTimeout: RpcTimeout = new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key) + def rpcTimeoutRetryWaitMs: Long = get(RPC_TIMEOUT_RETRY_WAIT) def rpcInMemoryBoundedInboxCapacity(): Int = { get(RPC_INBOX_CAPACITY) } @@ -901,7 +902,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS) def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES) def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT) - def clientCallLifecycleManagerRetryWaitMs: Long = get(CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT) + def clientCallLifecycleManagerMaxRetry: Int = get(CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES) def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED) def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES) def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT) @@ -4885,15 +4886,23 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") - val CLIENT_CALL_LIFECYCLEMANAGER_RETRY_WAIT: ConfigEntry[Long] = - buildConf("celeborn.client.callLifecycleManager.retryWait") - .withAlternative("celeborn.shuffle.callLifecycleManager.retryWait") - .categories("client") + val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.rpc.timeoutRetryWait") + .categories("network") .version("0.6.0") - .doc("Wait time before next retry if call LifecycleManager failed.") + .doc("Wait time before next retry if RpcTimeoutException.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") + val CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES: ConfigEntry[Int] = + buildConf("celeborn.client.callLifecycleManager.maxRetries") + .withAlternative("celeborn.callLifecycleManager.maxRetries") + .categories("client") + .version("0.6.0") + .doc("Max retry times for client to reserve slots.") + .intConf + .createWithDefault(3) + val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] = buildConf("celeborn.client.reserveSlots.maxRetries") .withAlternative("celeborn.slots.reserve.maxRetries") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index edd7005e2e9..46c3223cc69 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -17,6 +17,9 @@ package org.apache.celeborn.common.rpc +import java.util.Random +import java.util.concurrent.TimeUnit + import scala.concurrent.Future import scala.reflect.ClassTag @@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) extends Serializable with Logging { private[this] val defaultAskTimeout = conf.rpcAskTimeout + private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt /** * return the address for the [[RpcEndpointRef]] @@ -72,6 +76,20 @@ abstract class RpcEndpointRef(conf: CelebornConf) */ def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout) + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * default timeout, retry if timeout, throw an exception if this still fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + * + * @param message the message to send + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any, retryCount: Int): T = + askSync(message, defaultAskTimeout, retryCount) + /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a * specified timeout, throw an exception if this fails. @@ -88,4 +106,46 @@ abstract class RpcEndpointRef(conf: CelebornConf) val future = ask[T](message, timeout) timeout.awaitResult(future, address) } + + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * specified timeout, retry if timeout, throw an exception if this still fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + * + * @param message the message to send + * @param timeout the timeout duration + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = { + var numRetries = retryCount + while (numRetries > 0) { + numRetries -= 1 + try { + logInfo(s"[test] ask ${3 - numRetries} start, left Retries $numRetries") + val future = ask[T](message, timeout) + return timeout.awaitResult(future, address) + } catch { + case e: RpcTimeoutException => + if (numRetries > 0) { + try { + val random = new Random + val retryWaitMs = random.nextInt(waitTimeBound) + TimeUnit.MILLISECONDS.sleep(retryWaitMs) + logWarning(s"[test] ask ${3 - numRetries} failed, sleep for ${retryWaitMs} ms") + } catch { + case _: InterruptedException => + numRetries = 0 + } + } else { + throw e + } + } + } + // should never be here + val future = ask[T](message, timeout) + timeout.awaitResult(future, address) + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 7a44d8b63aa..7d0152c0ee1 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -18,6 +18,8 @@ package org.apache.celeborn.common.rpc import java.io.File +import java.util.Random +import java.util.concurrent.TimeUnit import scala.concurrent.Future @@ -104,6 +106,7 @@ object RpcEnv { abstract class RpcEnv(config: RpcEnvConfig) { private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout + private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement @@ -142,6 +145,41 @@ abstract class RpcEnv(config: RpcEnvConfig) { setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName)) } + /** + * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName` with timeout retry. + * This is a blocking action. + */ + def setupEndpointRef( + address: RpcAddress, + endpointName: String, + retryCount: Int): RpcEndpointRef = { + var numRetries = retryCount + while (numRetries > 0) { + numRetries -= 1 + try { + return setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName)) + } catch { + case e: RpcTimeoutException => + if (numRetries > 0) { + try { + val random = new Random + val retryWaitMs = random.nextInt(waitTimeBound) + TimeUnit.MILLISECONDS.sleep(retryWaitMs) + } catch { + case _: InterruptedException => + numRetries = 0 + } + } else { + throw e + } + case e: RpcEndpointNotFoundException => + throw e + } + } + // should never be here + null + } + /** * Stop [[RpcEndpoint]] specified by `endpoint`. */ diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 82a7f9ba08c..e94a393d320 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -22,7 +22,7 @@ license: | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | | celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | | -| celeborn.client.callLifecycleManager.retryWait | 1s | false | Wait time before next retry if call LifecycleManager failed. | 0.6.0 | celeborn.shuffle.callLifecycleManager.retryWait | +| celeborn.client.callLifecycleManager.maxRetries | 3 | false | Max retry times for client to reserve slots. | 0.6.0 | celeborn.callLifecycleManager.maxRetries | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | | | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | | | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | | diff --git a/docs/configuration/network.md b/docs/configuration/network.md index f690d205e25..8d8ea6d5f5a 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -58,6 +58,7 @@ license: | | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | | | celeborn.rpc.slow.interval | <undefined> | false | min interval (ms) for RPC framework to log slow RPC | 0.6.0 | | | celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to log slow RPC | 0.6.0 | | +| celeborn.rpc.timeoutRetryWait | 1s | false | Wait time before next retry if RpcTimeoutException. | 0.6.0 | | | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | | | celeborn.ssl.<module>.enabled | false | false | Enables SSL for securing wire traffic. | 0.5.0 | | | celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false | A comma-separated list of ciphers. The specified ciphers must be supported by JVM.
The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 11, for example, can be found at [this page](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#jsse-cipher-suite-names)
Note: If not set, the default cipher suite for the JRE will be used | 0.5.0 | | From 12650d1d6bf5431c49d2ca267b697be39b047515 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 19:42:55 +0800 Subject: [PATCH 16/24] Compatible with UT --- .../celeborn/client/ShuffleClientSuiteJ.java | 8 ++++++- .../celeborn/common/rpc/RpcEndpointRef.scala | 24 +++++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java index 5256ae0fb0c..b1d05ba8e1a 100644 --- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java @@ -242,13 +242,19 @@ private CelebornConf setupEnv( shuffleClient = new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock")); - primaryLocation.setPeer(replicaLocation); when(endpointRef.askSync(any(), any(), any())) .thenAnswer( t -> RegisterShuffleResponse$.MODULE$.apply( statusCode, new PartitionLocation[] {primaryLocation})); + when(endpointRef.askSync(any(), any(), any(Integer.class), any())) + .thenAnswer( + t -> + RegisterShuffleResponse$.MODULE$.apply( + statusCode, new PartitionLocation[] {primaryLocation})); + primaryLocation.setPeer(replicaLocation); + shuffleClient.setupLifecycleManagerRef(endpointRef); ChannelFuture mockedFuture = diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index 46c3223cc69..6ce710cb5d3 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -78,34 +78,34 @@ abstract class RpcEndpointRef(conf: CelebornConf) /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a - * default timeout, retry if timeout, throw an exception if this still fails. + * specified timeout, throw an exception if this fails. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send + * @param timeout the timeout duration * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - def askSync[T: ClassTag](message: Any, retryCount: Int): T = - askSync(message, defaultAskTimeout, retryCount) + def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { + val future = ask[T](message, timeout) + timeout.awaitResult(future, address) + } /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a - * specified timeout, throw an exception if this fails. + * default timeout, retry if timeout, throw an exception if this still fails. * - * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message * loop of [[RpcEndpoint]]. * * @param message the message to send - * @param timeout the timeout duration * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { - val future = ask[T](message, timeout) - timeout.awaitResult(future, address) - } + def askSync[T: ClassTag](message: Any, retryCount: Int): T = + askSync(message, defaultAskTimeout, retryCount) /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a @@ -124,7 +124,6 @@ abstract class RpcEndpointRef(conf: CelebornConf) while (numRetries > 0) { numRetries -= 1 try { - logInfo(s"[test] ask ${3 - numRetries} start, left Retries $numRetries") val future = ask[T](message, timeout) return timeout.awaitResult(future, address) } catch { @@ -134,7 +133,6 @@ abstract class RpcEndpointRef(conf: CelebornConf) val random = new Random val retryWaitMs = random.nextInt(waitTimeBound) TimeUnit.MILLISECONDS.sleep(retryWaitMs) - logWarning(s"[test] ask ${3 - numRetries} failed, sleep for ${retryWaitMs} ms") } catch { case _: InterruptedException => numRetries = 0 From b0d6e588320037721e1466bea599a98139f600b5 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 19:46:37 +0800 Subject: [PATCH 17/24] useless change --- .../java/org/apache/celeborn/client/ShuffleClientSuiteJ.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java index b1d05ba8e1a..183878cbe1e 100644 --- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java @@ -242,6 +242,7 @@ private CelebornConf setupEnv( shuffleClient = new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock")); + primaryLocation.setPeer(replicaLocation); when(endpointRef.askSync(any(), any(), any())) .thenAnswer( t -> @@ -253,7 +254,6 @@ private CelebornConf setupEnv( t -> RegisterShuffleResponse$.MODULE$.apply( statusCode, new PartitionLocation[] {primaryLocation})); - primaryLocation.setPeer(replicaLocation); shuffleClient.setupLifecycleManagerRef(endpointRef); From f515888291009c9ed470a54f0fbf6dc63c321220 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 23 Dec 2024 20:12:48 +0800 Subject: [PATCH 18/24] interruptedException --- .../org/apache/celeborn/common/rpc/RpcEndpointRef.scala | 6 +++--- .../main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index 6ce710cb5d3..a608b97ad09 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -129,13 +129,13 @@ abstract class RpcEndpointRef(conf: CelebornConf) } catch { case e: RpcTimeoutException => if (numRetries > 0) { + val random = new Random + val retryWaitMs = random.nextInt(waitTimeBound) try { - val random = new Random - val retryWaitMs = random.nextInt(waitTimeBound) TimeUnit.MILLISECONDS.sleep(retryWaitMs) } catch { case _: InterruptedException => - numRetries = 0 + throw e } } else { throw e diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 7d0152c0ee1..58d29a4fa3d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -161,13 +161,13 @@ abstract class RpcEnv(config: RpcEnvConfig) { } catch { case e: RpcTimeoutException => if (numRetries > 0) { + val random = new Random + val retryWaitMs = random.nextInt(waitTimeBound) try { - val random = new Random - val retryWaitMs = random.nextInt(waitTimeBound) TimeUnit.MILLISECONDS.sleep(retryWaitMs) } catch { case _: InterruptedException => - numRetries = 0 + throw e } } else { throw e From 0451af2e23ee7013e6f5d6416e3bb30180ef9f8c Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 24 Dec 2024 10:18:01 +0800 Subject: [PATCH 19/24] config --- .../celeborn/client/ShuffleClientImpl.java | 26 ++++++++++++------- .../apache/celeborn/common/CelebornConf.scala | 16 +++--------- docs/configuration/client.md | 3 +-- docs/configuration/network.md | 4 +-- 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index c1f187b1357..2ef695c426d 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -40,6 +40,7 @@ import org.apache.celeborn.client.read.MetricsCallback; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.CelebornIOException; +import org.apache.celeborn.common.exception.CelebornRuntimeException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.network.TransportContext; import org.apache.celeborn.common.network.buffer.NettyManagedBuffer; @@ -78,7 +79,7 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; - private final int callLifecycleManagerMaxRetry; + private final int rpcMaxRetries; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -177,7 +178,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u this.userIdentifier = userIdentifier; registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); - callLifecycleManagerMaxRetry = conf.clientCallLifecycleManagerMaxRetry(); + rpcMaxRetries = conf.clientRpcMaxRetries(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -533,7 +534,7 @@ private ConcurrentHashMap registerShuffle( lifecycleManagerRef.askSync( RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions), conf.clientRpcRegisterShuffleAskTimeout(), - callLifecycleManagerMaxRetry, + rpcMaxRetries, ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class))); } @@ -1700,12 +1701,14 @@ private void mapEndInternal( throws IOException { final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); PushState pushState = getPushState(mapKey); + try { limitZeroInFlight(mapKey, pushState); + MapperEndResponse response = lifecycleManagerRef.askSync( new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), - callLifecycleManagerMaxRetry, + rpcMaxRetries, ClassTag$.MODULE$.apply(MapperEndResponse.class)); if (response.status() != StatusCode.SUCCESS) { throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); @@ -1755,7 +1758,7 @@ protected Tuple2 loadFileGroupInternal( lifecycleManagerRef.askSync( getReducerFileGroup, conf.clientRpcGetReducerFileGroupAskTimeout(), - callLifecycleManagerMaxRetry, + rpcMaxRetries, ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)); switch (response.status()) { case SUCCESS: @@ -1922,11 +1925,14 @@ public void shutdown() { @Override public void setupLifecycleManagerRef(String host, int port) { logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port); - lifecycleManagerRef = - rpcEnv.setupEndpointRef( - new RpcAddress(host, port), - RpcNameConstants.LIFECYCLE_MANAGER_EP, - callLifecycleManagerMaxRetry); + try { + lifecycleManagerRef = + rpcEnv.setupEndpointRef( + new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP, rpcMaxRetries); + } catch (Exception e) { + throw new CelebornRuntimeException("setupLifecycleManagerRef failed!", e); + } + initDataClientFactoryIfNeeded(); } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 4ad81e4f588..3496d7cf933 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -902,7 +902,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS) def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES) def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT) - def clientCallLifecycleManagerMaxRetry: Int = get(CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES) def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED) def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES) def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT) @@ -4887,22 +4886,13 @@ object CelebornConf extends Logging { .createWithDefaultString("3s") val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] = - buildConf("celeborn.rpc.timeoutRetryWait") + buildConf("celeborn.rpc.retryWait") .categories("network") .version("0.6.0") - .doc("Wait time before next retry if RpcTimeoutException.") + .doc("Wait time before next retry on RpcTimeoutException.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") - val CLIENT_CALL_LIFECYCLEMANAGER_MAX_RETRIES: ConfigEntry[Int] = - buildConf("celeborn.client.callLifecycleManager.maxRetries") - .withAlternative("celeborn.callLifecycleManager.maxRetries") - .categories("client") - .version("0.6.0") - .doc("Max retry times for client to reserve slots.") - .intConf - .createWithDefault(3) - val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] = buildConf("celeborn.client.reserveSlots.maxRetries") .withAlternative("celeborn.slots.reserve.maxRetries") @@ -5052,7 +5042,7 @@ object CelebornConf extends Logging { buildConf("celeborn.client.rpc.maxRetries") .categories("client") .version("0.3.2") - .doc("Max RPC retry times in LifecycleManager.") + .doc("Max RPC retry times in client.") .intConf .createWithDefault(3) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index e94a393d320..479d16de778 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -22,7 +22,6 @@ license: | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | | celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | | -| celeborn.client.callLifecycleManager.maxRetries | 3 | false | Max retry times for client to reserve slots. | 0.6.0 | celeborn.callLifecycleManager.maxRetries | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | | | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | | | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | | @@ -81,7 +80,7 @@ license: | | celeborn.client.rpc.cache.size | 256 | false | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size | | celeborn.client.rpc.commitFiles.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. | 0.4.1 | | | celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. | 0.2.0 | | -| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 | | +| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in client. | 0.3.2 | | | celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout | | celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.2.0 | | | celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for LifecycleManager request reserve slots. | 0.3.0 | | diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 8d8ea6d5f5a..282d42e197c 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | @@ -56,9 +56,9 @@ license: | | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory bounded capacity. | 0.5.0 | | | celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | | +| celeborn.rpc.retryWait | 1s | false | Wait time before next retry on RpcTimeoutException. | 0.6.0 | | | celeborn.rpc.slow.interval | <undefined> | false | min interval (ms) for RPC framework to log slow RPC | 0.6.0 | | | celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to log slow RPC | 0.6.0 | | -| celeborn.rpc.timeoutRetryWait | 1s | false | Wait time before next retry if RpcTimeoutException. | 0.6.0 | | | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | | | celeborn.ssl.<module>.enabled | false | false | Enables SSL for securing wire traffic. | 0.5.0 | | | celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false | A comma-separated list of ciphers. The specified ciphers must be supported by JVM.
The reference list of protocols can be found in the "JSSE Cipher Suite Names" section of the Java security guide. The list for Java 11, for example, can be found at [this page](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#jsse-cipher-suite-names)
Note: If not set, the default cipher suite for the JRE will be used | 0.5.0 | | From 015fbb3b509641ac29d06988b3c9b0a2d07180a5 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 24 Dec 2024 10:19:38 +0800 Subject: [PATCH 20/24] networker config --- docs/configuration/network.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 282d42e197c..14ec3431a23 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | From f317d6d737fe5727df6243a35cc5eefe0b5c0538 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 24 Dec 2024 10:22:27 +0800 Subject: [PATCH 21/24] revert import change by reformat --- .../java/org/apache/celeborn/client/ShuffleClientImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 2ef695c426d..49876e7a2f3 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -20,7 +20,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import scala.Tuple2; import scala.reflect.ClassTag$; From ee720f35fd4b88fff633303b4a3844bfcf976f8d Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 20 Jan 2025 11:44:01 +0800 Subject: [PATCH 22/24] update client-specified retryWait --- .../celeborn/client/ShuffleClientImpl.java | 10 +++++++++- .../celeborn/client/ShuffleClientSuiteJ.java | 2 +- .../apache/celeborn/common/CelebornConf.scala | 19 ++++++++++++++----- .../celeborn/common/rpc/RpcEndpointRef.scala | 14 +++++++++----- .../apache/celeborn/common/rpc/RpcEnv.scala | 7 ++++--- docs/configuration/client.md | 1 + docs/configuration/network.md | 4 ++-- 7 files changed, 40 insertions(+), 17 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 49876e7a2f3..8dd9379b9a0 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -83,6 +83,7 @@ public class ShuffleClientImpl extends ShuffleClient { private final int registerShuffleMaxRetries; private final long registerShuffleRetryWaitMs; private final int rpcMaxRetries; + private final long rpcRetryWait; private final int maxReviveTimes; private final boolean testRetryRevive; private final int pushBufferMaxSize; @@ -182,6 +183,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry(); registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs(); rpcMaxRetries = conf.clientRpcMaxRetries(); + rpcRetryWait = conf.clientRpcRetryWait(); maxReviveTimes = conf.clientPushMaxReviveTimes(); testRetryRevive = conf.testRetryRevive(); pushBufferMaxSize = conf.clientPushBufferMaxSize(); @@ -538,6 +540,7 @@ private ConcurrentHashMap registerShuffle( RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions), conf.clientRpcRegisterShuffleAskTimeout(), rpcMaxRetries, + rpcRetryWait, ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class))); } @@ -1712,6 +1715,7 @@ private void mapEndInternal( lifecycleManagerRef.askSync( new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId), rpcMaxRetries, + rpcRetryWait, ClassTag$.MODULE$.apply(MapperEndResponse.class)); if (response.status() != StatusCode.SUCCESS) { throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status()); @@ -1762,6 +1766,7 @@ protected Tuple2 loadFileGroupInternal( getReducerFileGroup, conf.clientRpcGetReducerFileGroupAskTimeout(), rpcMaxRetries, + rpcRetryWait, ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class)); switch (response.status()) { case SUCCESS: @@ -1931,7 +1936,10 @@ public void setupLifecycleManagerRef(String host, int port) { try { lifecycleManagerRef = rpcEnv.setupEndpointRef( - new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP, rpcMaxRetries); + new RpcAddress(host, port), + RpcNameConstants.LIFECYCLE_MANAGER_EP, + rpcMaxRetries, + rpcRetryWait); } catch (Exception e) { throw new CelebornRuntimeException("setupLifecycleManagerRef failed!", e); } diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java index 183878cbe1e..651ff6bdb04 100644 --- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java +++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java @@ -249,7 +249,7 @@ private CelebornConf setupEnv( RegisterShuffleResponse$.MODULE$.apply( statusCode, new PartitionLocation[] {primaryLocation})); - when(endpointRef.askSync(any(), any(), any(Integer.class), any())) + when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any())) .thenAnswer( t -> RegisterShuffleResponse$.MODULE$.apply( diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 3496d7cf933..8fdee9df793 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -520,7 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) def rpcAskTimeout: RpcTimeout = new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key) - def rpcTimeoutRetryWaitMs: Long = get(RPC_TIMEOUT_RETRY_WAIT) + def rpcRetryWaitMs: Long = get(RPC_RETRY_WAIT) def rpcInMemoryBoundedInboxCapacity(): Int = { get(RPC_INBOX_CAPACITY) } @@ -1012,6 +1012,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientRpcCacheExpireTime: Long = get(CLIENT_RPC_CACHE_EXPIRE_TIME) def clientRpcSharedThreads: Int = get(CLIENT_RPC_SHARED_THREADS) def clientRpcMaxRetries: Int = get(CLIENT_RPC_MAX_RETIRES) + def clientRpcRetryWait: Long = get(CLIENT_RPC_RETRY_WAIT) def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT) def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY) def clientPushSlowStartInitialSleepTime: Long = get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME) @@ -1868,6 +1869,14 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("60s") + val RPC_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.rpc.retryWait") + .categories("network") + .version("0.6.0") + .doc("Time to wait before next retry on RpcTimeoutException.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + val RPC_DISPATCHER_THREADS: ConfigEntry[Int] = buildConf("celeborn.rpc.dispatcher.threads") .withAlternative("celeborn.rpc.dispatcher.numThreads") @@ -4885,11 +4894,11 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") - val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] = - buildConf("celeborn.rpc.retryWait") - .categories("network") + val CLIENT_RPC_RETRY_WAIT: ConfigEntry[Long] = + buildConf("celeborn.client.rpc.retryWait") + .categories("client") .version("0.6.0") - .doc("Wait time before next retry on RpcTimeoutException.") + .doc("Client-specified time to wait before next retry on RpcTimeoutException.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index a608b97ad09..01af34da510 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -33,7 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) extends Serializable with Logging { private[this] val defaultAskTimeout = conf.rpcAskTimeout - private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt + private[this] val defaultRetryWait = conf.rpcRetryWaitMs /** * return the address for the [[RpcEndpointRef]] @@ -104,8 +104,8 @@ abstract class RpcEndpointRef(conf: CelebornConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - def askSync[T: ClassTag](message: Any, retryCount: Int): T = - askSync(message, defaultAskTimeout, retryCount) + def askSync[T: ClassTag](message: Any, retryCount: Int, retryWait: Long = defaultRetryWait): T = + askSync(message, defaultAskTimeout, retryCount, retryWait) /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a @@ -119,7 +119,11 @@ abstract class RpcEndpointRef(conf: CelebornConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = { + def askSync[T: ClassTag]( + message: Any, + timeout: RpcTimeout, + retryCount: Int, + retryWait: Long): T = { var numRetries = retryCount while (numRetries > 0) { numRetries -= 1 @@ -130,7 +134,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) case e: RpcTimeoutException => if (numRetries > 0) { val random = new Random - val retryWaitMs = random.nextInt(waitTimeBound) + val retryWaitMs = random.nextInt(retryWait.toInt) try { TimeUnit.MILLISECONDS.sleep(retryWaitMs) } catch { diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 58d29a4fa3d..96059a8c984 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -106,7 +106,7 @@ object RpcEnv { abstract class RpcEnv(config: RpcEnvConfig) { private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout - private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt + private[celeborn] val defaultRetryWait = config.conf.rpcRetryWaitMs /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement @@ -152,7 +152,8 @@ abstract class RpcEnv(config: RpcEnvConfig) { def setupEndpointRef( address: RpcAddress, endpointName: String, - retryCount: Int): RpcEndpointRef = { + retryCount: Int, + retryWait: Long = defaultRetryWait): RpcEndpointRef = { var numRetries = retryCount while (numRetries > 0) { numRetries -= 1 @@ -162,7 +163,7 @@ abstract class RpcEnv(config: RpcEnvConfig) { case e: RpcTimeoutException => if (numRetries > 0) { val random = new Random - val retryWaitMs = random.nextInt(waitTimeBound) + val retryWaitMs = random.nextInt(retryWait.toInt) try { TimeUnit.MILLISECONDS.sleep(retryWaitMs) } catch { diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 479d16de778..8dc83601be2 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -84,6 +84,7 @@ license: | | celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout | | celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.2.0 | | | celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | false | Timeout for LifecycleManager request reserve slots. | 0.3.0 | | +| celeborn.client.rpc.retryWait | 1s | false | Client-specified time to wait before next retry on RpcTimeoutException. | 0.6.0 | | | celeborn.client.rpc.shared.threads | 16 | false | Number of shared rpc threads in LifecycleManager. | 0.3.2 | | | celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | false | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval | | celeborn.client.shuffle.batchHandleChangePartition.partitionBuckets | 256 | false | Max number of change partition requests which can be concurrently processed. | 0.5.0 | | diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 14ec3431a23..46861ab0c6c 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | @@ -56,7 +56,7 @@ license: | | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory bounded capacity. | 0.5.0 | | | celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup operations. | 0.2.0 | | -| celeborn.rpc.retryWait | 1s | false | Wait time before next retry on RpcTimeoutException. | 0.6.0 | | +| celeborn.rpc.retryWait | 1s | false | Time to wait before next retry on RpcTimeoutException. | 0.6.0 | | | celeborn.rpc.slow.interval | <undefined> | false | min interval (ms) for RPC framework to log slow RPC | 0.6.0 | | | celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to log slow RPC | 0.6.0 | | | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | | From ebecf6661711087030a08a7a8d053ad53bead70b Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 20 Jan 2025 11:49:02 +0800 Subject: [PATCH 23/24] network md file change --- docs/configuration/network.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 46861ab0c6c..47db250201b 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -29,7 +29,7 @@ license: | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting to `push`, it works for Flink shuffle client push data. | | | -| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | +| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | celeborn.<module>.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | 0.2.0 | | From 09a7cdb700add410ccce0cd89ea510ef7d68082d Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 20 Jan 2025 11:51:47 +0800 Subject: [PATCH 24/24] change waitTime to same name --- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 2 +- .../scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala | 2 +- .../src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 8fdee9df793..ff258ef7b54 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -520,7 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) def rpcAskTimeout: RpcTimeout = new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key) - def rpcRetryWaitMs: Long = get(RPC_RETRY_WAIT) + def rpcRetryWait: Long = get(RPC_RETRY_WAIT) def rpcInMemoryBoundedInboxCapacity(): Int = { get(RPC_INBOX_CAPACITY) } diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala index 01af34da510..76e3e7031ce 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala @@ -33,7 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) extends Serializable with Logging { private[this] val defaultAskTimeout = conf.rpcAskTimeout - private[this] val defaultRetryWait = conf.rpcRetryWaitMs + private[this] val defaultRetryWait = conf.rpcRetryWait /** * return the address for the [[RpcEndpointRef]] diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 96059a8c984..9af1ce39a0e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -106,7 +106,7 @@ object RpcEnv { abstract class RpcEnv(config: RpcEnvConfig) { private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout - private[celeborn] val defaultRetryWait = config.conf.rpcRetryWaitMs + private[celeborn] val defaultRetryWait = config.conf.rpcRetryWait /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement