Skip to content

Commit

Permalink
NIFI-12016: This closes apache#7662. Allow use of compatible NAR bund…
Browse files Browse the repository at this point in the history
…les when loading flow from cluster connection; when determining what bundles are compatible, consider not just any bundle if it's the only one but also any bundle whose version matches the framework version so that when NiFi is upgraded, it is handled more gracefully.

Signed-off-by: Joseph Witt <[email protected]>
  • Loading branch information
markap14 authored and joewitt committed Sep 14, 2023
1 parent 292b5d1 commit f27ace1
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,40 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.web.api.dto.BundleDTO;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Utility class for Bundles.
*/
public final class BundleUtils {
private static Optional<BundleCoordinate> findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final BundleCoordinate desiredCoordinate) {
static Optional<BundleCoordinate> findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final Bundle frameworkBundle) {
final List<Bundle> bundles = extensionManager.getBundles(type);
if (bundles.size() == 1) {
return Optional.of(bundles.get(0).getBundleDetails().getCoordinate());
}

// All NARs that are packaged with NiFi will have the same bundle coordinate as the NiFi framework bundle.
// During an upgrade, it's fairly common to have two versions of a NAR: the version shipped with NiFi and another version, perhaps to maintain
// backward compatibility to because the new version behaves some different way and the user wants the old behavior in some instances, etc.
// In this case, the user may have two versions. For example, version 2.2.0 and 2.4.0 while NiFi is at version 2.4.0.
// Now, during upgrade to 2.4.1, there will no longer be a 2.4.0 available. We want to be smart enough to realize that those extension using version
// 2.2.0 stay there but those using 2.4.0 upgrade to 2.4.1.
// To do this, we always first match on the exact version but this method is called when there's no exact match. So those marked 2.2.0 won't arrive here.
// But for those extensions that were using 2.4.0, we want to now look for version 2.4.1 - I.e., the one with the same version as the framework. If we
// find that version, then we want to use it. This helps to smooth out the upgrade process even when users have multiple versions of a given NAR.
final String frameworkVersion = frameworkBundle.getBundleDetails().getCoordinate().getVersion();
for (final Bundle bundle : bundles) {
final String componentVersion = bundle.getBundleDetails().getCoordinate().getVersion();
if (frameworkVersion.equals(componentVersion)) {
return Optional.of(bundle.getBundleDetails().getCoordinate());
}
}

return Optional.empty();
}

