Skip to content

Commit

Permalink
NIFI-8433 Added ability to decommission a node in a cluster
Browse files Browse the repository at this point in the history
This closes apache#5004

Signed-off-by: Joey Frazee <[email protected]>
  • Loading branch information
markap14 authored and jfrazee committed May 1, 2021
1 parent 1090a97 commit 935566b
Show file tree
Hide file tree
Showing 16 changed files with 511 additions and 98 deletions.
181 changes: 126 additions & 55 deletions nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ public class RunNiFi {
// Property name under which the NiFi process ID is looked up in the properties returned by loadProperties().
public static final String PID_KEY = "pid";

// NOTE(review): presumably the number of seconds to wait for NiFi to start; the usage is not visible here - confirm.
public static final int STARTUP_WAIT_SECONDS = 60;
// Pause, in milliseconds, between successive "is the process still running?" checks in waitForShutdown().
public static final long GRACEFUL_SHUTDOWN_RETRY_MILLIS = 2000L;

// Command tokens. SHUTDOWN and DECOMMISSION are compared against the response read back from the NiFi
// instance, which is expected to echo the token as its acknowledgement.
// NOTE(review): PING, DUMP and DIAGNOSTICS are presumably used the same way; their usages are not visible here.
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String DECOMMISSION_CMD = "DECOMMISSION";
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
Expand Down Expand Up @@ -169,6 +171,7 @@ private static void printUsage() {
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
System.out.println("Decommission : Disconnects Apache NiFi from its cluster, offloads its data to other nodes in the cluster, removes itself from the cluster, and shuts down the instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
Expand Down Expand Up @@ -219,6 +222,7 @@ public static void main(String[] args) throws IOException, InterruptedException
case "start":
case "run":
case "stop":
case "decommission":
case "status":
case "is_loaded":
case "dump":
Expand All @@ -245,6 +249,9 @@ public static void main(String[] args) throws IOException, InterruptedException
case "stop":
runNiFi.stop();
break;
case "decommission":
exitStatus = runNiFi.decommission();
break;
case "status":
exitStatus = runNiFi.status();
break;
Expand Down Expand Up @@ -810,6 +817,63 @@ public void notifyStop() {
"Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
}

/**
 * Sends the DECOMMISSION command to the running NiFi instance over its local bootstrap port and,
 * once NiFi acknowledges the command, waits for the NiFi process to exit.
 *
 * <p>A lock file is created for the duration of the call and is always removed again before
 * returning, including when reading the properties or talking to NiFi fails.</p>
 *
 * @return {@code 15} if no port for a running instance could be determined, {@code 18} if NiFi
 *         answered with anything other than the DECOMMISSION acknowledgement, or {@code null}
 *         if the command was acknowledged
 * @throws IOException if the lock file cannot be created, the properties cannot be loaded, or
 *         communication with the NiFi instance fails
 */
public Integer decommission() throws IOException {
    final Logger logger = cmdLogger;
    final Integer port = getCurrentPort(logger);
    if (port == null) {
        logger.info("Apache NiFi is not currently running");
        return 15;
    }

    // indicate that a decommission command is in progress
    final File lockFile = getLockFile(logger);
    if (!lockFile.exists()) {
        lockFile.createNewFile();
    }

    // Everything after the lock file has been created runs inside this try block so that the
    // lock file is cleaned up even if loading the properties or resolving the files fails.
    try {
        final Properties nifiProps = loadProperties(logger);
        final String secretKey = nifiProps.getProperty("secret.key");
        final String pid = nifiProps.getProperty(PID_KEY);
        final File statusFile = getStatusFile(logger);
        final File pidFile = getPidFile(logger);

        // SO_TIMEOUT only limits blocking reads; the connection attempt has to be bounded
        // through the timeout argument of connect().
        final int connectTimeoutMillis = 10000;

        try (final Socket socket = new Socket()) {
            logger.debug("Connecting to NiFi instance");
            socket.connect(new InetSocketAddress("localhost", port), connectTimeoutMillis);
            logger.debug("Established connection to NiFi instance.");

            // We don't know how long it will take for the offloading to complete. It could be a while. So don't timeout.
            // User can press Ctrl+C to terminate if they don't want to wait
            socket.setSoTimeout(0);

            logger.debug("Sending DECOMMISSION Command to port {}", port);
            final OutputStream out = socket.getOutputStream();
            out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            // Half-close the connection so the peer sees end-of-stream after the command.
            socket.shutdownOutput();

            final String response = readResponse(socket.getInputStream());

            if (DECOMMISSION_CMD.equals(response)) {
                logger.debug("Received response to DECOMMISSION command: {}", response);

                // Without a known PID there is no process to wait for.
                if (pid != null) {
                    waitForShutdown(pid, logger, statusFile, pidFile);
                }

                return null;
            } else {
                logger.error("When sending DECOMMISSION command to NiFi, got unexpected response {}", response);
                return 18;
            }
        }
    } finally {
        if (lockFile.exists() && !lockFile.delete()) {
            logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
        }
    }
}

public void stop() throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
Expand Down Expand Up @@ -843,69 +907,17 @@ public void stop() throws IOException {
out.flush();
socket.shutdownOutput();

final InputStream in = socket.getInputStream();
int lastChar;
final StringBuilder sb = new StringBuilder();
while ((lastChar = in.read()) > -1) {
sb.append((char) lastChar);
}
final String response = sb.toString().trim();

final String response = readResponse(socket.getInputStream());
logger.debug("Received response to SHUTDOWN command: {}", response);

if (SHUTDOWN_CMD.equals(response)) {
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");

if (pid != null) {
final Properties bootstrapProperties = new Properties();
try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
bootstrapProperties.load(fis);
}

String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
int gracefulShutdownSeconds;
try {
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
} catch (final NumberFormatException nfe) {
gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
}

notifyStop();
final long startWait = System.nanoTime();
while (isProcessRunning(pid, logger)) {
logger.info("Waiting for Apache NiFi to finish shutting down...");
final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
if (isProcessRunning(pid, logger)) {
logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
try {
killProcessTree(pid, logger);
} catch (final IOException ioe) {
logger.error("Failed to kill Process with PID {}", pid);
}
}
break;
} else {
try {
Thread.sleep(2000L);
} catch (final InterruptedException ie) {
}
}
}

if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}

if (pidFile.exists() && !pidFile.delete()) {
logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
}

logger.info("NiFi has finished shutting down.");
waitForShutdown(pid, logger, statusFile, pidFile);
}
} else {
logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
logger.error("When sending SHUTDOWN command to NiFi, got unexpected response: {}", response);
}
} catch (final IOException ioe) {
if (pid == null) {
Expand All @@ -926,6 +938,65 @@ public void stop() throws IOException {
}
}

/**
 * Reads the given stream until end-of-stream and returns its content as text.
 *
 * <p>The bytes are collected first and decoded as UTF-8 in one step, so that multi-byte
 * characters are not corrupted by decoding each byte on its own.</p>
 *
 * @param in the stream to drain; it is not closed by this method
 * @return the decoded content with leading and trailing whitespace removed
 * @throws IOException if reading from the stream fails
 */
private String readResponse(final InputStream in) throws IOException {
    final java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
    final byte[] buffer = new byte[256];
    int len;
    while ((len = in.read(buffer)) > -1) {
        received.write(buffer, 0, len);
    }

    return new String(received.toByteArray(), StandardCharsets.UTF_8).trim();
}

/**
 * Waits for the NiFi process with the given PID to exit, then removes the status and pid files.
 *
 * <p>The process is polled every {@code GRACEFUL_SHUTDOWN_RETRY_MILLIS} milliseconds. If the
 * graceful shutdown period read from the bootstrap configuration is greater than zero and elapses
 * while the process is still running, the process tree is killed. A period of zero or less means
 * the method keeps waiting until the process exits.</p>
 *
 * <p>If the calling thread is interrupted while waiting, the wait continues, and the thread's
 * interrupt status is restored before this method returns.</p>
 *
 * @param pid the process ID of the NiFi instance to wait for
 * @param logger the logger used for progress and error messages
 * @param statusFile the status file to delete once waiting is over
 * @param pidFile the pid file to delete once waiting is over
 * @throws IOException if the bootstrap configuration file cannot be read
 */
private void waitForShutdown(final String pid, final Logger logger, final File statusFile, final File pidFile) throws IOException {
    final Properties bootstrapProperties = new Properties();
    try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
        bootstrapProperties.load(fis);
    }

    final String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
    int gracefulShutdownSeconds;
    try {
        gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
    } catch (final NumberFormatException nfe) {
        // An unparseable configured value falls back to the default.
        gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
    }

    notifyStop();
    final long startWait = System.nanoTime();

    // Remember an interrupt instead of discarding it; the status is restored in the finally block.
    boolean interrupted = false;
    try {
        while (isProcessRunning(pid, logger)) {
            logger.info("Waiting for Apache NiFi to finish shutting down...");
            final long waitNanos = System.nanoTime() - startWait;
            final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
            if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
                // Check again right before killing, in case the process exited in the meantime.
                if (isProcessRunning(pid, logger)) {
                    logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
                    try {
                        killProcessTree(pid, logger);
                    } catch (final IOException ioe) {
                        // The trailing exception argument makes the logger record the cause.
                        logger.error("Failed to kill Process with PID {}", pid, ioe);
                    }
                }
                break;
            } else {
                try {
                    Thread.sleep(GRACEFUL_SHUTDOWN_RETRY_MILLIS);
                } catch (final InterruptedException ie) {
                    interrupted = true;
                }
            }
        }

        if (statusFile.exists() && !statusFile.delete()) {
            logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
        }

        if (pidFile.exists() && !pidFile.delete()) {
            logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
        }

        logger.info("NiFi has finished shutting down.");
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

private static List<String> getChildProcesses(final String ppid) throws IOException {
final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
final List<String> childPids = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.controller;

/**
 * A task that decommissions a NiFi node.
 *
 * NOTE(review): the steps an implementation performs are not defined by this interface. The bootstrap
 * usage text describes decommissioning as disconnecting from the cluster, offloading data, removing
 * the node from the cluster and shutting down - confirm against the implementing class.
 */
public interface DecommissionTask {
/**
 * Performs the decommission.
 *
 * @throws InterruptedException if the calling thread is interrupted (presumably while waiting for
 *         the decommission to complete - confirm against the implementation)
 */
void decommission() throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
Expand Down Expand Up @@ -54,4 +55,9 @@ public DiagnosticsFactory getDiagnosticsFactory() {
public DiagnosticsFactory getThreadDumpFactory() {
return null;
}

/**
 * This implementation does not provide a decommission task.
 *
 * @return always {@code null}
 */
@Override
public DecommissionTask getDecommissionTask() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.nifi.cluster.coordination;

import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
Expand All @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

/**
* <p>
Expand Down Expand Up @@ -85,7 +86,7 @@ public interface ClusterCoordinator {
* @param offloadCode the code that represents why this node is being asked to be offloaded
* @param explanation an explanation as to why the node is being asked to be offloaded
*/
void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);

/**
* Sends a request to the node to disconnect from the cluster.
Expand All @@ -96,7 +97,7 @@ public interface ClusterCoordinator {
* @param explanation an explanation as to why the node is being asked to disconnect
* from the cluster
*/
void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);

/**
* Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect
Expand Down
Loading

0 comments on commit 935566b

Please sign in to comment.