Skip to content

Commit

Permalink
NIFI-7592: Allow NiFi to be started without a GUI/REST interface (apa…
Browse files Browse the repository at this point in the history
…che#4509)

* NIFI-7592: Allow NiFi to be started without a GUI/REST interface

* NIFI-7592: Enable all controller services when starting headless

* NIFI-7592: Marked duplicate dependencies as provided

* NIFI-7592: Incorporated additional review comments
  • Loading branch information
mattyb149 authored Oct 7, 2020
1 parent 7cc3713 commit 325a495
Show file tree
Hide file tree
Showing 48 changed files with 3,089 additions and 81 deletions.
33 changes: 33 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-framework-api</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-api</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
Expand Down Expand Up @@ -133,6 +138,12 @@ language governing permissions and limitations under the License. -->
<version>1.13.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-nar</artifactId>
<version>1.13.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-provenance-repository-nar</artifactId>
Expand Down Expand Up @@ -952,6 +963,28 @@ language governing permissions and limitations under the License. -->
</dependency>
</dependencies>
</profile>
<profile>
<id>headless</id>
<!-- This profile excludes the nifi-server artifacts, such as the Web/UI NAR(s) and instead uses the nifi-headless-server-nar. -->
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-nar</artifactId>
<version>1.13.0-SNAPSHOT</version>
<type>nar</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-headless-server-nar</artifactId>
<version>1.13.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</profile>
<profile>
<id>include-accumulo</id>
<!-- This profile handles the inclusion of nifi-accumulo artifacts. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-jetty</artifactId>
<artifactId>nifi-jetty-bundle</artifactId>
<version>1.13.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand All @@ -48,11 +50,21 @@
<artifactId>nifi-standard-prioritizers</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-cluster</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-stateless</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
</dependency>
<!-- Override scope to compile since framework NAR won't get this from a parent NAR -->
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.documentation.example;

import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;

import java.util.Set;

/**
* This stub is the source code for the no-op NiFiServer implementation used in the nifiserver-test-nar.nar, as NiFi requires exactly one
* implementation of NiFiServer in order to start successfully. The NAR was built externally, but the code is provided here in case
* updates are needed.
*/
public class NiFiServerStub implements NiFiServer {
@Override
public void start() {

}

@Override
public void initialize(NiFiProperties properties, Bundle systemBundle, Set<Bundle> bundles, ExtensionMapping extensionMapping) {

}

@Override
public void stop() {

}

@Override
public DiagnosticsFactory getDiagnosticsFactory() {
return null;
}

@Override
public DiagnosticsFactory getThreadDumpFactory() {
return null;
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,34 @@
package org.apache.nifi.authorization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.security.xml.XmlUtils;
import org.apache.nifi.util.LoggingXmlParserErrorHandler;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,10 +64,9 @@ public class FlowParser {
private static final String FLOW_XSD = "/FlowConfiguration.xsd";

private final Schema flowSchema;
private final SchemaFactory schemaFactory;

public FlowParser() throws SAXException {
schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
flowSchema = schemaFactory.newSchema(FlowParser.class.getResource(FLOW_XSD));
}

Expand Down Expand Up @@ -130,6 +139,48 @@ public FlowInfo parse(final File flowConfigurationFile) {
}
}

/**
* Generates a {@link Document} from the flow configuration file provided
*/
public Document parseDocument(final File flowConfigurationFile) {
if (flowConfigurationFile == null) {
logger.debug("Flow Configuration file was null");
return null;
}

// if the flow doesn't exist or is 0 bytes, then return null
final Path flowPath = flowConfigurationFile.toPath();
try {
if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
logger.warn("Flow Configuration does not exist or was empty");
return null;
}
} catch (IOException e) {
logger.error("An error occurred determining the size of the Flow Configuration file");
return null;
}

// otherwise create the appropriate input streams to read the file
try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
final InputStream gzipIn = new GZIPInputStream(in)) {

final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
if (flowBytes == null || flowBytes.length == 0) {
logger.warn("Could not extract root group id because Flow Configuration File was empty");
return null;
}

// create validating document builder
final DocumentBuilder docBuilder = XmlUtils.createSafeDocumentBuilder(flowSchema);
docBuilder.setErrorHandler(new LoggingXmlParserErrorHandler("Flow Configuration", logger));
return docBuilder.parse(new ByteArrayInputStream(flowBytes));

} catch (final SAXException | ParserConfigurationException | IOException ex) {
logger.error("Unable to parse flow {} due to {}", new Object[]{flowPath.toAbsolutePath(), ex});
return null;
}
}

/**
* Gets the ports that are direct children of the given element.
*
Expand All @@ -151,14 +202,35 @@ private List<PortDTO> getPorts(final Element element, final String type) {
return ports;
}

/**
* Writes a given XML Flow out to the specified path.
*
* @param flowDocument flowDocument of the associated XML content to write to disk
* @param flowXmlPath path on disk to write the flow
* @throws IOException if there are issues in accessing the target destination for the flow
* @throws TransformerException if there are issues in the xml transformation process
*/
public void writeFlow(final Document flowDocument, final Path flowXmlPath) throws IOException, TransformerException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final Source xmlSource = new DOMSource(flowDocument);
final Result outputTarget = new StreamResult(outputStream);
TransformerFactory.newInstance().newTransformer().transform(xmlSource, outputTarget);
final InputStream is = new ByteArrayInputStream(outputStream.toByteArray());

try (final OutputStream output = Files.newOutputStream(flowXmlPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
final OutputStream gzipOut = new GZIPOutputStream(output)) {
FileUtils.copy(is, gzipOut);
}
}

/**
* Finds child elements with the given tagName.
*
* @param element the parent element
* @param tagName the child element name to find
* @return a list of matching child elements
*/
private static List<Element> getChildrenByTagName(final Element element, final String tagName) {
public static List<Element> getChildrenByTagName(final Element element, final String tagName) {
final List<Element> matches = new ArrayList<>();
final NodeList nodeList = element.getChildNodes();
for (int i = 0; i < nodeList.getLength(); i++) {
Expand All @@ -172,8 +244,6 @@ private static List<Element> getChildrenByTagName(final Element element, final S
matches.add(child);
}
}

return matches;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
void enableOutputPort(Port port);

/**
* Recursively enables all Controller Services for this Process Group and all child Process Groups
*
*/
void enableAllControllerServices();

/**
* Starts the given Processor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ public boolean isRootGroup() {
public void startProcessing() {
readLock.lock();
try {
enableAllControllerServices();
findAllProcessors().stream().filter(START_PROCESSORS_FILTER).forEach(node -> {
try {
node.getProcessGroup().startProcessor(node, true);
Expand Down Expand Up @@ -1683,6 +1684,17 @@ public void enableProcessor(final ProcessorNode processor) {
}
}

@Override
public void enableAllControllerServices() {
// Enable all valid controller services in this process group
controllerServiceProvider.enableControllerServices(controllerServices.values());

// Enable all controller services for child process groups
for(ProcessGroup pg : processGroups.values()) {
pg.enableAllControllerServices();
}
}

@Override
public void disableInputPort(final Port port) {
readLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ public void enableOutputPort(final Port port) {

}

@Override
public void enableAllControllerServices() {

}

@Override
public CompletableFuture<Void> startProcessor(final ProcessorNode processor, final boolean failIfStopping) {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ public synchronized NarLoadResult load(final Collection<File> narFiles) {
}

LOGGER.debug("Loading custom UIs for extensions...");
extensionUiLoader.loadExtensionUis(loadedBundles);
if(extensionUiLoader != null) {
extensionUiLoader.loadExtensionUis(loadedBundles);
}
}

LOGGER.info("Finished NAR loading process!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public void setup() throws IOException, ClassNotFoundException {

extensionManager = new StandardExtensionDiscoveringManager();

// Should have Framework and Jetty NARs loaded here
assertEquals(2, narClassLoaders.getBundles().size());
// Should have Framework, Jetty, and NiFiServer NARs loaded here
assertEquals(3, narClassLoaders.getBundles().size());

// No extensions should be loaded yet
assertEquals(0, extensionManager.getExtensions(Processor.class).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testNarLoaderWhenAllAvailable() throws IOException {
assertEquals(3, narLoadResult.getLoadedBundles().size());
assertEquals(0, narLoadResult.getSkippedBundles().size());

assertEquals(5, narClassLoaders.getBundles().size());
assertEquals(6, narClassLoaders.getBundles().size());
assertEquals(1, extensionManager.getExtensions(Processor.class).size());
assertEquals(1, extensionManager.getExtensions(ControllerService.class).size());
assertEquals(0, extensionManager.getExtensions(ReportingTask.class).size());
Expand Down Expand Up @@ -111,7 +111,7 @@ public void testNarLoaderWhenDependentNarsAreMissing() throws IOException {
assertEquals(3, narLoadResult3.getLoadedBundles().size());
assertEquals(0, narLoadResult3.getSkippedBundles().size());

assertEquals(5, narClassLoaders.getBundles().size());
assertEquals(6, narClassLoaders.getBundles().size());
assertEquals(1, extensionManager.getExtensions(Processor.class).size());
assertEquals(1, extensionManager.getExtensions(ControllerService.class).size());
assertEquals(0, extensionManager.getExtensions(ReportingTask.class).size());
Expand Down
Binary file not shown.
Loading

0 comments on commit 325a495

Please sign in to comment.