diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java index 22f5e76d81ea..c540c4cda4cc 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java @@ -235,7 +235,7 @@ private void setupEnvironment() throws IOException { private void installDebugPy() throws IOException { final String pythonCommand = processConfig.getPythonCommand(); - final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--upgrade", "debugpy", "--target", + final ProcessBuilder processBuilder = new ProcessBuilder(pythonCommand, "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target", virtualEnvHome.getAbsolutePath()); processBuilder.directory(virtualEnvHome); diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java index ba9d95fe0188..56b61a772723 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/StandardPythonProcessorBridge.java @@ -27,6 +27,7 @@ import java.io.File; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState; @@ -63,31 +64,69 @@ public void initialize(final PythonProcessorInitializationContext context) { this.initializationContext = context; final String threadName = "Initialize Python Processor %s (%s)".formatted(initializationContext.getIdentifier(), getProcessorType()); - Thread.ofVirtual().name(threadName).start(this::initializePythonSide); + Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true)); } public LoadState getLoadState() { return loadState; } - private void initializePythonSide() { - try { - creationWorkflow.downloadDependencies(); - loadState = LoadState.LOADING_PROCESSOR_CODE; - } catch (final Exception e) { - loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; - throw e; + private void initializePythonSide(final boolean continualRetry) { + long sleepMillis = 1_000L; + while (true) { + loadState = LoadState.DOWNLOADING_DEPENDENCIES; + + try { + creationWorkflow.downloadDependencies(); + logger.info("Successfully downloaded dependencies for Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType()); + break; + } catch (final Exception e) { + loadState = LoadState.DEPENDENCY_DOWNLOAD_FAILED; + if (!continualRetry) { + throw e; + } + + sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10)); + logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis); + + try { + Thread.sleep(sleepMillis); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + e.addSuppressed(ex); + throw e; + } + } } - final PythonProcessorAdapter pythonProcessorAdapter; - try { - pythonProcessorAdapter = creationWorkflow.createProcessor(); - pythonProcessorAdapter.initialize(initializationContext); - this.adapter = pythonProcessorAdapter; - loadState = LoadState.FINISHED_LOADING; - } catch (final Exception e) { - loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED; - throw e; + while (true) { + loadState = LoadState.LOADING_PROCESSOR_CODE; + + try { + final PythonProcessorAdapter pythonProcessorAdapter = creationWorkflow.createProcessor(); + pythonProcessorAdapter.initialize(initializationContext); + this.adapter = pythonProcessorAdapter; + loadState = LoadState.FINISHED_LOADING; + logger.info("Successfully loaded Python Processor {} ({})", initializationContext.getIdentifier(), getProcessorType()); + break; + } catch (final Exception e) { + loadState = LoadState.LOADING_PROCESSOR_CODE_FAILED; + + if (!continualRetry) { + throw e; + } + + sleepMillis = Math.min(sleepMillis * 2, TimeUnit.MINUTES.toMillis(10)); + logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis", initializationContext.getIdentifier(), getProcessorType(), sleepMillis); + + try { + Thread.sleep(sleepMillis); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + e.addSuppressed(ex); + throw e; + } + } } } @@ -104,7 +143,7 @@ public boolean reload() { } controller.reloadProcessor(getProcessorType(), processorDetails.getProcessorVersion(), workingDir.getAbsolutePath()); - initializePythonSide(); + initializePythonSide(false); lastModified = moduleFile.lastModified(); return true; diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py index 70ca140e49cd..81e641bfa1ad 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/ExtensionManager.py @@ -485,7 +485,7 @@ def import_external_dependencies(self, processor_details, work_dir): package_dir = os.path.dirname(processor_details.source_location) requirements_file = os.path.join(package_dir, 'requirements.txt') if os.path.exists(requirements_file): - args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir, '-r', requirements_file] + args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir, '-r', requirements_file] logger.info(f"Importing dependencies from requirements file for package {package_dir} to {target_dir} using command {args}") result = subprocess.run(args) @@ -498,7 +498,7 @@ def import_external_dependencies(self, processor_details, work_dir): dependencies = processor_details.getDependencies() if len(dependencies) > 0: python_cmd = os.getenv("PYTHON_CMD") - args = [python_cmd, '-m', 'pip', 'install', '--target', target_dir] + args = [python_cmd, '-m', 'pip', 'install', '--no-cache-dir', '--target', target_dir] for dep in dependencies: args.append(dep) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 168dfe59acb0..73e448f7882c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -130,7 +130,7 @@ "Replacement Value Strategy" = "Literal Value" A single additional property is added to the Processor. The name of the property is a RecordPath identifying the field to update. - The value is an Expression Language expression that references the `field.name` variable. For example, to change the date/time format of \ + The value is an Expression Language expression that references the `field.value` variable. For example, to change the date/time format of \ a field named `txDate` from `year-month-day` format to `month/day/year` format, we add a property named `/txDate` with a value of \ `${field.value:toDate('yyyy-MM-dd'):format('MM/dd/yyyy')}`. We could also change the timezone of a timestamp field (and insert the timezone for clarity) by using a value of \ `${field.value:toDate('yyyy-MM-dd HH:mm:ss', 'UTC-0400'):format('yyyy-MM-dd HH:mm:ss Z', 'UTC')}`. diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java index f3452f99744b..244ee9f7d833 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java @@ -33,7 +33,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -155,7 +154,7 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter final String firstRecordLine = lines[1]; final List firstRecordValues = Stream.of(firstRecordLine.split(",")) .map(String::trim) - .collect(Collectors.toList()); + .toList(); assertEquals("Jane Doe", firstRecordValues.get( headerIndices.get("name") )); assertEquals("yellow", firstRecordValues.get( headerIndices.get("color") )); assertEquals("3", firstRecordValues.get( headerIndices.get("age") )); @@ -165,7 +164,7 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter final String secondRecordLine = lines[2]; final List secondRecordValues = Stream.of(secondRecordLine.split(",")) .map(String::trim) - .collect(Collectors.toList()); + .toList(); assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") )); assertEquals("yellow", secondRecordValues.get( headerIndices.get("color") )); assertEquals("3", secondRecordValues.get( headerIndices.get("age") ));