Skip to content

Commit

Permalink
[hotfix] [core] Minor code cleanups in 'LocalFileSystem'.
Browse files Browse the repository at this point in the history
This makes members final where possible and avoids repeated access to the system properties.
This commit also brings the formatting style closer to the style of the other Flink classes.
  • Loading branch information
StephanEwen committed Dec 14, 2016
1 parent 7ecf6c8 commit 3feea13
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.core.fs.local;

import java.io.IOException;
Expand All @@ -25,9 +24,7 @@
import org.apache.flink.core.fs.BlockLocation;

/**
* Implementation of the {@link BlockLocation} interface for a
* local file system.
*
* Implementation of the {@link BlockLocation} interface for a local file system.
*/
@Internal
public class LocalBlockLocation implements BlockLocation {
Expand All @@ -41,30 +38,23 @@ public LocalBlockLocation(final String host, final long length) {
this.length = length;
}


@Override
public String[] getHosts() throws IOException {

return this.hosts;
}


@Override
public long getLength() {

return this.length;
}


@Override
public long getOffset() {
return 0;
}


@Override
public int compareTo(final BlockLocation o) {
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
/**
* The class <code>LocalFileStatus</code> provides an implementation of the {@link FileStatus} interface
* for the local file system.
*
*/
@Internal
public class LocalFileStatus implements FileStatus {
Expand Down Expand Up @@ -57,48 +56,41 @@ public LocalFileStatus(final File f, final FileSystem fs) {
this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
}


@Override
public long getAccessTime() {
return 0; // We don't have access files for local files
}


@Override
public long getBlockSize() {
return this.file.length();
}


@Override
public long getLen() {
return this.file.length();
}


@Override
public long getModificationTime() {
return this.file.lastModified();
}


@Override
public short getReplication() {
return 1; // For local files replication is always 1
}


@Override
public boolean isDir() {
return this.file.isDirectory();
}


@Override
public Path getPath() {
return this.path;
}

public File getFile() {
return this.file;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,7 @@

package org.apache.flink.core.fs.local;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

import org.apache.flink.annotation.Internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
Expand All @@ -43,63 +34,66 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.OperatingSystem;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

/**
* The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface for the local file
* system.
*
* The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface
* for the local file system of the machine where the JVM runs.
*/
@Internal
public class LocalFileSystem extends FileSystem {

/**
* Path pointing to the current working directory.
*/
private Path workingDir = null;
private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);

/**
* The URI representing the local file system.
*/
private final URI name = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
/** The URI representing the local file system. */
private static final URI uri = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");

/**
* The host name of this machine;
*/
private final String hostName;
/** Path pointing to the current working directory.
* Because Paths are not immutable, we cannot cache the proper path here */
private final String workingDir;

private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
/** Path pointing to the current working directory.
* Because Paths are not immutable, we cannot cache the proper path here */
private final String homeDir;

/** The host name of this machine */
private final String hostName;

/**
* Constructs a new <code>LocalFileSystem</code> object.
*/
public LocalFileSystem() {
this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
this.workingDir = new Path(System.getProperty("user.dir")).makeQualified(this).toString();
this.homeDir = new Path(System.getProperty("user.home")).toString();

String tmp = "unknownHost";

try {
tmp = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Could not resolve local host", e);
}

this.hostName = tmp;
}

// ------------------------------------------------------------------------

@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
throws IOException {

final BlockLocation[] blockLocations = new BlockLocation[1];
blockLocations[0] = new LocalBlockLocation(this.hostName, file.getLen());

return blockLocations;
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return new BlockLocation[] {
new LocalBlockLocation(hostName, file.getLen())
};
}


@Override
public FileStatus getFileStatus(Path f) throws IOException {

final File path = pathToFile(f);
if (path.exists()) {
return new LocalFileStatus(pathToFile(f), this);
Expand All @@ -110,40 +104,35 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}


@Override
public URI getUri() {
return name;
return uri;
}


@Override
public Path getWorkingDirectory() {
return workingDir;
return new Path(workingDir);
}

@Override
public Path getHomeDirectory() {
return new Path(System.getProperty("user.home"));
return new Path(homeDir);
}

@Override
public void initialize(final URI name) throws IOException { }

public void initialize(final URI name) throws IOException {}

@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
return open(f);
}


@Override
public FSDataInputStream open(final Path f) throws IOException {
final File file = pathToFile(f);
return new LocalDataInputStream(file);
}


private File pathToFile(Path path) {
if (!path.isAbsolute()) {
path = new Path(getWorkingDirectory(), path);
Expand Down Expand Up @@ -208,15 +197,13 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
private boolean delete(final File f) throws IOException {

if (f.isDirectory()) {

final File[] files = f.listFiles();
for (File file : files) {
final boolean del = delete(file);
if (!del) {
return false;
}
}

} else {
return f.delete();
}
Expand All @@ -234,7 +221,6 @@ private boolean delete(final File f) throws IOException {
* thrown if an error occurred while creating the directory/directories
*/
public boolean mkdirs(final Path f) throws IOException {

final File p2f = pathToFile(f);

if(p2f.isDirectory()) {
Expand Down Expand Up @@ -266,14 +252,12 @@ public FSDataOutputStream create(final Path f, final boolean overwrite, final in

@Override
public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {

return create(f, overwrite, 0, (short) 0, 0);
}


@Override
public boolean rename(final Path src, final Path dst) throws IOException {

final File srcFile = pathToFile(src);
final File dstFile = pathToFile(dst);

Expand Down

0 comments on commit 3feea13

Please sign in to comment.