Skip to content

Commit

Permalink
[FLINK-5332] [core] Synchronize FileSystem::initOutPathLocalFS() to p…
Browse files Browse the repository at this point in the history
…revent lost files when called concurrently.
  • Loading branch information
StephanEwen committed Dec 14, 2016
1 parent 790153c commit 2f3ad58
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 176 deletions.
268 changes: 148 additions & 120 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -86,6 +87,12 @@ public enum WriteMode {

private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
* {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);

// ------------------------------------------------------------------------

/**
* Create a SafetyNetCloseableRegistry for a Task. This method should be called at the beginning of the task's
* main thread.
Expand Down Expand Up @@ -537,72 +544,96 @@ public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferS
/**
* Initializes output directories on local file systems according to the given write mode.
*
* WriteMode.NO_OVERWRITE & parallel output:
* - A directory is created if the output path does not exist.
* - An existing directory is reused, files contained in the directory are NOT deleted.
* - An existing file raises an exception.
* <ul>
* <li>WriteMode.NO_OVERWRITE &amp; parallel output:
* <ul>
* <li>A directory is created if the output path does not exist.</li>
* <li>An existing directory is reused, files contained in the directory are NOT deleted.</li>
* <li>An existing file raises an exception.</li>
* </ul>
* </li>
*
* WriteMode.NO_OVERWRITE &amp; NONE parallel output:
* - An existing file or directory raises an exception.
* <li>WriteMode.NO_OVERWRITE &amp; NONE parallel output:
* <ul>
* <li>An existing file or directory raises an exception.</li>
* </ul>
* </li>
*
* WriteMode.OVERWRITE &amp; parallel output:
* - A directory is created if the output path does not exist.
* - An existing directory is reused, files contained in the directory are NOT deleted.
* - An existing file is deleted and replaced by a new directory.
*
* WriteMode.OVERWRITE &amp; NONE parallel output:
* - An existing file or directory (and all its content) is deleted
* <li>WriteMode.OVERWRITE &amp; parallel output:
* <ul>
* <li>A directory is created if the output path does not exist.</li>
* <li>An existing directory is reused, files contained in the directory are NOT deleted.</li>
* <li>An existing file is deleted and replaced by a new directory.</li>
* </ul>
* </li>
*
* Files contained in an existing directory are not deleted, because multiple instances of a
* <li>WriteMode.OVERWRITE &amp; NONE parallel output:
* <ul>
* <li>An existing file or directory (and all its content) is deleted</li>
* </ul>
* </li>
* </ul>
*
* <p>Files contained in an existing directory are not deleted, because multiple instances of a
* DataSinkTask might call this function at the same time and hence might perform concurrent
* delete operations on the file system (possibly deleting output files of concurrently running tasks).
* Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create
* operations would be difficult.
*
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param createDirectory True, to initialize a directory at the given path, false otherwise.
* @param createDirectory True, to initialize a directory at the given path, false to prepare space for a file.
*
* @return True, if the path was successfully prepared, false otherwise.
* @throws IOException
* @throws IOException Thrown, if any of the file system access operations failed.
*/
public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if (this.isDistributedFS()) {
if (isDistributedFS()) {
return false;
}

// NOTE: we sometimes see this code block fail due to a races when changes to the file system take small time fractions before being
// visible to other threads. for example:
// - the check whether the directory exists returns false
// - the call to create the directory fails (some concurrent thread is creating the directory, locked)
// - the call to check whether the directory exists does not yet see the new directory (change is not committed)
// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
// concurrently work in this method (multiple output formats writing locally) might end
// up deleting each other's directories and leave non-retrievable files, without necessarily
// causing an exception. That results in very subtle issues, like output files looking as if
// they are not getting created.

// try for 30 seconds
final long now = System.currentTimeMillis();
final long deadline = now + 30000;
// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
// here can cancel faster
try {
OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
}
catch (InterruptedException e) {
// restore the interruption state
Thread.currentThread().interrupt();

Exception lastError = null;
// leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}

do {
FileStatus status = null;
try {
FileStatus status;
try {
status = getFileStatus(outPath);
}
catch (FileNotFoundException e) {
// okay, the file is not there
status = null;
}

// check if path exists
if (status != null) {
// path exists, check write mode
switch (writeMode) {

case NO_OVERWRITE:
if (status.isDir() && createDirectory) {
return true;
} else {
// file may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");
throw new IOException("File or directory already exists. Existing files and directories " +
"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
}

case OVERWRITE:
Expand All @@ -612,86 +643,54 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean cre
return true;
} else {
// we will write in a single file, delete directory
// (there is also no other thread trying to delete the directory, since there is only one writer).
try {
this.delete(outPath, true);
delete(outPath, true);
}
catch (IOException e) {
// due to races in some file systems, it may spuriously occur that a deleted the file looks
// as if it still exists and is gone a millisecond later, once the change is committed
// we ignore the exception, possibly fall through the loop later
lastError = e;
throw new IOException("Could not remove existing directory '" + outPath +
"' to allow overwrite by result file", e);
}
}
}
else {
// delete file
try {
this.delete(outPath, false);
delete(outPath, false);
}
catch (IOException e) {
// Some other thread might already have deleted the file.
// If - for some other reason - the file could not be deleted,
// the error will be handled later.
lastError = e;
throw new IOException("Could not remove existing file '" + outPath +
"' to allow overwrite by result file/directory", e);
}
}
break;

default:
throw new IllegalArgumentException("Invalid write mode: " + writeMode);
}
}

if (createDirectory) {
// Output directory needs to be created

try {
if (!this.exists(outPath)) {
this.mkdirs(outPath);
}
}
catch (IOException e) {
// Some other thread might already have created the directory concurrently.
lastError = e;
if (!exists(outPath)) {
mkdirs(outPath);
}

// double check that the output directory exists
try {
FileStatus check = getFileStatus(outPath);
if (check != null) {
if (check.isDir()) {
return true;
}
else {
lastError = new IOException("FileSystem should create an output directory, but the path points to a file instead.");
}
}
// fall through the loop
return getFileStatus(outPath).isDir();
}
catch (FileNotFoundException e) {
// fall though the loop
return false;
}

}
else {
// check that the output path does not exist and an output file can be created by the output format.
return !this.exists(outPath);
}

// small delay to allow changes to make progress
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new IOException("Thread was interrupted");
// check that the output path does not exist and an output file
// can be created by the output format.
return !exists(outPath);
}
}
while (System.currentTimeMillis() < deadline);

if (lastError != null) {
throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError);
} else {
return false;
finally {
OUTPUT_DIRECTORY_INIT_LOCK.unlock();
}
}

Expand All @@ -716,58 +715,87 @@ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean cre
* @param outPath Output path that should be prepared.
* @param writeMode Write mode to consider.
* @param createDirectory True, to initialize a directory at the given path, false otherwise.
*
* @return True, if the path was successfully prepared, false otherwise.
* @throws IOException
*
* @throws IOException Thrown, if any of the file system access operations failed.
*/
public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
if (!this.isDistributedFS()) {
if (!isDistributedFS()) {
return false;
}

// check if path exists
if (this.exists(outPath)) {
// path exists, check write mode
switch(writeMode) {
case NO_OVERWRITE:
// file or directory may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");
case OVERWRITE:
// output path exists. We delete it and all contained files in case of a directory.
try {
this.delete(outPath, true);
} catch(IOException ioe) {
// Some other thread might already have deleted the path.
// If - for some other reason - the path could not be deleted,
// this will be handled later.
}
break;
default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
// NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that
// concurrently work in this method (multiple output formats writing locally) might end
// up deleting each other's directories and leave non-retrievable files, without necessarily
// causing an exception. That results in very subtle issues, like output files looking as if
// they are not getting created.

// we acquire the lock interruptibly here, to make sure that concurrent threads waiting
// here can cancel faster
try {
OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly();
}
catch (InterruptedException e) {
// restore the interruption state
Thread.currentThread().interrupt();

if (createDirectory) {
// Output directory needs to be created
try {
if (!this.exists(outPath)) {
this.mkdirs(outPath);
// leave the method - we don't have the lock anyways
throw new IOException("The thread was interrupted while trying to initialize the output directory");
}

try {
// check if path exists
if (exists(outPath)) {
// path exists, check write mode
switch(writeMode) {

case NO_OVERWRITE:
// file or directory may not be overwritten
throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " +
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");

case OVERWRITE:
// output path exists. We delete it and all contained files in case of a directory.
try {
delete(outPath, true);
} catch (IOException e) {
// Some other thread might already have deleted the path.
// If - for some other reason - the path could not be deleted,
// this will be handled later.
}
break;

default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
} catch(IOException ioe) {
// Some other thread might already have created the directory.
// If - for some other reason - the directory could not be created
// and the path does not exist, this will be handled later.
}

// double check that the output directory exists
return this.exists(outPath) && this.getFileStatus(outPath).isDir();
} else {

// check that the output path does not exist and an output file can be created by the output format.
return !this.exists(outPath);

if (createDirectory) {
// Output directory needs to be created
try {
if (!exists(outPath)) {
mkdirs(outPath);
}
} catch (IOException ioe) {
// Some other thread might already have created the directory.
// If - for some other reason - the directory could not be created
// and the path does not exist, this will be handled later.
}

// double check that the output directory exists
return exists(outPath) && getFileStatus(outPath).isDir();
}
else {
// single file case: check that the output path does not exist and
// an output file can be created by the output format.
return !exists(outPath);
}
}
finally {
OUTPUT_DIRECTORY_INIT_LOCK.unlock();
}

}

// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 2f3ad58

Please sign in to comment.