Skip to content

Commit

Permalink
[CELEBORN-1398] Support return leader ip to client
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
As title

### Why are the changes needed?
Currently, if accessing services of a Celeborn cluster across Kubernetes clusters, one may encounter DNS resolution issues. However, connectivity may be achieved through IP addresses when combined with the Kubernetes setting hostNetwork=true for clients from different clusters. At present, the `celeborn.network.bind.preferIpAddress` configuration is only effective on worker nodes. This PR will enable the feature of returning the leader's IP when accessing the master node.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA

Closes apache#2489 from RexXiong/CELEBORN-1398.

Authored-by: Shuang <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
RexXiong committed May 8, 2024
1 parent 9a9abfe commit 993d3f2
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import javax.annotation.Nullable;

import scala.Tuple2;

import org.apache.commons.lang3.StringUtils;

public class MasterNotLeaderException extends IOException {
Expand All @@ -34,19 +36,25 @@ public class MasterNotLeaderException extends IOException {

public MasterNotLeaderException(
String currentPeer, String suggestedLeaderPeer, @Nullable Throwable cause) {
this(currentPeer, suggestedLeaderPeer, suggestedLeaderPeer, cause);
this(
currentPeer,
Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
Tuple2.apply(suggestedLeaderPeer, suggestedLeaderPeer),
false,
cause);
}

public MasterNotLeaderException(
String currentPeer,
String suggestedLeaderPeer,
String suggestedInternalLeaderPeer,
Tuple2<String, String> suggestedLeaderPeer,
Tuple2<String, String> suggestedInternalLeaderPeer,
boolean bindPreferIp,
@Nullable Throwable cause) {
super(
String.format(
"Master:%s is not the leader.%s%s",
currentPeer,
currentPeer.equals(suggestedLeaderPeer)
currentPeer.equals(suggestedLeaderPeer._1)
? StringUtils.EMPTY
: String.format(
" Suggested leader is Master:%s (%s).",
Expand All @@ -55,8 +63,9 @@ public MasterNotLeaderException(
? StringUtils.EMPTY
: String.format(" Exception:%s.", cause.getMessage())),
cause);
this.leaderPeer = suggestedLeaderPeer;
this.internalLeaderPeer = suggestedInternalLeaderPeer;
this.leaderPeer = bindPreferIp ? suggestedLeaderPeer._1 : suggestedLeaderPeer._2;
this.internalLeaderPeer =
bindPreferIp ? suggestedInternalLeaderPeer._1 : suggestedInternalLeaderPeer._2;
}

public String getSuggestedLeaderAddress() {
Expand Down
21 changes: 21 additions & 0 deletions common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,27 @@ object Utils extends Logging {
}
}

private def getIpHostAddressPair(host: String): (String, String) = {
try {
val inetAddress = InetAddress.getByName(host)
val hostAddress = inetAddress.getHostAddress
if (host.equals(hostAddress)) {
(hostAddress, inetAddress.getCanonicalHostName)
} else {
(hostAddress, host)
}
} catch {
case _: Throwable => (host, host) // return original input
}
}

// Convert address (ip:port or host:port) to (ip:port, host:port) pair
def addressToIpHostAddressPair(address: String): (String, String) = {
val (host, port) = Utils.parseHostPort(address)
val (_ip, _host) = Utils.getIpHostAddressPair(host)
(_ip + ":" + port, _host + ":" + port)
}

def checkHostPort(hostPort: String): Unit = {
if (hostPort != null && hostPort.split(":").length > 2) {
assert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.io.IOException;
import java.util.Optional;

import scala.Tuple2;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
Expand All @@ -37,29 +35,31 @@
public class HAHelper {

public static boolean checkShouldProcess(
RpcCallContext context, AbstractMetaManager masterStatusSystem) {
RpcCallContext context, AbstractMetaManager masterStatusSystem, boolean bindPreferIp) {
HARaftServer ratisServer = getRatisServer(masterStatusSystem);
if (ratisServer != null) {
if (ratisServer.isLeader()) {
return true;
}
sendFailure(context, ratisServer, null);
sendFailure(context, ratisServer, null, bindPreferIp);
return false;
}
return true;
}

public static void sendFailure(
RpcCallContext context, HARaftServer ratisServer, Throwable cause) {
RpcCallContext context, HARaftServer ratisServer, Throwable cause, boolean bindPreferIp) {
if (context != null) {
if (ratisServer != null) {
Optional<Tuple2<String, String>> leaderPeer = ratisServer.getCachedLeaderPeerRpcEndpoint();
Optional<HARaftServer.LeaderPeerEndpoints> leaderPeer =
ratisServer.getCachedLeaderPeerRpcEndpoint();
if (leaderPeer.isPresent()) {
context.sendFailure(
new MasterNotLeaderException(
ratisServer.getRpcEndpoint(),
leaderPeer.get()._1(),
leaderPeer.get()._2(),
leaderPeer.get().rpcEndpoints,
leaderPeer.get().rpcInternalEndpoints,
bindPreferIp,
cause));
} else {
context.sendFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceResponse;

Expand All @@ -72,6 +73,7 @@ static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}

private final MasterNode localNode;
private final InetSocketAddress ratisAddr;
private final String rpcEndpoint;
private final String internalRpcEndpoint;
Expand All @@ -90,7 +92,8 @@ static long nextCallId() {
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
private Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoints = Optional.empty();
private Optional<LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoints = Optional.empty();

private final CelebornConf conf;
private long workerTimeoutDeadline;
private long appTimeoutDeadline;
Expand All @@ -100,23 +103,22 @@ static long nextCallId() {
*
* @param conf configuration
* @param localRaftPeerId raft peer id of this Ratis server
* @param ratisAddr address of the ratis server
* @param localNode local node of this Ratis server
* @param raftPeers peer nodes in the raft ring
* @throws IOException
*/
private HARaftServer(
MetaHandler metaHandler,
CelebornConf conf,
RaftPeerId localRaftPeerId,
InetSocketAddress ratisAddr,
String rpcEndpoint,
String internalRpcEndpoint,
MasterNode localNode,
List<RaftPeer> raftPeers)
throws IOException {
this.metaHandler = metaHandler;
this.ratisAddr = ratisAddr;
this.rpcEndpoint = rpcEndpoint;
this.internalRpcEndpoint = internalRpcEndpoint;
this.localNode = localNode;
this.ratisAddr = localNode.ratisAddr();
this.rpcEndpoint = localNode.rpcEndpoint();
this.internalRpcEndpoint = localNode.internalRpcEndpoint();
this.raftPeerId = localRaftPeerId;
this.raftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, raftPeers);
this.masterStateMachine = getStateMachine();
Expand Down Expand Up @@ -201,14 +203,8 @@ public static HARaftServer newMasterRatisServer(
// Add other nodes belonging to the same service to the Ratis ring
raftPeers.add(raftPeer);
});
return new HARaftServer(
metaHandler,
conf,
localRaftPeerId,
ratisAddr,
localNode.rpcEndpoint(),
localNode.internalRpcEndpoint(),
raftPeers);

return new HARaftServer(metaHandler, conf, localRaftPeerId, localNode, raftPeers);
}

public ResourceResponse submitRequest(ResourceProtos.ResourceRequest request)
Expand Down Expand Up @@ -436,9 +432,9 @@ public boolean isLeader() {
/**
* Get the suggested leader peer id.
*
* @return RaftPeerId of the suggested leader node - Tuple2(rpc endpoint, internal rpc endpoint)
* @return RaftPeerId of the suggested leader node - Optional<LeaderPeerEndpoints>
*/
public Optional<Tuple2<String, String>> getCachedLeaderPeerRpcEndpoint() {
public Optional<LeaderPeerEndpoints> getCachedLeaderPeerRpcEndpoint() {
this.roleCheckLock.readLock().lock();
try {
return cachedLeaderPeerRpcEndpoints;
Expand All @@ -455,23 +451,29 @@ public void updateServerRole() {
GroupInfoReply groupInfo = getGroupInfo();
RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();

Tuple2<String, String> leaderPeerRpcEndpoint = null;
Tuple2<String, String> leaderPeerInternalRpcEndpoint = null;
if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
setServerRole(thisNodeRole, getRpcEndpoint(), getInternalRpcEndpoint());
// Current Node always uses original rpcEndpoint/internalRpcEndpoint, as if something wrong
// they would never return to client.
setServerRole(
thisNodeRole,
Tuple2.apply(this.rpcEndpoint, this.rpcEndpoint),
Tuple2.apply(this.internalRpcEndpoint, this.internalRpcEndpoint));
} else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
ByteString leaderNodeId = roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId();
// There may be a chance, here we get leaderNodeId as null. For
// example, in 3 node Ratis, if 2 nodes are down, there will
// be no leader.
String leaderPeerRpcEndpoint = null;
String leaderPeerInternalRpcEndpoint = null;
if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
leaderPeerRpcEndpoint =
String clientAddress =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getClientAddress();
leaderPeerRpcEndpoint = Utils.addressToIpHostAddressPair(clientAddress);
// We use admin address to host the internal rpc address
if (conf.internalPortEnabled()) {
leaderPeerInternalRpcEndpoint =
String adminAddress =
roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getAdminAddress();
leaderPeerInternalRpcEndpoint = Utils.addressToIpHostAddressPair(adminAddress);
} else {
leaderPeerInternalRpcEndpoint = leaderPeerRpcEndpoint;
}
Expand All @@ -495,8 +497,8 @@ public void updateServerRole() {
/** Set the current server role and the leader peer rpc endpoint. */
private void setServerRole(
RaftProtos.RaftPeerRole currentRole,
String leaderPeerRpcEndpoint,
String leaderPeerInternalRpcEndpoint) {
Tuple2<String, String> leaderPeerRpcEndpoint,
Tuple2<String, String> leaderPeerInternalRpcEndpoint) {
this.roleCheckLock.writeLock().lock();
try {
boolean leaderChanged = false;
Expand All @@ -518,7 +520,8 @@ private void setServerRole(
this.cachedPeerRole = Optional.ofNullable(currentRole);
if (null != leaderPeerRpcEndpoint) {
this.cachedLeaderPeerRpcEndpoints =
Optional.of(Tuple2.apply(leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint));
Optional.of(
new LeaderPeerEndpoints(leaderPeerRpcEndpoint, leaderPeerInternalRpcEndpoint));
} else {
this.cachedLeaderPeerRpcEndpoints = Optional.empty();
}
Expand Down Expand Up @@ -578,4 +581,18 @@ public long getWorkerTimeoutDeadline() {
public long getAppTimeoutDeadline() {
return appTimeoutDeadline;
}

public static class LeaderPeerEndpoints {
// the rpcEndpoints Tuple2 (ip:port, host:port)
public final Tuple2<String, String> rpcEndpoints;

// the rpcInternalEndpoints Tuple2 (ip:port, host:port)
public final Tuple2<String, String> rpcInternalEndpoints;

public LeaderPeerEndpoints(
Tuple2<String, String> rpcEndpoints, Tuple2<String, String> rpcInternalEndpoints) {
this.rpcEndpoints = rpcEndpoints;
this.rpcInternalEndpoints = rpcInternalEndpoints;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private[celeborn] class Master(
metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_MASTER))
metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER))

private val bindPreferIP: Boolean = conf.bindPreferIP
private val authEnabled = conf.authEnabled
private val secretRegistry = new MasterSecretRegistryImpl()
private val sendApplicationMetaThreads = conf.masterSendApplicationMetaThreads
Expand Down Expand Up @@ -358,12 +359,12 @@ private[celeborn] class Master(
}

def executeWithLeaderChecker[T](context: RpcCallContext, f: => T): Unit =
if (HAHelper.checkShouldProcess(context, statusSystem)) {
if (HAHelper.checkShouldProcess(context, statusSystem, bindPreferIP)) {
try {
f
} catch {
case e: Exception =>
HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem), e)
HAHelper.sendFailure(context, HAHelper.getRatisServer(statusSystem), e, bindPreferIP)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ case class MasterNode(

def rpcEndpoint: String = rpcHost + ":" + rpcPort

def rpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + rpcPort

def internalRpcEndpoint: String = rpcHost + ":" + internalRpcPort

def internalRpcIpEndpoint: String = rpcAddr.getAddress.getHostAddress + ":" + rpcPort

lazy val ratisAddr = MasterNode.createSocketAddr(ratisHost, ratisPort)

lazy val rpcAddr = MasterNode.createSocketAddr(rpcHost, rpcPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static void resetRaftServer() throws IOException, InterruptedException {
}

@Test
public void testLeaderAvaiable() {
public void testLeaderAvailable() {
boolean hasLeader =
RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || RATISSERVER3.isLeader();
Assert.assertTrue(hasLeader);
Expand All @@ -201,12 +201,22 @@ public void testLeaderAvaiable() {
boolean isFollowerCurrentLeader = follower.isLeader();
Assert.assertFalse(isFollowerCurrentLeader);

Optional<Tuple2<String, String>> cachedLeaderPeerRpcEndpoint =
Optional<HARaftServer.LeaderPeerEndpoints> cachedLeaderPeerRpcEndpoint =
follower.getCachedLeaderPeerRpcEndpoint();

Assert.assertTrue(cachedLeaderPeerRpcEndpoint.isPresent());
Assert.assertEquals(leader.getRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._1());
Assert.assertEquals(leader.getInternalRpcEndpoint(), cachedLeaderPeerRpcEndpoint.get()._2());

Tuple2<String, String> rpcEndpointsPair = cachedLeaderPeerRpcEndpoint.get().rpcEndpoints;
Tuple2<String, String> rpcInternalEndpointsPair =
cachedLeaderPeerRpcEndpoint.get().rpcInternalEndpoints;

// rpc endpoint may use custom host name then this ut need check ever ip/host
Assert.assertTrue(
leader.getRpcEndpoint().equals(rpcEndpointsPair._1)
|| leader.getRpcEndpoint().equals(rpcEndpointsPair._2));
Assert.assertTrue(
leader.getInternalRpcEndpoint().equals(rpcInternalEndpointsPair._1)
|| leader.getRpcEndpoint().equals(rpcInternalEndpointsPair._2));
}

private static final String HOSTNAME1 = "host1";
Expand Down

0 comments on commit 993d3f2

Please sign in to comment.