diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
index fa3de665f0402..9825781dfdfb3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.core.fs.local;
import java.io.IOException;
@@ -25,9 +24,7 @@
import org.apache.flink.core.fs.BlockLocation;
/**
- * Implementation of the {@link BlockLocation} interface for a
- * local file system.
- *
+ * Implementation of the {@link BlockLocation} interface for a local file system.
*/
@Internal
public class LocalBlockLocation implements BlockLocation {
@@ -41,30 +38,23 @@ public LocalBlockLocation(final String host, final long length) {
this.length = length;
}
-
@Override
public String[] getHosts() throws IOException {
-
return this.hosts;
}
-
@Override
public long getLength() {
-
return this.length;
}
-
@Override
public long getOffset() {
return 0;
}
-
@Override
public int compareTo(final BlockLocation o) {
return 0;
}
-
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 3e127ff16ee58..63e999da3cbef 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -29,7 +29,6 @@
/**
* The class LocalFileStatus
provides an implementation of the {@link FileStatus} interface
* for the local file system.
- *
*/
@Internal
public class LocalFileStatus implements FileStatus {
@@ -57,48 +56,41 @@ public LocalFileStatus(final File f, final FileSystem fs) {
this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
}
-
@Override
public long getAccessTime() {
return 0; // We don't have access files for local files
}
-
@Override
public long getBlockSize() {
return this.file.length();
}
-
@Override
public long getLen() {
return this.file.length();
}
-
@Override
public long getModificationTime() {
return this.file.lastModified();
}
-
@Override
public short getReplication() {
return 1; // For local files replication is always 1
}
-
@Override
public boolean isDir() {
return this.file.isDirectory();
}
-
@Override
public Path getPath() {
return this.path;
}
-
+
public File getFile() {
return this.file;
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 7ad68b35d7a5d..acbf814da5611 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -25,16 +25,7 @@
package org.apache.flink.core.fs.local;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-
import org.apache.flink.annotation.Internal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -43,63 +34,66 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.OperatingSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+
/**
- * The class LocalFile
provides an implementation of the {@link FileSystem} interface for the local file
- * system.
- *
+ * The class LocalFile
provides an implementation of the {@link FileSystem} interface
+ * for the local file system of the machine where the JVM runs.
*/
@Internal
public class LocalFileSystem extends FileSystem {
- /**
- * Path pointing to the current working directory.
- */
- private Path workingDir = null;
+ private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
- /**
- * The URI representing the local file system.
- */
- private final URI name = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
+ /** The URI representing the local file system. */
+ private static final URI uri = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
- /**
- * The host name of this machine;
- */
- private final String hostName;
+ /** Path pointing to the current working directory.
+ * Because Paths are not immutable, we cannot cache the proper path here */
+ private final String workingDir;
- private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
+ /** Path pointing to the current working directory.
+ * Because Paths are not immutable, we cannot cache the proper path here */
+ private final String homeDir;
+
+ /** The host name of this machine */
+ private final String hostName;
/**
* Constructs a new LocalFileSystem
object.
*/
public LocalFileSystem() {
- this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
+ this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this).toString();
+ this.homeDir = new Path(System.getProperty("user.home")).toString();
String tmp = "unknownHost";
-
try {
tmp = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Could not resolve local host", e);
}
-
this.hostName = tmp;
}
+ // ------------------------------------------------------------------------
@Override
- public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
- throws IOException {
-
- final BlockLocation[] blockLocations = new BlockLocation[1];
- blockLocations[0] = new LocalBlockLocation(this.hostName, file.getLen());
-
- return blockLocations;
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ return new BlockLocation[] {
+ new LocalBlockLocation(hostName, file.getLen())
+ };
}
-
@Override
public FileStatus getFileStatus(Path f) throws IOException {
-
final File path = pathToFile(f);
if (path.exists()) {
return new LocalFileStatus(pathToFile(f), this);
@@ -110,40 +104,35 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}
-
@Override
public URI getUri() {
- return name;
+ return uri;
}
-
@Override
public Path getWorkingDirectory() {
- return workingDir;
+ return new Path(workingDir);
}
@Override
public Path getHomeDirectory() {
- return new Path(System.getProperty("user.home"));
+ return new Path(homeDir);
}
@Override
- public void initialize(final URI name) throws IOException { }
-
+ public void initialize(final URI name) throws IOException {}
@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
return open(f);
}
-
@Override
public FSDataInputStream open(final Path f) throws IOException {
final File file = pathToFile(f);
return new LocalDataInputStream(file);
}
-
private File pathToFile(Path path) {
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
@@ -208,7 +197,6 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
private boolean delete(final File f) throws IOException {
if (f.isDirectory()) {
-
final File[] files = f.listFiles();
for (File file : files) {
final boolean del = delete(file);
@@ -216,7 +204,6 @@ private boolean delete(final File f) throws IOException {
return false;
}
}
-
} else {
return f.delete();
}
@@ -234,7 +221,6 @@ private boolean delete(final File f) throws IOException {
* thrown if an error occurred while creating the directory/directories
*/
public boolean mkdirs(final Path f) throws IOException {
-
final File p2f = pathToFile(f);
if(p2f.isDirectory()) {
@@ -266,14 +252,12 @@ public FSDataOutputStream create(final Path f, final boolean overwrite, final in
@Override
public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
-
return create(f, overwrite, 0, (short) 0, 0);
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
-
final File srcFile = pathToFile(src);
final File dstFile = pathToFile(dst);