Skip to content

Commit

Permalink
NIFI-13649 Check Cluster Node Address Status before marking CONNECTED (
Browse files Browse the repository at this point in the history
…#9168)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
bbende authored Aug 12, 2024
1 parent b4266a5 commit e349b7e
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ public interface ClusterCoordinator {
*/
boolean isBlockedByFirewall(Set<String> nodeIdentities);

/**
* Checks if the API of the given node is reachable.
*
* @param nodeId the node id to check
* @return true if the API is reachable, false otherwise
*/
boolean isApiReachable(NodeIdentifier nodeId);

/**
* Reports that some event occurred that is relevant to the cluster
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ private void processHeartbeat(final NodeHeartbeat heartbeat) {
return;
}

if (!clusterCoordinator.isApiReachable(nodeId)) {
logger.info("Node API Address [{}] not reachable: cluster connection request deferred pending successful network connection", nodeId);
return;
}

// connection complete
clusterCoordinator.finishNodeConnection(nodeId);
clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@

import java.io.IOException;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -97,6 +100,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl

private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
private static final String EVENT_CATEGORY = "Clustering";
private static final Duration NODE_API_TIMEOUT = Duration.ofSeconds(10);

private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}");

Expand Down Expand Up @@ -676,6 +680,21 @@ public boolean isBlockedByFirewall(final Set<String> nodeIdentities) {
return true;
}

@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
final String apiAddress = nodeId.getApiAddress();
final int apiPort = nodeId.getApiPort();
try {
try (final Socket soc = new Socket()) {
soc.connect(new InetSocketAddress(apiAddress, apiPort), (int) NODE_API_TIMEOUT.toMillis());
}
return true;
} catch (final Exception e) {
logger.debug("Node is not reachable at API address {} and port {}", apiAddress, apiPort, e);
return false;
}
}

@Override
public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,39 @@ public boolean isActiveClusterCoordinator() {
assertTrue(requestedToConnect.isEmpty());
}

@Test
public void testConnectingNodeNotMarkedConnectedWhenHeartbeatReceivedAndApiUnreachable() throws InterruptedException {
final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
@Override
public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
super.requestNodeConnect(nodeId);
requestedToConnect.add(nodeId);
}

@Override
public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
super.finishNodeConnection(nodeId);
connected.add(nodeId);
}

@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
return false;
}
};

final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);

adapter.requestNodeConnect(nodeId); // set state to 'connecting'
requestedToConnect.clear();

monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
monitor.waitForProcessed();

assertEquals(0, connected.size());
}

private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
Expand Down Expand Up @@ -259,6 +292,11 @@ public synchronized boolean isBlockedByFirewall(Set<String> nodeIds) {
return false;
}

@Override
public boolean isApiReachable(final NodeIdentifier nodeId) {
return true;
}

@Override
public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
events.add(new ReportedEvent(nodeId, severity, event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public void run() {
case CLUSTER_STATUS:
logger.info("Received CLUSTER_STATUS request from Bootstrap");
final String clusterStatus = getClusterStatus();
logger.debug("Responding to CLUSTER_STATUS request from Bootstrap with {}", clusterStatus);
sendAnswer(socket.getOutputStream(), clusterStatus);
break;
case DECOMMISSION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
Expand Down Expand Up @@ -398,8 +400,8 @@ boolean isReplicateRequest() {

ensureFlowInitialized();

// If not connected to the cluster, we do not replicate
if (!isConnectedToCluster()) {
// If not connected or not connecting to the cluster, we do not replicate
if (!isConnectedToCluster() && !isConnectingToCluster()) {
return false;
}

Expand Down Expand Up @@ -1054,6 +1056,16 @@ boolean isConnectedToCluster() {
return isClustered() && clusterCoordinator.isConnected();
}

boolean isConnectingToCluster() {
if (!isClustered()) {
return false;
}

final NodeIdentifier nodeId = clusterCoordinator.getLocalNodeIdentifier();
final NodeConnectionStatus nodeConnectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
return nodeConnectionStatus != null && nodeConnectionStatus.getState() == NodeConnectionState.CONNECTING;
}

boolean isClustered() {
return clusterCoordinator != null;
}
Expand Down

0 comments on commit e349b7e

Please sign in to comment.