Skip to content

Commit

Permalink
NIFI-8188 - Add 'Run Once' for processors in context menu.
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Payne <[email protected]>
  • Loading branch information
tpalfy authored and markap14 committed Feb 18, 2021
1 parent 6801704 commit aa72604
Show file tree
Hide file tree
Showing 26 changed files with 452 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ public enum ScheduledState {

STARTING,

STOPPING;
STOPPING,

RUN_ONCE;
}
1 change: 1 addition & 0 deletions nifi-docs/src/main/asciidoc/user-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ While the options available from the context menu vary, the following options ar
NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it is possible to open the configuration dialog by double-clicking on the desired component.

- *Start* or *Stop*: This option allows the user to start or stop a Processor; the option will be either Start or Stop, depending on the current state of the Processor.
- *Run Once*: This option allows the user to run a selected Processor exactly once. If the Processor is prevented from executing (e.g. there are no incoming flow files or the outgoing connection has backpressure applied) the Processor won't get triggered. *Execution* settings apply - i.e. *Primary Node* and *All Nodes* setting will result in running the Processor only once on the Primary Node or one time on each of the nodes, respectively. Works only with *Timer Driven* and *CRON driven* Scheduling Strategy.
- *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor.
- *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <<data_provenance>>).
- *View status history*: This option opens a graphical representation of the Processor's statistical information over time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@XmlType(name = "processorRunStatus")
public class ProcessorRunStatusEntity extends ComponentRunStatusEntity {

private static String[] SUPPORTED_STATE = {"RUNNING", "STOPPED", "DISABLED"};
private static String[] SUPPORTED_STATE = {"RUNNING", "RUN_ONCE", "STOPPED", "DISABLED"};

@Override
protected String[] getSupportedState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,32 @@ public void disable() {
public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory,
final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {

run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, failIfStopping, ScheduledState.RUNNING, ScheduledState.STARTING);
}

/**
* Similar to {@link #start(ScheduledExecutorService, long, long, Supplier, SchedulingAgentCallback, boolean)}, except for the following:
* <ul>
* <li>
* Once the {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method has been invoked successfully, the processor is scehduled to be stopped immediately.
* All appropriate lifecycle methods will be executed as well.
* </li>
* <li>
* The processor's desired state is going to be set to STOPPED right away. This usually doesn't prevent the processor to run once, unless NiFi is restarted before it can finish.
* In that case the processor will stay STOPPED after the restart.
* </li>
* </ul>
*/
@Override
public void runOnce(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final Supplier<ProcessContext> processContextFactory,
final SchedulingAgentCallback schedulingAgentCallback) {

run(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback, true, ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE);
}

private void run(ScheduledExecutorService taskScheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredSate, ScheduledState scheduledState) {

final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
LOG.info("Starting {}", this);
Expand All @@ -1368,12 +1394,12 @@ public void start(final ScheduledExecutorService taskScheduler, final long admin
currentState = this.scheduledState.get();

if (currentState == ScheduledState.STOPPED) {
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING);
starting = this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState);
if (starting) {
desiredState = ScheduledState.RUNNING;
desiredState = desiredSate;
}
} else if (currentState == ScheduledState.STOPPING && !failIfStopping) {
desiredState = ScheduledState.RUNNING;
desiredState = desiredSate;
return;
} else {
starting = false;
Expand All @@ -1385,7 +1411,7 @@ public void start(final ScheduledExecutorService taskScheduler, final long admin
} else {
final String procName = processorRef.get().toString();
LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[] {procName, currentState});
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", new Object[]{procName, currentState});
}
}

