Skip to content

Commit

Permalink
NIFI-10700:
Browse files Browse the repository at this point in the history
- Only restart ports if their scheduledState is Running.
- Adding unit tests.
- Fixing unit test to properly verify Port isn't restarted.

Signed-off-by: Nathan Gough <[email protected]>

This closes apache#6582.
  • Loading branch information
mcgilman authored and thenatog committed Oct 26, 2022
1 parent d390a0b commit b05bd98
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2252,7 +2252,7 @@ public void synchronize(final Port port, final VersionedPort proposed, final Pro
final Set<Connectable> toRestart = new HashSet<>();
if (port != null) {
final boolean stopped = stopOrTerminate(port, timeout, synchronizationOptions);
if (stopped && proposed != null) {
if (stopped && proposed != null && proposed.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
toRestart.add(port);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -345,6 +346,22 @@ public void testRunningProcessorRestarted() throws FlowSynchronizationException,
verify(componentScheduler, atLeast(1)).startComponent(any(Connectable.class));
}

@Test
public void testStartingProcessorRestarted() throws FlowSynchronizationException, TimeoutException, InterruptedException {
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setScheduledState(ScheduledState.RUNNING);

when(processorA.isRunning()).thenReturn(false);
when(processorA.getPhysicalScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.STARTING);
when(group.stopProcessor(processorA)).thenReturn(CompletableFuture.completedFuture(null));

synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions);

verify(group, times(1)).stopProcessor(processorA);
verify(processorA).setProperties(versionedProcessor.getProperties(), true, Collections.emptySet());
verify(componentScheduler, atLeast(1)).startComponent(any(Connectable.class));
}

@Test
public void testTimeoutWhenProcessorDoesNotStop() {
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
Expand Down Expand Up @@ -586,9 +603,27 @@ public void testPortStarted() throws FlowSynchronizationException, InterruptedEx
public void testPortRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final VersionedPort versionedInputPort = createMinimalVersionedPort(ComponentType.INPUT_PORT);
versionedInputPort.setScheduledState(ScheduledState.RUNNING);

when(inputPort.isRunning()).thenReturn(true);

synchronizer.synchronize(inputPort, versionedInputPort, group, synchronizationOptions);

verify(componentScheduler, atLeast(1)).transitionComponentState(inputPort, ScheduledState.RUNNING);
verify(componentScheduler, times(1)).startComponent(inputPort);
verify(inputPort).setName("Input");
}

@Test
public void testStoppedPortNotRestarted() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final VersionedPort versionedInputPort = createMinimalVersionedPort(ComponentType.INPUT_PORT);
versionedInputPort.setScheduledState(ScheduledState.ENABLED);

when(inputPort.isRunning()).thenReturn(true);

synchronizer.synchronize(inputPort, versionedInputPort, group, synchronizationOptions);

verify(componentScheduler, times(1)).transitionComponentState(inputPort, ScheduledState.ENABLED);
verify(componentScheduler, never()).startComponent(inputPort);
verify(inputPort).setName("Input");
}

Expand Down

0 comments on commit b05bd98

Please sign in to comment.