Skip to content

Commit

Permalink
NIFI-12453, NIFI-12454: Allow easily determining cluster state of a n…
Browse files Browse the repository at this point in the history
…ode by running nifi.sh cluster-status and allow decommissioning of nodes without shutting down
  • Loading branch information
markap14 committed Dec 5, 2023
1 parent a21993e commit fb7f794
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 17 deletions.
61 changes: 51 additions & 10 deletions nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -115,6 +116,7 @@ public class RunNiFi {
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
public static final String CLUSTER_STATUS_CMD = "CLUSTER_STATUS";
public static final String IS_LOADED_CMD = "IS_LOADED";
public static final String STATUS_HISTORY_CMD = "STATUS_HISTORY";

Expand Down Expand Up @@ -204,6 +206,10 @@ public static void main(String[] args) throws IOException {
dumpFile = new File(args[1]);
}
}
} else if (cmd.equalsIgnoreCase("cluster-status")) {
if (args.length > 1) {
dumpFile = new File(args[1]);
}
} else if (cmd.equalsIgnoreCase("status-history")) {
if (args.length < 2) {
System.err.printf("Wrong number of arguments: %d instead of 1 or 2, the command parameters are: " +
Expand Down Expand Up @@ -253,6 +259,7 @@ public static void main(String[] args) throws IOException {
case "status-history":
case "restart":
case "env":
case "cluster-status":
break;
default:
printUsage();
Expand All @@ -274,7 +281,8 @@ public static void main(String[] args) throws IOException {
runNiFi.stop();
break;
case "decommission":
exitStatus = runNiFi.decommission();
final boolean shutdown = args.length < 2 || !"--shutdown=false".equals(args[1]);
exitStatus = runNiFi.decommission(shutdown);
break;
case "status":
exitStatus = runNiFi.status();
Expand All @@ -296,6 +304,9 @@ public static void main(String[] args) throws IOException {
case "diagnostics":
runNiFi.diagnostics(dumpFile, verbose);
break;
case "cluster-status":
runNiFi.clusterStatus(dumpFile);
break;
case "status-history":
runNiFi.statusHistory(dumpFile, statusHistoryDays);
break;
Expand Down Expand Up @@ -659,7 +670,16 @@ public void env() {
*/
public void diagnostics(final File dumpFile, final boolean verbose) throws IOException {
    // Only send the verbose flag when it was requested; null means the command is sent without arguments.
    final String args = verbose ? "--verbose=true" : null;

    // Issue the request exactly once, using the five-parameter makeRequest. No extra OutputStream
    // is needed for diagnostics, so null is passed for it.
    makeRequest(DIAGNOSTICS_CMD, args, dumpFile, null, "diagnostics information");
}

/**
 * Sends the cluster status command and prints the response to standard out, prefixed with "Cluster Status: ".
 *
 * @param dumpFile handed to makeRequest as the file to write the results to, or <code>null</code> to skip writing to a file
 * @throws IOException if makeRequest fails to communicate with the NiFi instance or to write out the results
 */
public void clusterStatus(final File dumpFile) throws IOException {
    try (final ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream()) {
        // Capture the response in memory so that it can be echoed to the console below.
        makeRequest(CLUSTER_STATUS_CMD, null, dumpFile, responseBuffer, "cluster status");

        System.out.println("Cluster Status: " + responseBuffer.toString(StandardCharsets.UTF_8));
    }
}

/**
Expand All @@ -670,7 +690,7 @@ public void diagnostics(final File dumpFile, final boolean verbose) throws IOExc
* @throws IOException if any issues occur while writing the dump file
*/
public void dump(final File dumpFile) throws IOException {
    // Issue the request exactly once, using the five-parameter makeRequest. The thread dump takes
    // no command arguments and needs no extra OutputStream, so null is passed for both.
    makeRequest(DUMP_CMD, null, dumpFile, null, "thread dump");
}

/**
Expand All @@ -681,7 +701,7 @@ public void dump(final File dumpFile) throws IOException {
*/
public void statusHistory(final File dumpFile, final String days) throws IOException {
    // Due to input validation, the dumpFile cannot currently be null in this scenario.
    // Issue the request exactly once, using the five-parameter makeRequest. The number of days is
    // sent as the command argument; no extra OutputStream is needed, so null is passed for it.
    makeRequest(STATUS_HISTORY_CMD, days, dumpFile, null, "status history information");
}

private boolean isNiFiFullyLoaded() throws IOException, NiFiNotRunningException {
Expand All @@ -703,7 +723,16 @@ private boolean isNiFiFullyLoaded() throws IOException, NiFiNotRunningException
}
}

private void makeRequest(final String request, final String arguments, final File dumpFile, final String contentsDescription) throws IOException {
/**
* Makes a request to the Bootstrap Listener
* @param request the request to send
* @param arguments any arguments for the command, or <code>null</code> if the command takes no arguments
* @param dumpFile a file to write the results to, or <code>null</code> to skip writing the results to any file
* @param outputStream an OutputStream to write the results to, or <code>null</code> to skip writing the results to any OutputStream
* @param contentsDescription a description of the contents being written; used for logging purposes
* @throws IOException if unable to communicate with the NiFi instance or write out the results
*/
private void makeRequest(final String request, final String arguments, final File dumpFile, final OutputStream outputStream, final String contentsDescription) throws IOException {
final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) {
Expand All @@ -721,11 +750,21 @@ private void makeRequest(final String request, final String arguments, final Fil
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
if (fileOut == null) {
logger.info(line);
} else {
boolean written = false;
if (fileOut != null) {
fileOut.write(line.getBytes(StandardCharsets.UTF_8));
fileOut.write('\n');
written = true;
}

if (outputStream != null) {
outputStream.write(line.getBytes(StandardCharsets.UTF_8));
outputStream.write('\n');
written = true;
}

if (!written) {
logger.info(line);
}
}
}
Expand Down Expand Up @@ -760,7 +799,8 @@ private void sendRequest(Socket socket, Integer port, String request, String arg
socketOut.flush();
}

public Integer decommission() throws IOException {

public Integer decommission(final boolean shutdown) throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
if (port == null) {
Expand Down Expand Up @@ -792,7 +832,8 @@ public Integer decommission() throws IOException {

logger.debug("Sending DECOMMISSION Command to port {}", port);
final OutputStream out = socket.getOutputStream();
out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
final String command = DECOMMISSION_CMD + " " + secretKey + " --shutdown=" + shutdown + "\n";
out.write(command.getBytes(StandardCharsets.UTF_8));
out.flush();
socket.shutdownOutput();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster;

/**
 * Provides details about this NiFi instance's relationship to a cluster.
 *
 * <p>The interface declares a single abstract method and is marked as a functional interface
 * so that it can be implemented with a lambda and so that the compiler rejects the accidental
 * addition of a second abstract method.</p>
 */
@FunctionalInterface
public interface ClusterDetailsFactory {

    /**
     * @return the current Connection State of this NiFi instance
     */
    ConnectionState getConnectionState();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster;

/**
 * The possible states of a NiFi instance with respect to cluster membership.
 */
public enum ConnectionState {

    /** The NiFi instance does not belong to a cluster. */
    NOT_CLUSTERED,

    /** The instance is currently in the process of connecting to the cluster. */
    CONNECTING,

    /** The instance is connected to the cluster. */
    CONNECTED,

    /** The instance is currently in the process of disconnecting from the cluster. */
    DISCONNECTING,

    /** The instance is disconnected from the cluster. */
    DISCONNECTED,

    /** The instance is offloading. */
    OFFLOADING,

    /** The instance has completed offloading. */
    OFFLOADED,

    /** The instance has been removed from the cluster. */
    REMOVED,

    /** The state is not currently known. */
    UNKNOWN
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster;

import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * {@link ClusterDetailsFactory} that derives the Connection State of the local node
 * from a {@link ClusterCoordinator}.
 */
public class StandardClusterDetailsFactory implements ClusterDetailsFactory {
    private static final Logger logger = LoggerFactory.getLogger(StandardClusterDetailsFactory.class);

    private final ClusterCoordinator clusterCoordinator;

    /**
     * Creates the factory. Per the original note on this constructor, it is invoked by Spring
     * and may therefore be flagged as never used.
     *
     * @param clusterCoordinator the coordinator to consult; when <code>null</code>, the instance is reported as NOT_CLUSTERED
     */
    public StandardClusterDetailsFactory(final ClusterCoordinator clusterCoordinator) {
        this.clusterCoordinator = clusterCoordinator;
    }

    /**
     * @return NOT_CLUSTERED when no coordinator is configured; UNKNOWN when the local node identifier
     *         or its connection status is unavailable, or when the reported state has no matching
     *         constant; otherwise the Connection State whose name matches the reported state
     */
    @Override
    public ConnectionState getConnectionState() {
        if (clusterCoordinator == null) {
            logger.debug("No Cluster Coordinator has been configured; returning Connection State of NOT_CLUSTERED");
            return ConnectionState.NOT_CLUSTERED;
        }

        final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
        if (localNodeId == null) {
            logger.info("Local Node Identifier has not yet been established; returning Connection State of UNKNOWN");
            return ConnectionState.UNKNOWN;
        }

        final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeId);
        if (status == null) {
            logger.info("Cluster connection status is not currently known for Node Identifier {}; returning Connection State of UNKNOWN", localNodeId.getId());
            return ConnectionState.UNKNOWN;
        }

        return toConnectionState(status.getState().name());
    }

    /**
     * Translates the name of the coordinator's node state into the equally named Connection State.
     *
     * @param stateName name of the state reported by the Cluster Coordinator
     * @return the matching Connection State, or UNKNOWN if no constant has that name
     */
    private static ConnectionState toConnectionState(final String stateName) {
        final ConnectionState mapped;
        try {
            mapped = ConnectionState.valueOf(stateName);
        } catch (final IllegalArgumentException iae) {
            logger.warn("Cluster Coordinator reports Connection State of {}, which is not a known state; returning UNKNOWN", stateName);
            return ConnectionState.UNKNOWN;
        }

        logger.debug("Returning Connection State of {}", mapped);
        return mapped;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowController.GroupStatusCounts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -139,6 +140,7 @@ private void waitForState(final Set<NodeConnectionState> acceptableStates) throw
private void waitForOffloadToFinish() throws InterruptedException {
logger.info("Waiting for Node to finish offloading");

int iterations = 0;
while (true) {
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
final NodeConnectionState state = status.getState();
Expand All @@ -150,7 +152,16 @@ private void waitForOffloadToFinish() throws InterruptedException {
throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
}

logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
// Every 10th iteration log how many FlowFiles are left
if (++iterations % 10 == 0) {
final GroupStatusCounts statusCounts = flowController.getGroupStatusCounts(flowController.getFlowManager().getRootGroup());
final int flowFileCount = statusCounts.getQueuedCount();
final long byteCount = statusCounts.getQueuedContentSize();
logger.info("Node state is OFFLOADING. Currently, there are {} FlowFiles ({} bytes) left on node.", flowFileCount, byteCount);
} else {
logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
}

TimeUnit.SECONDS.sleep(delaySeconds);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@
<constructor-arg ref="flowController" />
</bean>

<bean id="clusterDetailsFactory" class="org.apache.nifi.cluster.StandardClusterDetailsFactory">
<constructor-arg ref="clusterCoordinator" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.ConnectionState;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
Expand Down Expand Up @@ -216,6 +218,11 @@ public DecommissionTask getDecommissionTask() {
return null;
}

@Override
public ClusterDetailsFactory getClusterDetailsFactory() {
    // The factory returned here always reports the instance as not being part of a cluster.
    final ClusterDetailsFactory notClusteredFactory = () -> ConnectionState.NOT_CLUSTERED;
    return notClusteredFactory;
}

@Override
public StatusHistoryDumpFactory getStatusHistoryDumpFactory() {
return null;
Expand Down
Loading

0 comments on commit fb7f794

Please sign in to comment.