Expand Down Expand Up @@ -1474,7 +1500,7 @@ public boolean isTerminated(final Thread thread) {

@Override
public void verifyCanTerminate() {
if (getScheduledState() != ScheduledState.STOPPED) {
if (getScheduledState() != ScheduledState.STOPPED && getScheduledState() != ScheduledState.RUN_ONCE) {
throw new IllegalStateException("Processor is not stopped");
}
}
Expand Down Expand Up @@ -1529,7 +1555,10 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
deactivateThread();
}

if (desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
if (
(desiredState == ScheduledState.RUNNING && scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING))
|| (desiredState == ScheduledState.RUN_ONCE && scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.RUN_ONCE))
) {
LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor);
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else {
Expand Down Expand Up @@ -1576,7 +1605,7 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
}

// make sure we only continue retry loop if STOP action wasn't initiated
if (scheduledState.get() != ScheduledState.STOPPING) {
if (scheduledState.get() != ScheduledState.STOPPING && scheduledState.get() != ScheduledState.RUN_ONCE) {
// re-initiate the entire process
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback);
taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -1605,7 +1634,7 @@ public void run() {
return;
}

monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get());
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get());
}
};

Expand Down Expand Up @@ -1648,7 +1677,8 @@ public CompletableFuture<Void> stop(final ProcessScheduler processScheduler, fin
desiredState = ScheduledState.STOPPED;

final CompletableFuture<Void> future = new CompletableFuture<>();
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
// will ensure that the Processor represented by this node can only be stopped once
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING) || this.scheduledState.compareAndSet(ScheduledState.RUN_ONCE, ScheduledState.STOPPING)) {
scheduleState.incrementActiveThreadCount(null);

// will continue to monitor active threads, invoking OnStopped once there are no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1469,6 +1470,31 @@ public Future<Void> startProcessor(final ProcessorNode processor, final boolean
}
}

@Override
public Future<Void> runProcessorOnce(ProcessorNode processor, Callable<Future<Void>> stopCallback) {
readLock.lock();
try {
if (getProcessor(processor.getIdentifier()) == null) {
throw new IllegalStateException("Processor is not a member of this Process Group");
}

final ScheduledState state = processor.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Processor is disabled");
} else if (state == ScheduledState.RUNNING) {
throw new IllegalStateException("Processor is already running");
}
processor.reloadAdditionalResourcesIfNecessary();

return scheduler.runProcessorOnce(processor, stopCallback);
} catch (Exception e) {
processor.getLogger().error("Error while running processor {} once.", new Object[]{processor}, e);
return stopProcessor(processor);
} finally {
readLock.unlock();
}
}

