Skip to content

Commit

Permalink
[misc] Code cleanups and fixes for various compiler warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 3, 2015
1 parent fb7ce0e commit 215cf0c
Show file tree
Hide file tree
Showing 17 changed files with 18 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ public class CliFrontend {
public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
// this has to be a regex for String.split()
public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
private static final String DEFAULT_LOG4J_PATTERN_LAYOUT = "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n";



private CommandLineParser parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,10 @@ private void printUsage() {
public static AbstractFlinkYarnClient getFlinkYarnClient() {
AbstractFlinkYarnClient yarnClient = null;
try {
Class<AbstractFlinkYarnClient> yarnClientClass = (Class<AbstractFlinkYarnClient>) Class.forName("org.apache.flink.yarn.FlinkYarnClient");
Class<? extends AbstractFlinkYarnClient> yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
} catch (ClassNotFoundException e) {
}
catch (ClassNotFoundException e) {
System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage());
e.printStackTrace(System.err);
return null; // make it obvious
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ public void computeOutputEstimates(DataStatistics statistics) {

public static class UnionSemanticProperties implements SemanticProperties {

private static final long serialVersionUID = 1L;

@Override
public FieldSet getForwardingTargetFields(int input, int sourceField) {
if (input != 0 && input != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ public void reset() {
* @return The filtered RequestedGlobalProperties
*/
public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
RequestedGlobalProperties returnProps = new RequestedGlobalProperties();

// no semantic properties available. All global properties are filtered.
if (props == null) {
throw new NullPointerException("SemanticProperties may not be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T>
}
}

@SuppressWarnings("unchecked")
private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {

TypeComparator<T> comparator;
Expand All @@ -294,7 +295,7 @@ private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T>
}
else if (typeInfo instanceof AtomicType) {
// handle grouping of atomic types
comparator = ((AtomicType) typeInfo).createComparator(sortOrder[0]);
comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0]);
}
else {
throw new RuntimeException("Unrecognized type: " + typeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.compiler.dataproperties.GlobalProperties;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.compiler.dataproperties.PartitioningProperty;
Expand All @@ -46,14 +42,9 @@
import org.junit.Assert;
import org.junit.Test;

@SuppressWarnings("serial")
public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {

private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
);

@Test
public void forwardFieldsTestMapReduce() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.compiler.CompilerTestBase;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.compiler.dataproperties;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.compiler.dataproperties;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;

import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class LocalPropertiesFilteringTest {

private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;

@SuppressWarnings("serial")
public class MockDistribution implements DataDistribution {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class RequestedGlobalPropertiesFilteringTest {

Expand All @@ -53,7 +51,7 @@ public void testNullProps() {
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0,1,2));

RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(null, 0);
rgProps.filterBySemanticProperties(null, 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.SemanticPropUtil;
Expand All @@ -33,7 +32,6 @@
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.junit.Assert;
import org.junit.Test;

public class RequestedLocalPropertiesFilteringTest {
Expand All @@ -50,7 +48,7 @@ public void testNullProps() {
RequestedLocalProperties rlProp = new RequestedLocalProperties();
rlProp.setGroupedFields(new FieldSet(0, 2, 3));

RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(null, 0);
rlProp.filterBySemanticProperties(null, 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ private boolean isTargetFieldPresent(int targetField, Map<Integer, FieldSet> fie
*/
public void addReadFields(int input, FieldSet readFields) {

FieldSet curReadFields;

if (input != 0 && input != 1) {
throw new IndexOutOfBoundsException();
} else if (input == 0) {
Expand All @@ -175,5 +173,4 @@ public void addReadFields(int input, FieldSet readFields) {
public String toString() {
return "DISP(" + this.fieldMapping1 + "; " + this.fieldMapping2 + ")";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public interface SemanticProperties extends Serializable {

public static class InvalidSemanticAnnotationException extends InvalidProgramException {

private static final long serialVersionUID = 1L;

public InvalidSemanticAnnotationException(String s) {
super(s);
}
Expand All @@ -74,6 +76,8 @@ public InvalidSemanticAnnotationException(String s, Throwable e) {

public static class EmptySemanticProperties implements SemanticProperties {

private static final long serialVersionUID = 1L;

@Override
public FieldSet getForwardingTargetFields(int input, int sourceField) {
return FieldSet.EMPTY_SET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@
import org.apache.flink.util.ClassUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An abstract base class for a fairly generic file system. It
* may be implemented as a distributed file system, or as a local
* one that reflects the locally-connected disk.
*/
public abstract class FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);

private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public interface HadoopFileSystemWrapper {

/**
* Test whether the HadoopWrapper can wrap the given file system scheme.
* @param scheme
* @return
*
* @param scheme The scheme of the file system.
* @return The class implementing the file system.
*/
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme);
}

0 comments on commit 215cf0c

Please sign in to comment.