Expand Down Expand Up @@ -71,7 +89,10 @@ private static BundleCoordinate findCompatibleBundle(final ExtensionManager exte
throw new IllegalStateException(String.format("%s from %s is not known to this NiFi instance.", type, coordinate));
}
} else {
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream()
.map(b -> b.getBundleDetails().getCoordinate())
.toList();

if (bundlesForType.contains(coordinate)) {
return coordinate;
} else {
Expand All @@ -82,18 +103,17 @@ private static BundleCoordinate findCompatibleBundle(final ExtensionManager exte


private static Optional<BundleCoordinate> findOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type,
final BundleDTO bundleDTO, final boolean allowCompatibleBundle) {
final BundleDTO bundleDTO) {
final BundleCoordinate coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion());
final Bundle bundle = extensionManager.getBundle(coordinate);

if (bundle == null) {
if (allowCompatibleBundle) {
return findOptionalBundleForType(extensionManager, type, coordinate);
} else {
return Optional.empty();
}
return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream()
.map(b -> b.getBundleDetails().getCoordinate())
.toList();

if (bundlesForType.contains(coordinate)) {
return Optional.of(coordinate);
} else {
Expand Down Expand Up @@ -181,9 +201,9 @@ public static BundleCoordinate getCompatibleBundle(final ExtensionManager extens

public static Optional<BundleCoordinate> getOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type, final BundleDTO bundleDTO) {
if (bundleDTO == null) {
return findOptionalBundleForType(extensionManager, type, null);
return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
return findOptionalCompatibleBundle(extensionManager, type, bundleDTO, true);
return findOptionalCompatibleBundle(extensionManager, type, bundleDTO);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;

import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

public class TestBundleUtils {

private static final String PROCESSOR_TYPE = "MyProcessor";
private static final String FRAMEWORK_VERSION = "5.0.0";

private static final Bundle frameworkBundle = createBundle("framework-bundle", FRAMEWORK_VERSION);
private static ExtensionManager extensionManager;


@BeforeAll
public static void setup() throws IOException, ClassNotFoundException {
extensionManager = Mockito.mock(ExtensionManager.class);

final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
narClassLoaders.init(null, new File("target/extensions"));
}

@Test
public void findOptionalBundleMatchingFramework() throws IOException, ClassNotFoundException {
final Bundle frameworkVersionBundle = createBundle("my-bundle", FRAMEWORK_VERSION);
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Arrays.asList(frameworkVersionBundle, otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);

final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertTrue(compatibleCoordinate.isPresent());
assertEquals(frameworkVersionBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get());
}

@Test
public void findOptionalBundleNotMatchingFramework() throws IOException, ClassNotFoundException {
final Bundle version3 = createBundle("my-bundle", "3.0.0");
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Arrays.asList(version3, otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);

final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertFalse(compatibleCoordinate.isPresent());
}

@Test
public void testFindOptionalBundleOnlyOneBundle() throws IOException, ClassNotFoundException {
final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
final List<Bundle> bundles = Collections.singletonList(otherBundle);
when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);

final Optional<BundleCoordinate> compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle);
assertTrue(compatibleCoordinate.isPresent());
assertEquals(otherBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get());
}

private static Bundle createBundle(final String artifactId, final String version) {
final BundleDetails bundleDetails = new BundleDetails.Builder()
.coordinate(new BundleCoordinate("org.apache.nifi", artifactId, version))
.workingDir(new File("target"))
.build();

return new Bundle(bundleDetails, TestBundleUtils.class.getClassLoader());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw
controller.setNodeId(nodeId);

// load new controller state
loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);
loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);

// set node ID on controller before we start heartbeating because heartbeat needs node ID
clusterCoordinator.setLocalNodeIdentifier(nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
Expand Down Expand Up @@ -353,84 +352,6 @@ public void testReconnectionWithUpdatedConnection() throws NiFiClientException,
});
}

@Test
public void testCannotJoinClusterIfMissingNar() throws NiFiClientException, IOException, InterruptedException {
getClientUtil().createProcessor("GenerateFlowFile");

// Shut down node 2
disconnectNode(2);
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop();

// Remove node from the cluster. This way we know when it's attempted to connect
final Integer node2ApiPort = getNodeApiPort(2);
removeNode(2);
removeExtensionsNar(node2);

node2.start(false);

// Wait until node is no longer removed from cluster, which will happen when it starts up and requests to connect
waitFor(() -> !isNodeRemoved(node2ApiPort));

// Wait for node to show as disconnected because it doesn't have the necessary nar
waitForNodeState(2, NodeConnectionState.DISCONNECTED);

// We need to restore the extensions nar and restart the node so that subsequent tests can succeed
restoreExtensionsNar(node2);
node2.stop();
node2.start();

waitForAllNodesConnected();
}

private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final String nodeId = nodeDto.getNodeId();
final Integer apiPort = nodeDto.getApiPort();
getNifiClient().getControllerClient().deleteNode(nodeId);
waitFor(() -> isNodeRemoved(apiPort));
}

private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final Integer apiPort = nodeDto.getApiPort();
return apiPort;
}

@Test
public void testCanJoinClusterIfAllNodesMissingNar() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");

// Shut down node 2
disconnectNode(2);
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop();

final NiFiInstance node1 = getNiFiInstance().getNodeInstance(1);
node1.stop();

removeExtensionsNar(node1);
removeExtensionsNar(node2);

node1.start(false);
node2.start(true);

waitForAllNodesConnected();

assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());

// In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart
node1.stop();
node2.stop();

restoreExtensionsNar(node1);
restoreExtensionsNar(node2);

node1.start(false);
node2.start(true);
waitForAllNodesConnected();
}


@Test
public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
Expand Down Expand Up @@ -497,36 +418,6 @@ public void testComponentStatesRestoredOnReconnect() throws NiFiClientException,
}


private void removeExtensionsNar(final NiFiInstance nifiInstance) {
final File extensionsNar = getExtensionsNar(nifiInstance);
final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup");
assertTrue(extensionsNar.renameTo(backupFile));
}

private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
final File backupFile = getExtensionsNar(nifiInstance);
final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", ""));
assertTrue(backupFile.renameTo(extensionsNar));
}

private File getExtensionsNar(final NiFiInstance nifiInstance) {
final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));
assertEquals(1, testExtensionsNar.length);

return testExtensionsNar[0];
}


private boolean isNodeRemoved(final int apiPort) {
try {
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
.noneMatch(dto -> dto.getApiPort() == apiPort);
} catch (Exception e) {
return false;
}
}

@Test
public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException, IOException {
restartWithOnlySingleFlowPersistenceFile("flow.json.gz");
Expand Down

0 comments on commit f27ace1

Please sign in to comment.