@Override
public void startInputPort(final Port port) {
readLock.lock();
Expand Down Expand Up @@ -1557,7 +1583,7 @@ public void terminateProcessor(final ProcessorNode processor) {
}

final ScheduledState state = processor.getScheduledState();
if (state != ScheduledState.STOPPED) {
if (state != ScheduledState.STOPPED && state != ScheduledState.RUN_ONCE) {
throw new IllegalStateException("Cannot terminate processor with ID " + processor.getIdentifier() + " because it is not stopped");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package org.apache.nifi.controller;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
Expand All @@ -27,12 +34,6 @@
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.scheduling.SchedulingStrategy;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public interface ProcessScheduler {

/**
Expand All @@ -59,6 +60,18 @@ public interface ProcessScheduler {
*/
Future<Void> startProcessor(ProcessorNode procNode, boolean failIfStopping);

/**
* Starts scheduling the given processor to run once, after invoking all methods
* on the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that
* are annotated with the {@link org.apache.nifi.annotation.lifecycle.OnScheduled} annotation. If the Processor
* is already scheduled to run, does nothing.
*
* @param procNode to start
* @param stopCallback The callback that is responsible to handle the stopping of the processor after it has run once.
* @throws IllegalStateException if the Processor is disabled
*/
Future<Void> runProcessorOnce(ProcessorNode procNode, Callable<Future<Void>> stopCallback);

/**
* Stops scheduling the given processor to run and invokes all methods on
* the underlying {@link org.apache.nifi.processor.Processor FlowFileProcessor} that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,30 @@ public ScheduledState getPhysicalScheduledState() {
public abstract void start(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping);

/**
* Will run the {@link Processor} represented by this
* {@link ProcessorNode} once. This typically means invoking its
* operation that is annotated with @OnScheduled and then executing a
* callback provided by the {@link ProcessScheduler} to which typically
* initiates
* {@link Processor#onTrigger(ProcessContext, org.apache.nifi.processor.ProcessSessionFactory)}
* cycle and schedules the stopping of the processor right away.
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate processor <i>start</i> task
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
* @param timeoutMillis the number of milliseconds to wait after triggering the Processor's @OnScheduled methods before timing out and considering
* the startup a failure. This will result in the thread being interrupted and trying again.
* @param processContextFactory
* a factory for creating instances of {@link ProcessContext}
* @param schedulingAgentCallback
* the callback provided by the {@link ProcessScheduler} to
* execute upon successful start of the Processor
*/
public abstract void runOnce(ScheduledExecutorService scheduler, long administrativeYieldMillis, long timeoutMillis, Supplier<ProcessContext> processContextFactory,
SchedulingAgentCallback schedulingAgentCallback);

/**
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.
* Stopping processor typically means invoking its operation that is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.scheduling;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.connectable.Connectable;
Expand All @@ -25,6 +27,8 @@ public interface SchedulingAgent {

void schedule(Connectable connectable, LifecycleState scheduleState);

void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback);

void unschedule(Connectable connectable, LifecycleState scheduleState);

void onEvent(Connectable connectable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.function.Predicate;

Expand Down Expand Up @@ -205,6 +206,16 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
Future<Void> startProcessor(ProcessorNode processor, boolean failIfStopping);

/**
* Runs the given Processor once and the stops it by calling the provided callback.
*
* @param processor the processor to start
* @param stopCallback the callback responsible for stopping the processor
* @throws IllegalStateException if the processor is not valid, or is
* already running
*/
Future<Void> runProcessorOnce(ProcessorNode processor, Callable<Future<Void>> stopCallback);

/**
* Starts the given Input Port
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,9 +916,10 @@ private ProcessGroup updateProcessGroup(final FlowController controller, final P
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
break;
case STOPPED:
case RUN_ONCE:
if (procState == ScheduledState.DISABLED) {
procNode.getProcessGroup().enableProcessor(procNode);
} else if (procState == ScheduledState.RUNNING) {
} else if (procState == ScheduledState.RUNNING || procState == ScheduledState.RUN_ONCE) {
controller.stopProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier());
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.engine.FlowEngine;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
* Base implementation of the {@link SchedulingAgent} which encapsulates the
* updates to the {@link LifecycleState} based on invoked operation and then
Expand All @@ -46,6 +49,12 @@ public void schedule(Connectable connectable, LifecycleState scheduleState) {
this.doSchedule(connectable, scheduleState);
}

@Override
public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
scheduleState.setScheduled(true);
this.doScheduleOnce(connectable, scheduleState, stopCallback);
}

@Override
public void unschedule(Connectable connectable, LifecycleState scheduleState) {
scheduleState.setScheduled(false);
Expand Down Expand Up @@ -75,6 +84,18 @@ public void unschedule(ReportingTaskNode taskNode, LifecycleState scheduleState)
*/
protected abstract void doSchedule(Connectable connectable, LifecycleState scheduleState);

/**
* Schedules the provided {@link Connectable} to run once and then calls the provided stopCallback to stop it.
* Its {@link LifecycleState} will be set to <i>true</i>
*
* @param connectable
* the instance of {@link Connectable}
* @param scheduleState
* the instance of {@link LifecycleState}
* @param stopCallback the callback responsible for stopping connectable after it ran once
*/
protected abstract void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback);

/**
* Unschedules the provided {@link Connectable}. Its {@link LifecycleState}
* will be set to <i>false</i>
Expand Down
Loading

0 comments on commit aa72604

Please sign in to comment.