Skip to content

Commit

Permalink
NIFI-8458 TailFile - Fix some bugs. Add more tests.
Browse files Browse the repository at this point in the history
- During cleanup keep "tailingPostRollover" in the updated state.
- Skipping tests that can't run on Windows.

Signed-off-by: Mark Payne <[email protected]>
  • Loading branch information
tpalfy authored and markap14 committed Apr 27, 2021
1 parent 90c7d03 commit 16dc61e
Show file tree
Hide file tree
Showing 4 changed files with 556 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ public void cleanup() {
for (TailFileObject tfo : states.values()) {
cleanReader(tfo);
final TailFileState state = tfo.getState();
tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer()));
tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(),
state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer(), state.isTailingPostRollover()));
}
}

Expand Down Expand Up @@ -824,24 +825,6 @@ public void process(final OutputStream rawOut) throws IOException {
}
});

if (abort.get() != null) {
session.remove(flowFile);
final long newPosition = positionHolder.get();
try {
reader.position(newPosition);
} catch (IOException ex) {
getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{ file, ex }, ex);
try {
reader.close();
} catch (IOException ex2) {
getLogger().warn("Failed to close reader for {} due to {}", new Object[]{ file, ex2 }, ex2);
}
reader = null;
}
tfo.setState(new TailFileState(tailFile, file, reader, newPosition, timestamp, length, checksum, state.getBuffer()));
throw abort.get();
}

// If there ended up being no data, just remove the FlowFile
if (flowFile.getSize() == 0) {
session.remove(flowFile);
Expand Down Expand Up @@ -882,6 +865,22 @@ public void process(final OutputStream rawOut) throws IOException {
tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));

persistState(tfo, session, context);

if (abort.get() != null) {
final long newPosition = positionHolder.get();
try {
reader.position(newPosition);
} catch (IOException ex) {
getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{ file, ex }, ex);
try {
reader.close();
} catch (IOException ex2) {
getLogger().warn("Failed to close reader for {} due to {}", new Object[]{ file, ex2 }, ex2);
}
}

throw abort.get();
}
}

