NIFI-8081 Added new Listing Strategy to ListFTP and ListSFTP: Time Window

NIFI-8081 Added new Listing Strategy to ListFTP and ListSFTP: Adjusted Time Window. The user can specify the time zone or time difference (compared to where NiFi runs) of the system hosting the files and, based on that, the processor calculates the current time there. It lists files modified before this adjusted current time (and after the last listing).
NIFI-8081 'Time Adjustment' is now validated so it cannot be set unless the listing strategy is 'Adjusted Time Window'. Extracted the validator into a separate class. Added more tests. Minor refactor. Typo fix.
NIFI-8081 Improved validation.
NIFI-8081 'Time Adjustment' is not necessary - in fact it can cause problems. SFTP (and usually FTP - which has a more general bug at the moment) returns a timestamp that doesn't really need adjustment. (SFTP in particular returns an 'epoch' time.) Everything else remains the same - the new listing strategy relies on a sliding time window, but without the unnecessary option to adjust for the modification time.
NIFI-8081 Resolved conflicts after rebasing to main.
NIFI-8081 Renamed 'AbstractListProcessor.listByAdjustedSlidingTimeWindow' to 'listByTimeWindow'. Post main rebase correction.
NIFI-8081 Updated user doc for the BY_TIME_WINDOW strategy to warn the user about its reliance on accurate time.
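
The sliding window that the final design relies on boils down to two timestamp bounds that move forward on every run: the lower bound is where the previous window ended, and the upper bound is the current time minus a small lag. A minimal, illustrative sketch of that bookkeeping (hypothetical names, detached from the actual AbstractListProcessor API) could look like this:

import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the sliding time window, not the committed implementation.
// Each run lists entries whose modification time falls in [lowerBound, upperBound)
// and then remembers the upper bound as the next run's lower bound.
class TimeWindowSketch {

    private long lastWindowEndMillis = 0L;          // the real processor persists this in managed state
    private static final long LAG_MILLIS = 3_000L;  // assumed lag to tolerate coarse timestamp precision

    List<Long> listWindow(List<Long> remoteModificationTimes, long nowMillis) {
        final long lowerInclusive = lastWindowEndMillis;
        final long upperExclusive = nowMillis - LAG_MILLIS;   // never list right up to 'now'

        final List<Long> selected = remoteModificationTimes.stream()
                .filter(timestamp -> timestamp >= lowerInclusive)
                .filter(timestamp -> timestamp < upperExclusive)
                .collect(Collectors.toList());

        lastWindowEndMillis = upperExclusive;                 // slide the window forward
        return selected;
    }
}

Because both bounds are plain wall-clock timestamps, the strategy only behaves correctly when the clocks of the NiFi host and the remote system are accurate, which is exactly what the updated user documentation warns about.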

This closes apache#4721.

Signed-off-by: Peter Turcsanyi <[email protected]>
tpalfy authored and turcsanyip committed Feb 4, 2021
1 parent c1f88ec commit b55998a
Showing 7 changed files with 252 additions and 9 deletions.
@@ -64,6 +64,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
@@ -190,11 +191,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
" Since it only tracks few timestamps, it can manage listing state efficiently." +
" However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
" For example, such situation can happen in a file system if a file with old timestamp" +
" is copied or moved into the target directory without its last modified timestamp being updated.");
" is copied or moved into the target directory without its last modified timestamp being updated." +
" Also may miss files when multiple subdirectories are being written at the same time while listing is running.");

public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
"This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
" This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." +
" Works even when multiple subdirectories are being written at the same time while listing is running." +
" However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
" See the description of 'Entity Tracking Time Window' property for further details on how it works.");

@@ -203,7 +206,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
" on executing this processor. It is recommended to change the default run schedule value." +
" Any property that related to the persisting state will be disregarded.");

public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window",
"This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'." +
" One cycle will list files with modification time falling within the time window." +
" Works even when multiple subdirectories are being written at the same time while listing is running." +
" IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files" +
" are accurate.");

public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
.name("listing-strategy")
.displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
@@ -449,6 +459,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else if (NO_TRACKING.equals(listingStrategy)) {
listByNoTracking(context, session);

} else if (BY_TIME_WINDOW.equals(listingStrategy)) {
listByTimeWindow(context, session);

} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
@@ -502,6 +515,119 @@ public void listByNoTracking(final ProcessContext context, final ProcessSession
}
}

public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
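// Restore the end of the previous window from managed state when this node has no in-memory value yet or has just been elected primary.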
if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
try {
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
.map(Long::parseLong)
.ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);

justElectedPrimaryNode = false;
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
context.yield();
return;
}
}

long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
long upperBoundExclusiveTimestamp;

long currentTime = getCurrentTime();

final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
try {
List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);

boolean targetSystemHasMilliseconds = false;
boolean targetSystemHasSeconds = false;
for (final T entity : entityList) {
final long entityTimestampMillis = entity.getTimestamp();
if (!targetSystemHasMilliseconds) {
targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
}
if (!targetSystemHasSeconds) {
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
}
}

// Determine target system time precision.
String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
if (StringUtils.isBlank(specifiedPrecision)) {
// If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
specifiedPrecision = getDefaultTimePrecision();
}
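// With auto-detect, the precision falls back from milliseconds to seconds to minutes depending on what the observed timestamps actually contain; an explicitly configured precision takes priority.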
final TimeUnit targetSystemTimePrecision
= PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
: PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
: PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
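// The precision-dependent lag is subtracted from 'now' below so entries stamped too close to the current time (within the target system's timestamp granularity) are deferred to the next run.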

upperBoundExclusiveTimestamp = currentTime - listingLagMillis;

if (getLogger().isTraceEnabled()) {
getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
}
entityList
.stream()
.filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
.filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
.forEach(entity -> orderedEntries
.computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
.add(entity)
);
if (getLogger().isTraceEnabled()) {
getLogger().trace("orderedEntries: " +
orderedEntries.values().stream()
.flatMap(List::stream)
.map(entity -> entity.getName() + "_" + entity.getTimestamp())
.collect(Collectors.joining(", "))
);
}
} catch (final IOException e) {
getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
context.yield();
return;
}

if (orderedEntries.isEmpty()) {
getLogger().debug("There is no data to list. Yielding.");
context.yield();
return;
}

final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
if (writerSet) {
try {
createRecordsForEntities(context, session, orderedEntries);
} catch (final IOException | SchemaNotFoundException e) {
getLogger().error("Failed to write listing to FlowFile", e);
context.yield();
return;
}
} else {
createFlowFilesForEntities(context, session, orderedEntries);
}

try {
if (getLogger().isTraceEnabled()) {
getLogger().trace("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
}
this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
}
}

protected long getCurrentTime() {
return System.currentTimeMillis();
}

public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;

@@ -54,11 +54,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -72,7 +72,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build();

final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(FILE_TRANSFER_LISTING_STRATEGY);
properties.add(HOSTNAME);
properties.add(port);
properties.add(USERNAME);
@@ -69,7 +69,10 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue(".")
.build();

public static final PropertyDescriptor FILE_TRANSFER_LISTING_STRATEGY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(LISTING_STRATEGY)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING, BY_TIME_WINDOW)
.build();

@Override
protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
@@ -82,7 +82,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();

final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LISTING_STRATEGY);
properties.add(FILE_TRANSFER_LISTING_STRATEGY);
properties.add(HOSTNAME);
properties.add(port);
properties.add(USERNAME);
@@ -202,7 +202,7 @@ public List<FileInfo> getListing() throws IOException {
return listing;
}

private void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
if (maxResults < 1 || listing.size() >= maxResults) {
return;
}
@@ -19,11 +19,21 @@

import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -33,6 +43,10 @@
import org.junit.Test;
import org.junit.Rule;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;

public class TestListSFTP {
@Rule
@@ -63,6 +77,106 @@ public void tearDown() throws Exception {
sftpServer.deleteAllFilesAndDirectories();
}

@Test
public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
AtomicInteger fileCounter = new AtomicInteger(1);

List<String> createdFileNames = new ArrayList<>();

CountDownLatch finishScheduledRun = new CountDownLatch(1);
CountDownLatch reachScanningSubDir = new CountDownLatch(1);
CountDownLatch writeMoreFiles = new CountDownLatch(1);

String baseDir = "/base/";
String subDir = "/base/subdir/";

TestRunner runner = TestRunners.newTestRunner(new ListSFTP() {
@Override
protected FileTransfer getFileTransfer(ProcessContext context) {
return new SFTPTransfer(context, getLogger()){
@Override
protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing) throws IOException {
if (path.contains("subdir")) {
reachScanningSubDir.countDown();
try {
writeMoreFiles.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

super.getListing(path, depth, maxResults, listing);
}
};
}
});
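// The overridden getListing() pauses when it reaches the subdirectory: it signals the test thread (reachScanningSubDir) and waits until more files have been uploaded (writeMoreFiles), simulating writes that happen while a listing is in progress.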

// This test fails with BY_TIMESTAMPS
// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue());
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
runner.setProperty(ListSFTP.HOSTNAME, "localhost");
runner.setProperty(ListSFTP.USERNAME, username);
runner.setProperty(SFTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
runner.setProperty(ListSFTP.REMOTE_PATH, baseDir);
runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");

runner.assertValid();

ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(1);
sftpServer.createDirectory("/base");

uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);

executorService.submit(() -> {
try {
runner.run(1, false);
} finally {
finishScheduledRun.countDown();
}
});

reachScanningSubDir.await();

uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
Thread.sleep(1100); // Make sure the next file has greater timestamp
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);

writeMoreFiles.countDown();

Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision.
finishScheduledRun.await();
runner.run();

List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);

List<String> successFileNames = successFiles.stream()
.map(MockFlowFile::getAttributes)
.map(attributes -> attributes.get("filename"))
.sorted()
.collect(Collectors.toList());

Collections.sort(createdFileNames);

assertEquals(createdFileNames, successFileNames);
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
}

private void uploadFile(String baseDir, Object fileSuffix, List<String> createdFileNames) throws Exception {
String fileName = "file." + fileSuffix;

sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8);

createdFileNames.add(fileName);
}

@Test
public void basicFileList() throws InterruptedException {
TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
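For reference, selecting the new strategy on a ListSFTP instance follows the same property pattern the test above uses. The following condensed configuration sketch is illustrative only and not part of this commit; the host, credentials, port and remote path are placeholder values:

import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.ListSFTP;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class TimeWindowConfigurationExample {

    public static void main(String[] args) {
        // Build a test harness around ListSFTP and pick the new Time Window strategy.
        TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());

        // Placeholder connection details for the remote SFTP server.
        runner.setProperty(ListSFTP.HOSTNAME, "sftp.example.com");
        runner.setProperty(ListSFTP.USERNAME, "nifi");
        runner.setProperty(SFTPTransfer.PASSWORD, "secret");
        runner.setProperty(FTPTransfer.PORT, "22");
        runner.setProperty(ListSFTP.REMOTE_PATH, "/data/incoming/");
        runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");

        runner.assertValid();
        runner.run();  // performs a single listing against the configured server
    }
}

Since the strategy keys entirely off modification timestamps and the local clock, a newly uploaded file is emitted by the first run whose window end passes its modification time, rather than immediately on upload.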
