diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java index d0c428f263ba..d7ad003f627f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java @@ -20,22 +20,40 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarClassLoadersHolder; import org.apache.nifi.nar.PythonBundle; import org.apache.nifi.web.api.dto.BundleDTO; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** * Utility class for Bundles. */ public final class BundleUtils { - private static Optional findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final BundleCoordinate desiredCoordinate) { + static Optional findOptionalBundleForType(final ExtensionManager extensionManager, final String type, final Bundle frameworkBundle) { final List bundles = extensionManager.getBundles(type); if (bundles.size() == 1) { return Optional.of(bundles.get(0).getBundleDetails().getCoordinate()); } + + // All NARs that are packaged with NiFi will have the same bundle coordinate as the NiFi framework bundle. + // During an upgrade, it's fairly common to have two versions of a NAR: the version shipped with NiFi and another version, perhaps to maintain + // backward compatibility to because the new version behaves some different way and the user wants the old behavior in some instances, etc. + // In this case, the user may have two versions. For example, version 2.2.0 and 2.4.0 while NiFi is at version 2.4.0. + // Now, during upgrade to 2.4.1, there will no longer be a 2.4.0 available. We want to be smart enough to realize that those extension using version + // 2.2.0 stay there but those using 2.4.0 upgrade to 2.4.1. + // To do this, we always first match on the exact version but this method is called when there's no exact match. So those marked 2.2.0 won't arrive here. + // But for those extensions that were using 2.4.0, we want to now look for version 2.4.1 - I.e., the one with the same version as the framework. If we + // find that version, then we want to use it. This helps to smooth out the upgrade process even when users have multiple versions of a given NAR. + final String frameworkVersion = frameworkBundle.getBundleDetails().getCoordinate().getVersion(); + for (final Bundle bundle : bundles) { + final String componentVersion = bundle.getBundleDetails().getCoordinate().getVersion(); + if (frameworkVersion.equals(componentVersion)) { + return Optional.of(bundle.getBundleDetails().getCoordinate()); + } + } + return Optional.empty(); } @@ -71,7 +89,10 @@ private static BundleCoordinate findCompatibleBundle(final ExtensionManager exte throw new IllegalStateException(String.format("%s from %s is not known to this NiFi instance.", type, coordinate)); } } else { - final List bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList()); + final List bundlesForType = extensionManager.getBundles(type).stream() + .map(b -> b.getBundleDetails().getCoordinate()) + .toList(); + if (bundlesForType.contains(coordinate)) { return coordinate; } else { @@ -82,18 +103,17 @@ private static BundleCoordinate findCompatibleBundle(final ExtensionManager exte private static Optional findOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type, - final BundleDTO bundleDTO, final boolean allowCompatibleBundle) { + final BundleDTO bundleDTO) { final BundleCoordinate coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); final Bundle bundle = extensionManager.getBundle(coordinate); if (bundle == null) { - if (allowCompatibleBundle) { - return findOptionalBundleForType(extensionManager, type, coordinate); - } else { - return Optional.empty(); - } + return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle()); } else { - final List bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList()); + final List bundlesForType = extensionManager.getBundles(type).stream() + .map(b -> b.getBundleDetails().getCoordinate()) + .toList(); + if (bundlesForType.contains(coordinate)) { return Optional.of(coordinate); } else { @@ -181,9 +201,9 @@ public static BundleCoordinate getCompatibleBundle(final ExtensionManager extens public static Optional getOptionalCompatibleBundle(final ExtensionManager extensionManager, final String type, final BundleDTO bundleDTO) { if (bundleDTO == null) { - return findOptionalBundleForType(extensionManager, type, null); + return findOptionalBundleForType(extensionManager, type, NarClassLoadersHolder.getInstance().getFrameworkBundle()); } else { - return findOptionalCompatibleBundle(extensionManager, type, bundleDTO, true); + return findOptionalCompatibleBundle(extensionManager, type, bundleDTO); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java new file mode 100644 index 000000000000..0368ac513b94 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.bundle.BundleDetails; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarClassLoaders; +import org.apache.nifi.nar.NarClassLoadersHolder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +public class TestBundleUtils { + + private static final String PROCESSOR_TYPE = "MyProcessor"; + private static final String FRAMEWORK_VERSION = "5.0.0"; + + private static final Bundle frameworkBundle = createBundle("framework-bundle", FRAMEWORK_VERSION); + private static ExtensionManager extensionManager; + + + @BeforeAll + public static void setup() throws IOException, ClassNotFoundException { + extensionManager = Mockito.mock(ExtensionManager.class); + + final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance(); + narClassLoaders.init(null, new File("target/extensions")); + } + + @Test + public void findOptionalBundleMatchingFramework() throws IOException, ClassNotFoundException { + final Bundle frameworkVersionBundle = createBundle("my-bundle", FRAMEWORK_VERSION); + final Bundle otherBundle = createBundle("my-bundle", "1.2.3"); + final List bundles = Arrays.asList(frameworkVersionBundle, otherBundle); + when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles); + + final Optional compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle); + assertTrue(compatibleCoordinate.isPresent()); + assertEquals(frameworkVersionBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get()); + } + + @Test + public void findOptionalBundleNotMatchingFramework() throws IOException, ClassNotFoundException { + final Bundle version3 = createBundle("my-bundle", "3.0.0"); + final Bundle otherBundle = createBundle("my-bundle", "1.2.3"); + final List bundles = Arrays.asList(version3, otherBundle); + when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles); + + final Optional compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle); + assertFalse(compatibleCoordinate.isPresent()); + } + + @Test + public void testFindOptionalBundleOnlyOneBundle() throws IOException, ClassNotFoundException { + final Bundle otherBundle = createBundle("my-bundle", "1.2.3"); + final List bundles = Collections.singletonList(otherBundle); + when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles); + + final Optional compatibleCoordinate = BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, frameworkBundle); + assertTrue(compatibleCoordinate.isPresent()); + assertEquals(otherBundle.getBundleDetails().getCoordinate(), compatibleCoordinate.get()); + } + + private static Bundle createBundle(final String artifactId, final String version) { + final BundleDetails bundleDetails = new BundleDetails.Builder() + .coordinate(new BundleCoordinate("org.apache.nifi", artifactId, version)) + .workingDir(new File("target")) + .build(); + + return new Bundle(bundleDetails, TestBundleUtils.class.getClassLoader()); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 2ce547747444..26f835a08cff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -956,7 +956,7 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw controller.setNodeId(nodeId); // load new controller state - loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL); + loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST); // set node ID on controller before we start heartbeating because heartbeat needs node ID clusterCoordinator.setLocalNodeIdentifier(nodeId); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index f4926bc04f7e..48c426bfc7c7 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -30,7 +30,6 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient; import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; @@ -353,84 +352,6 @@ public void testReconnectionWithUpdatedConnection() throws NiFiClientException, }); } - @Test - public void testCannotJoinClusterIfMissingNar() throws NiFiClientException, IOException, InterruptedException { - getClientUtil().createProcessor("GenerateFlowFile"); - - // Shut down node 2 - disconnectNode(2); - final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2); - node2.stop(); - - // Remove node from the cluster. This way we know when it's attempted to connect - final Integer node2ApiPort = getNodeApiPort(2); - removeNode(2); - removeExtensionsNar(node2); - - node2.start(false); - - // Wait until node is no longer removed from cluster, which will happen when it starts up and requests to connect - waitFor(() -> !isNodeRemoved(node2ApiPort)); - - // Wait for node to show as disconnected because it doesn't have the necessary nar - waitForNodeState(2, NodeConnectionState.DISCONNECTED); - - // We need to restore the extensions nar and restart the node so that subsequent tests can succeed - restoreExtensionsNar(node2); - node2.stop(); - node2.start(); - - waitForAllNodesConnected(); - } - - private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException { - final NodeDTO nodeDto = getNodeEntity(index).getNode(); - final String nodeId = nodeDto.getNodeId(); - final Integer apiPort = nodeDto.getApiPort(); - getNifiClient().getControllerClient().deleteNode(nodeId); - waitFor(() -> isNodeRemoved(apiPort)); - } - - private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException { - final NodeDTO nodeDto = getNodeEntity(index).getNode(); - final Integer apiPort = nodeDto.getApiPort(); - return apiPort; - } - - @Test - public void testCanJoinClusterIfAllNodesMissingNar() throws NiFiClientException, IOException, InterruptedException { - final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); - - // Shut down node 2 - disconnectNode(2); - final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2); - node2.stop(); - - final NiFiInstance node1 = getNiFiInstance().getNodeInstance(1); - node1.stop(); - - removeExtensionsNar(node1); - removeExtensionsNar(node2); - - node1.start(false); - node2.start(true); - - waitForAllNodesConnected(); - - assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing()); - - // In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart - node1.stop(); - node2.stop(); - - restoreExtensionsNar(node1); - restoreExtensionsNar(node2); - - node1.start(false); - node2.start(true); - waitForAllNodesConnected(); - } - @Test public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException { @@ -497,36 +418,6 @@ public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, } - private void removeExtensionsNar(final NiFiInstance nifiInstance) { - final File extensionsNar = getExtensionsNar(nifiInstance); - final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup"); - assertTrue(extensionsNar.renameTo(backupFile)); - } - - private void restoreExtensionsNar(final NiFiInstance nifiInstance) { - final File backupFile = getExtensionsNar(nifiInstance); - final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", "")); - assertTrue(backupFile.renameTo(extensionsNar)); - } - - private File getExtensionsNar(final NiFiInstance nifiInstance) { - final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib"); - final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-")); - assertEquals(1, testExtensionsNar.length); - - return testExtensionsNar[0]; - } - - - private boolean isNodeRemoved(final int apiPort) { - try { - return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream() - .noneMatch(dto -> dto.getApiPort() == apiPort); - } catch (Exception e) { - return false; - } - } - @Test public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException, IOException { restartWithOnlySingleFlowPersistenceFile("flow.json.gz");