Skip to content

Commit

Permalink
NIFI-12326: When a Python Processor is created, it attempts to downlo…
Browse files Browse the repository at this point in the history
…ad dependencies in the background and then load the processor code. If that fails, it previously gave up; now it will log the exception and keep trying

This closes apache#7990

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
markap14 authored and exceptionfactory committed Nov 7, 2023
1 parent 940c927 commit a978406
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void setupEnvironment() throws IOException {
private void installDebugPy() throws IOException {
final String pythonCommand = processConfig.getPythonCommand();

final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--upgrade", "debugpy", "--target",
final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target",
virtualEnvHome.getAbsolutePath());
processBuilder.directory(virtualEnvHome);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.File;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState;

Expand Down Expand Up @@ -63,31 +64,69 @@ public void initialize(final PythonProcessorInitializationContext context) {
this.initializationContext = context;

final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType());
Thread.ofVirtual().name(threadName).start(this::initializePythonSide);
Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true));
}

public LoadState getLoadState() {
return loadState;
}

private void initializePythonSide() {
try {
creationWorkflow.downloadDependencies();
loadState = LoadState.LOADING_PROCESSOR_CODE;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
throw e;
private void initializePythonSide(final boolean continualRetry) {
long sleepMillis = 1_000L;
while (true) {
loadState = LoadState.DOWNLOADING_DEPENDENCIES;

try {
creationWorkflow.downloadDependencies();
logger.info("Successfully downloaded dependencies for Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED;
if (!continualRetry) {
throw e;
}

sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);

try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
e.addSuppressed(ex);
throw e;
}
}
}

final PythonProcessorAdapter pythonProcessorAdapter;
try {
pythonProcessorAdapter = creationWorkflow.createProcessor();
pythonProcessorAdapter.initialize(initializationContext);
this.adapter = pythonProcessorAdapter;
loadState = LoadState.FINISHED_LOADING;
} catch (final Exception e) {
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;
throw e;
while (true) {
loadState = LoadState.LOADING_PROCESSOR_CODE;

try {
final PythonProcessorAdapter pythonProcessorAdapter = creationWorkflow.createProcessor();
pythonProcessorAdapter.initialize(initializationContext);
this.adapter = pythonProcessorAdapter;
loadState = LoadState.FINISHED_LOADING;
logger.info("Successfully loaded Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType());
break;
} catch (final Exception e) {
loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED;

if (!continualRetry) {
throw e;
}

sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10));
logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis);

try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
e.addSuppressed(ex);
throw e;
}
}
}
}

Expand All @@ -104,7 +143,7 @@ public boolean reload() {
}

controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath());
initializePythonSide();
initializePythonSide(false);
lastModified = moduleFile.lastModified();

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def import_external_dependencies(self, processor_details, work_dir):
package_dir = os.path.dirname(processor_details.source_location)
requirements_file = os.path.join(package_dir, 'requirements.txt')
if os.path.exists(requirements_file):
args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir, '-r', requirements_file]
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir, '-r', requirements_file]

logger.info(f"Importing dependencies from requirements file for package {package_dir} to {target_dir} using command {args}")
result = subprocess.run(args)
Expand All @@ -498,7 +498,7 @@ def import_external_dependencies(self, processor_details, work_dir):
dependencies = processor_details.getDependencies()
if len(dependencies) > 0:
python_cmd = os.getenv("PYTHON_CMD")
args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir]
args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir]
for dep in dependencies:
args.append(dep)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
"Replacement Value Strategy" = "Literal Value"
A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update.
The value is an Expression Language expression that references the `field.name` variable. For example, to change the date/time format of \
The value is an Expression Language expression that references the `field.value` variable. For example, to change the date/time format of \
a field named `txDate` from `year-month-day` format to `month/day/year` format, we add a property named `/txDate` with a value of \
`${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}`. We could also change the timezone of a timestamp field (and insert the timezone for clarity) by using a value of \
`${field.value:toDate('yyyy-MM-dd HH:mm:ss', 'UTC-0400'):format('yyyy-MM-dd HH:mm:ss Z', 'UTC')}`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -155,7 +154,7 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter
final String firstRecordLine = lines[1];
final List<String> firstRecordValues = Stream.of(firstRecordLine.split(","))
.map(String::trim)
.collect(Collectors.toList());
.toList();
assertEquals("Jane Doe", firstRecordValues.get( headerIndices.get("name") ));
assertEquals("yellow", firstRecordValues.get( headerIndices.get("color") ));
assertEquals("3", firstRecordValues.get( headerIndices.get("age") ));
Expand All @@ -165,7 +164,7 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter
final String secondRecordLine = lines[2];
final List<String> secondRecordValues = Stream.of(secondRecordLine.split(","))
.map(String::trim)
.collect(Collectors.toList());
.toList();
assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") ));
assertEquals("yellow", secondRecordValues.get( headerIndices.get("color") ));
assertEquals("3", secondRecordValues.get( headerIndices.get("age") ));
Expand Down

0 comments on commit a978406

Please sign in to comment.