private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
Expand Down Expand Up @@ -1208,8 +1207,8 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
final boolean tailFirstFile;
if (rolloverOccurred) {
final File firstFile = rolledOffFiles.get(0);
final long millisSinceModified = System.currentTimeMillis() - firstFile.lastModified();
final boolean fileGrew = firstFile.length() >= position && position > 0;
final long millisSinceModified = getCurrentTimeMs() - firstFile.lastModified();
final boolean fileGrew = firstFile.length() >= position;
final boolean tailRolledFile = postRolloverTailMillis == 0 || millisSinceModified < postRolloverTailMillis;
tailFirstFile = fileGrew && tailRolledFile && expectedChecksum != null;
} else {
Expand Down Expand Up @@ -1244,7 +1243,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
// If we don't notice that the file has been modified, per the checks above, then we want to keep checking until the last modified
// date has eclipsed the configured value for the Post-Rollover Tail Period. Until then, return false. Once that occurs, we will
// consume the rest of the data, including the last line, even if it doesn't have a line ending.
final long millisSinceModified = System.currentTimeMillis() - newestFile.lastModified();
final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified();
if (millisSinceModified < postRolloverTailMillis) {
getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " +
"account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
Expand Down Expand Up @@ -1343,7 +1342,7 @@ private boolean tailRolledFile(final ProcessContext context, final ProcessSessio
// updated values.
// But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
final long millisSinceUpdate = getCurrentTimeMs() - timestamp;
if (tailingPostRollover && postRolloverTailMillis > 0) {
getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing",
fileToTail, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
Expand Down Expand Up @@ -1410,6 +1409,10 @@ private TailFileState consumeFileFully(final File file, final ProcessContext con
return tfo.getState();
}

/**
 * Returns the current time in milliseconds as reported by
 * {@link System#currentTimeMillis()}.
 *
 * <p>The rollover-related elapsed-time calculations in this processor read the
 * clock through this method rather than calling the system clock directly, so a
 * subclass can override it to shift or control the perceived time.</p>
 *
 * @return the current time in milliseconds
 */
public long getCurrentTimeMs() {
    final long nowMillis = System.currentTimeMillis();
    return nowMillis;
}

static class TailFileObject {

private TailFileState state;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;

import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Base class for scenario-style tests of {@link TailFile}.
 *
 * <p>A scenario is a list of {@link Action}s (write a word, write a newline, write or
 * overwrite a NUL byte, trigger the processor, roll the file over, ...) that is replayed
 * against a real file under {@code target/}. While the actions run, the text that is
 * expected to come out of the processor is accumulated in {@link #expected}; at the end the
 * concatenated content of all FlowFiles routed to {@code REL_SUCCESS} is compared with it.</p>
 *
 * <p>The processor under test is an anonymous {@link TailFile} subclass whose
 * {@code getCurrentTimeMs()} is shifted by {@link #timeAdjustment}, which lets a scenario
 * "expire" the post-rollover tail period without sleeping.</p>
 */
public class AbstractTestTailFileScenario {
    public static final String TEST_DIRECTORY = "testTailFileScenario";
    public static final String TARGET_FILE_PATH = "target/" + TEST_DIRECTORY + "/in.txt";
    /** Character written over a previously written NUL byte; also what is expected in the output. */
    public static final String NUL_SUBSTITUTE = "X";
    // NOTE(review): "PERSIOD" is a typo for "PERIOD"; the name is kept because it is public API of this class.
    public static final Long POST_ROLLOVER_WAIT_PERSIOD_SECONDS = 100L;

    /** The file being tailed (always {@link #TARGET_FILE_PATH}). */
    protected File file;
    /** Write handle used by the actions; may still point at a rolled-over file until SWITCH_FILE runs. */
    protected RandomAccessFile randomAccessFile;

    private TailFile processor;
    protected TestRunner runner;

    private AtomicBoolean stopAfterEachTrigger;

    protected AtomicLong wordIndex;
    protected AtomicLong rolloverIndex;
    /** Milliseconds added to the real clock by the processor's overridden {@code getCurrentTimeMs()}. */
    protected AtomicLong timeAdjustment;
    /** Set by ROLLOVER, cleared by SWITCH_FILE: the writer has not yet moved to the new file. */
    protected AtomicBoolean rolloverSwitchPending;

    /** File offsets of NUL bytes that have been written but not yet overwritten. */
    protected LinkedList<Long> nulPositions;

    /** Fragments expected in the processor output, in write order. */
    protected List<String> expected;
    private Random random;

    /**
     * Prepares a clean test directory, an empty target file, fresh scenario state and a
     * configured processor that has been run once.
     *
     * @throws IOException if the target file cannot be created or opened
     */
    @Before
    public void setUp() throws IOException {
        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");

        // Reset all scenario state BEFORE the processor is created and run: the processor's
        // overridden getCurrentTimeMs() dereferences timeAdjustment, so it must never be
        // null (first set-up) or stale (manual re-set-up inside testScenario) during the initial run.
        stopAfterEachTrigger = new AtomicBoolean(false);

        nulPositions = new LinkedList<>();
        wordIndex = new AtomicLong(1);
        rolloverIndex = new AtomicLong(1);
        timeAdjustment = new AtomicLong(0);
        rolloverSwitchPending = new AtomicBoolean(false);

        expected = new ArrayList<>();

        random = new Random();

        clean();

        File directory = new File("target/" + TEST_DIRECTORY);
        if (!directory.exists()) {
            assertTrue(directory.mkdirs());
        }

        createTargetFile();
        randomAccessFile = new RandomAccessFile(file, "rw");

        processor = new TailFile() {
            @Override
            public long getCurrentTimeMs() {
                return super.getCurrentTimeMs() + timeAdjustment.get();
            }
        };
        runner = TestRunners.newTestRunner(processor);
        runner.setProperty(TailFile.FILENAME, TARGET_FILE_PATH);
        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "in.txt*");
        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
        runner.setProperty(TailFile.POST_ROLLOVER_TAIL_PERIOD, POST_ROLLOVER_WAIT_PERSIOD_SECONDS + " sec");
        runner.assertValid();

        // Initial run; later triggers (see trigger()) pass false as the last argument instead.
        runner.run(1, false, true);
    }

    /**
     * Closes the writer and cleans up the processor.
     *
     * <p>The processor is cleaned up even when closing the writer fails, and a processor
     * that was never created (set-up failed early) is tolerated so that the original
     * set-up failure is not masked by a {@link NullPointerException} here.</p>
     *
     * @throws IOException if closing the writer fails
     */
    @After
    public void tearDown() throws IOException {
        try {
            if (randomAccessFile != null) {
                randomAccessFile.close();
            }
        } finally {
            if (processor != null) {
                processor.cleanup();
            }
        }
    }

    /**
     * Runs the scenario twice from a clean state: first without, then with stopping the
     * processor after each trigger.
     *
     * @param actions the actions to replay, in order
     * @throws Exception if any action or the set-up/tear-down in between fails
     */
    public void testScenario(List<Action> actions) throws Exception {
        testScenario(actions, false);

        tearDown();
        setUp();

        testScenario(actions, true);
    }

    /**
     * Replays the scenario once and asserts that the processor emitted exactly the expected text.
     *
     * <p>After the given actions, the scenario is always finished the same way: remaining
     * NULs are overwritten, a newline is written, the processor is triggered, the
     * post-rollover wait period is expired and the processor is triggered twice more.</p>
     *
     * @param actions the actions to replay, in order
     * @param stopAfterEachTrigger value passed as the "stop" flag of every subsequent trigger
     * @throws Exception if any action fails
     */
    public void testScenario(List<Action> actions, boolean stopAfterEachTrigger) throws Exception {
        if (actions.contains(Action.ROLLOVER)) {
            Assume.assumeTrue("Test wants to rename an open file which is not allowed on Windows", !SystemUtils.IS_OS_WINDOWS);
        }

        // GIVEN
        this.stopAfterEachTrigger.set(stopAfterEachTrigger);

        // WHEN
        for (Action action : actions) {
            action.run(this);
        }
        overwriteRemainingNuls();
        Action.WRITE_NEW_LINE.run(this);
        Action.TRIGGER.run(this);
        Action.EXPIRE_ROLLOVER_WAIT_PERIOD.run(this);
        Action.TRIGGER.run(this);
        Action.TRIGGER.run(this);

        // THEN
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
        String actual = flowFiles.stream()
            .map(MockFlowFile::toByteArray)
            .map(String::new)
            .collect(Collectors.joining());

        assertEquals(
            stopAfterEachTrigger + " " + actions.toString(),
            String.join("", expected),
            actual
        );
    }

    /** Removes all files left in the test directory by a previous run. */
    private void clean() {
        cleanFiles("target/" + TEST_DIRECTORY);
    }

    /**
     * Deletes every direct child of the given directory (best effort).
     *
     * @param directory path of the directory to empty; a missing directory is ignored
     */
    private void cleanFiles(String directory) {
        final File targetDir = new File(directory);

        // listFiles() returns null when the path is not a directory or cannot be read.
        final File[] children = targetDir.listFiles();
        if (children == null) {
            return;
        }

        for (final File child : children) {
            // Best effort: a leftover file is detected later by createTargetFile()'s assertion.
            child.delete();
        }
    }

    /** (Re)creates an empty target file, failing the test if it cannot be created. */
    private void createTargetFile() throws IOException {
        file = new File(TARGET_FILE_PATH);
        file.delete();
        assertTrue(file.createNewFile());
    }

    /** Overwrites every still-pending NUL byte, in random order. */
    private void overwriteRemainingNuls() throws Exception {
        while (!nulPositions.isEmpty()) {
            Action.OVERWRITE_NUL.run(this);
        }
    }

    /** Appends a unique word at the writer's current position and records it as expected output. */
    private void writeWord() throws IOException {
        String word = "-word_" + wordIndex.getAndIncrement() + "-";

        randomAccessFile.write(word.getBytes());

        expected.add(word);
    }

    /** Appends a line feed and records it as expected output. */
    private void writeNewLine() throws IOException {
        randomAccessFile.write("\n".getBytes());

        expected.add("\n");
    }

    /**
     * Writes a NUL byte and remembers its offset. The expected output already contains
     * {@link #NUL_SUBSTITUTE}, because every NUL is overwritten before the scenario ends.
     */
    private void writeNul() throws IOException {
        nulPositions.add(randomAccessFile.getFilePointer());

        randomAccessFile.write("\0".getBytes());

        expected.add(NUL_SUBSTITUTE);
    }

    /** Replaces one randomly chosen pending NUL byte with {@link #NUL_SUBSTITUTE}; no-op if none is pending. */
    private void overwriteNul() throws IOException {
        if (nulPositions.isEmpty()) {
            return;
        }

        Long nulPosition = nulPositions.remove(random.nextInt(nulPositions.size()));

        // Patch the byte in place, then restore the append position.
        long currentPosition = randomAccessFile.getFilePointer();
        randomAccessFile.seek(nulPosition);
        randomAccessFile.write(NUL_SUBSTITUTE.getBytes());
        randomAccessFile.seek(currentPosition);
    }

    /** Runs the processor once without re-initializing it. */
    private void trigger() {
        runner.run(1, stopAfterEachTrigger.get(), false);
    }

    /**
     * Renames the tailed file to {@code in.txt.<n>} and creates a new empty {@code in.txt}.
     * The writer keeps pointing at the renamed file until SWITCH_FILE is executed.
     */
    private void rollover() throws IOException {
        File rolledOverFile = new File(file.getParentFile(), file.getName() + "." + rolloverIndex.getAndIncrement());
        // Fail fast: if the rename silently failed, createTargetFile() would delete the live
        // file and the scenario would fail later with a misleading content mismatch.
        assertTrue("Could not rename " + file + " to " + rolledOverFile, file.renameTo(rolledOverFile));

        createTargetFile();

        rolloverSwitchPending.set(true);
    }

    /**
     * Moves the writer to the new target file after a rollover; no-op if no rollover is pending.
     * Pending NULs are overwritten first, while the writer still points at the rolled-over file.
     */
    private void switchFile() throws Exception {
        if (!rolloverSwitchPending.get()) {
            return;
        }

        overwriteRemainingNuls();

        randomAccessFile.close();
        randomAccessFile = new RandomAccessFile(file, "rw");

        rolloverSwitchPending.set(false);
    }

    /** Advances the processor's clock just past the configured post-rollover tail period. */
    private void expireRolloverWaitPeriod() throws Exception {
        long waitPeriod = POST_ROLLOVER_WAIT_PERSIOD_SECONDS * 1000 + 100;
        timeAdjustment.addAndGet(waitPeriod);
    }

    /** The steps a scenario can be composed of; each delegates to a private method of the test. */
    protected enum Action {
        WRITE_WORD(AbstractTestTailFileScenario::writeWord),
        WRITE_NEW_LINE(AbstractTestTailFileScenario::writeNewLine),
        WRITE_NUL(AbstractTestTailFileScenario::writeNul),
        OVERWRITE_NUL(AbstractTestTailFileScenario::overwriteNul),
        TRIGGER(AbstractTestTailFileScenario::trigger),
        ROLLOVER(AbstractTestTailFileScenario::rollover),
        SWITCH_FILE(AbstractTestTailFileScenario::switchFile),
        EXPIRE_ROLLOVER_WAIT_PERIOD(AbstractTestTailFileScenario::expireRolloverWaitPeriod);

        private final ActionRunner actionRunner;

        Action(ActionRunner actionRunner) {
            this.actionRunner = actionRunner;
        }

        /**
         * Executes this action against the given test instance.
         *
         * @param currentTest the test whose file, runner and bookkeeping the action operates on
         * @throws Exception if the underlying step fails
         */
        void run(AbstractTestTailFileScenario currentTest) throws Exception {
            actionRunner.runAction(currentTest);
        }
    }

    /** A scenario step bound to a test instance; allowed to throw checked exceptions. */
    @FunctionalInterface
    private interface ActionRunner {
        void runAction(AbstractTestTailFileScenario currentTest) throws Exception;
    }
}
Loading

0 comments on commit 16dc61e

Please sign in to comment.