Skip to content

Commit

Permalink
NIFI-12308: Create Python Environment in background thread instead of…
Browse files Browse the repository at this point in the history
… during Processor creation

This closes apache#7971

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
markap14 authored and exceptionfactory committed Nov 2, 2023
1 parent f1a34a5 commit 754baf0
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.nifi.components;

public interface AsyncLoadedProcessor {
import org.apache.nifi.processor.Processor;

public interface AsyncLoadedProcessor extends Processor {
default boolean isLoaded() {
return getState() == LoadState.FINISHED_LOADING;
}

LoadState getState();

enum LoadState {
INITIALIZING_ENVIRONMENT,

DOWNLOADING_DEPENDENCIES,

LOADING_PROCESSOR_CODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ protected Collection<ValidationResult> computeValidationErrors(final ValidationC
if (component instanceof final AsyncLoadedProcessor asyncLoadedProcessor) {
if (!asyncLoadedProcessor.isLoaded()) {
final String explanation = switch (asyncLoadedProcessor.getState()) {
case INITIALIZING_ENVIRONMENT -> "Initializing runtime environment for the Processor.";
case DEPENDENCY_DOWNLOAD_FAILED -> "Failed to download one or more Processor dependencies. See logs for additional details.";
case DOWNLOADING_DEPENDENCIES -> "In the process of downloading third-party dependencies required by the Processor.";
case LOADING_PROCESSOR_CODE -> "In the process of loading Processor code";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.PythonProcessorBridge;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -97,7 +96,7 @@ public void discoverExtensions() {
}

@Override
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) {
return delegate.createProcessor(identifier, type, version, preferIsolatedProcess);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package org.apache.nifi.controller;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
Expand Down Expand Up @@ -77,8 +67,6 @@
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
Expand All @@ -94,6 +82,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ExtensionBuilder {
private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class);

Expand Down Expand Up @@ -750,9 +749,9 @@ private LoggableComponent<Processor> createLoggableProcessor(final LoggingContex

final Processor processor = processorComponent.getComponent();

final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
serviceProvider, nodeTypeProvider, kerberosConfig);
processor.initialize(initiContext);
processor.initialize(initContext);

final Bundle bundle = extensionManager.getBundle(bundleCoordinate);
verifyControllerServiceReferences(processor, bundle.getClassLoader());
Expand Down Expand Up @@ -867,24 +866,14 @@ private LoggableComponent<Processor> createLoggablePythonProcessor() {

// TODO: This is a hack because there's a bug in the UI that causes it to not load extensions that don't have a `.` in the type.
final String processorType = type.startsWith("python.") ? type.substring("python.".length()) : type;
final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);
final Processor processor = processorBridge.getProcessorProxy();
final Processor processor = pythonBridge.createProcessor(identifier, processorType, bundleCoordinate.getVersion(), true);

final ComponentLog componentLog = new SimpleProcessLogger(identifier, processor, new StandardLoggingContext(null));
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLog);

final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
@Override
public String getIdentifier() {
return identifier;
}

@Override
public ComponentLog getLogger() {
return terminationAwareLogger;
}
};
processorBridge.initialize(initContext);
final ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(identifier, terminationAwareLogger,
serviceProvider, nodeTypeProvider, kerberosConfig);
processor.initialize(initContext);

return new LoggableComponent<>(processor, bundleCoordinate, terminationAwareLogger);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,6 @@
*/
package org.apache.nifi.nar;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.authentication.LoginIdentityProvider;
import org.apache.nifi.authorization.AccessPolicyProvider;
Expand All @@ -63,23 +40,43 @@
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.init.ConfigurableComponentInitializer;
import org.apache.nifi.init.ConfigurableComponentInitializerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.nar.ExtensionDefinition.ExtensionRuntime;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
*
Expand Down Expand Up @@ -711,23 +708,7 @@ public synchronized ConfigurableComponent getTempComponent(final String classTyp
final String type = classType.startsWith(PYTHON_TYPE_PREFIX) ? classType.substring(PYTHON_TYPE_PREFIX.length()) : classType;

final String procId = "temp-component-" + type;
final PythonProcessorBridge processorBridge = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false);
tempComponent = processorBridge.getProcessorProxy();

final ComponentLog componentLog = new MockComponentLogger();
final PythonProcessorInitializationContext initContext = new PythonProcessorInitializationContext() {
@Override
public String getIdentifier() {
return procId;
}

@Override
public ComponentLog getLogger() {
return componentLog;
}
};

processorBridge.initialize(initContext);
tempComponent = pythonBridge.createProcessor(procId, type, bundleCoordinate.getVersion(), false);
} else {
final Class<?> componentClass = Class.forName(classType, true, bundleClassLoader);
tempComponent = (ConfigurableComponent) componentClass.getDeclaredConstructor().newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

package org.apache.nifi.py4j;

import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileTransform;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.RecordTransform;
import org.apache.nifi.python.processor.RecordTransformProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,6 +43,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class StandardPythonBridge implements PythonBridge {
Expand Down Expand Up @@ -89,8 +95,7 @@ public void discoverExtensions() {
controllerProcess.getController().discoverExtensions(extensionsDirs, workDirPath);
}

@Override
public PythonProcessorBridge createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
private PythonProcessorBridge createProcessorBridge(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
ensureStarted();

logger.debug("Creating Python Processor of type {}", type);
Expand Down Expand Up @@ -127,6 +132,25 @@ public PythonProcessorAdapter createProcessor() {
return processorBridge;
}

@Override
public AsyncLoadedProcessor createProcessor(final String identifier, final String type, final String version, final boolean preferIsolatedProcess) {
final PythonProcessorDetails processorDetails = getProcessorTypes().stream()
.filter(details -> details.getProcessorType().equals(type))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type: " + type));

final String implementedInterface = processorDetails.getInterface();
final Supplier<PythonProcessorBridge> processorBridgeFactory = () -> createProcessorBridge(identifier, type, version, preferIsolatedProcess);

if (FlowFileTransform.class.getName().equals(implementedInterface)) {
return new FlowFileTransformProxy(type, processorBridgeFactory);
}
if (RecordTransform.class.getName().equals(implementedInterface)) {
return new RecordTransformProxy(type, processorBridgeFactory);
}
return null;
}

@Override
public synchronized void onProcessorRemoved(final String identifier, final String type, final String version) {
final ExtensionId extensionId = new ExtensionId(type, version);
Expand Down
Loading

0 comments on commit 754baf0

Please sign in to comment.