Skip to content

Commit

Permalink
[FLINK-3702] FieldAccessor refactor to static factory
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed Nov 24, 2016
1 parent 1f04542 commit 870e219
Show file tree
Hide file tree
Showing 26 changed files with 650 additions and 596 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.java.typeutils.FieldAccessor;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -121,23 +120,6 @@ public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
}
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) {
try {
return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
} catch (NumberFormatException ex) {
throw new InvalidFieldReferenceException
("A field expression on an array must be an integer index (that might be given as a string).");
}
}

@Override
public boolean equals(Object obj) {
if (obj instanceof BasicArrayTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.FieldAccessor;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -172,31 +171,6 @@ public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionC
}
}

@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
if(pos != 0) {
throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
"basic type (" + this.toString() + "). A field expression on a basic type can only select " +
"the 0th field (which means selecting the entire basic type).");
}
return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<T>(this);
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
try {
int pos = field.equals("*") ? 0 : Integer.parseInt(field);
return getFieldAccessor(pos, config);
} catch (NumberFormatException ex) {
throw new InvalidFieldReferenceException("You tried to select the field \"" + field +
"\" on a " + this.toString() + ". A field expression on a basic type can only be \"*\" or \"0\"" +
" (both of which mean selecting the entire basic type).");
}
}

// --------------------------------------------------------------------------------------------

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
import org.apache.flink.api.java.typeutils.FieldAccessor;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -139,23 +138,6 @@ public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) {
try {
return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
} catch (NumberFormatException ex) {
throw new InvalidFieldReferenceException
("A field expression on an array must be an integer index (that might be given as a string).");
}
}

/**
* Gets the class that represents the component type.
* @return The class of the component type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.FieldAccessor;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.io.Serializable;
Expand Down Expand Up @@ -173,39 +172,6 @@ public boolean isSortKeyType() {
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);


/**
* Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
* the specified field on instances of this type.
*
* @param pos The field position (zero-based)
* @param config Configuration object
* @param <F> The type of the field to access
* @return The created FieldAccessor
*/
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString()
+ "Referencing a field by position is supported on tuples, case classes, and arrays. "
+ "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
}

/**
* Creates a {@link FieldAccessor} for the field that is given by a field expression,
* which can be used to get and set the specified field on instances of this type.
*
* @param field The field expression
* @param config Configuration object
* @param <F> The type of the field to access
* @return The created FieldAccessor
*/
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString()
+ "Field expressions are only supported on POJO types, tuples, and case classes. "
+ "(See the Flink documentation on what is considered a POJO.)");
}

@Override
public abstract String toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,16 @@ public boolean hasDeterministicFieldOrder() {
@PublicEvolving
public abstract int getFieldIndex(String fieldName);

@PublicEvolving
public static class InvalidFieldReferenceException extends IllegalArgumentException {

private static final long serialVersionUID = 1L;

public InvalidFieldReferenceException(String s) {
super(s);
}
}

@Override
public boolean equals(Object obj) {
if (obj instanceof CompositeType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
Expand Down Expand Up @@ -133,7 +132,7 @@ public boolean isSortKeyType() {
// gives only some undefined order.
return false;
}


@Override
@PublicEvolving
Expand Down Expand Up @@ -319,39 +318,7 @@ public TypeSerializer<T> createSerializer(ExecutionConfig config) {

return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {

FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);

// get field
PojoField field = null;
TypeInformation<?> fieldType = null;
for (int i = 0; i < fields.length; i++) {
if (fields[i].getField().getName().equals(decomp.head)) {
field = fields[i];
fieldType = fields[i].getTypeInformation();
break;
}
}
if (field == null) {
throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
}

if(decomp.tail == null) {
@SuppressWarnings("unchecked")
FieldAccessor<F,F> innerAccessor = new FieldAccessor.SimpleFieldAccessor<F>((TypeInformation<F>) fieldType);
return new FieldAccessor.PojoFieldAccessor<T, F, F>(field.getField(), innerAccessor);
} else {
@SuppressWarnings("unchecked")
FieldAccessor<Object,F> innerAccessor =
(FieldAccessor<Object,F>)fieldType.<F>getFieldAccessor(decomp.tail, config);
return new FieldAccessor.PojoFieldAccessor<T, Object, F>(field.getField(), innerAccessor);
}
}


@Override
public boolean equals(Object obj) {
if (obj instanceof PojoTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;

Expand Down Expand Up @@ -206,34 +203,7 @@ public <X> TypeInformation<X> getTypeAt(int pos) {
TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
return typed;
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(pos, this);
}

@Override
@PublicEvolving
public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
int fieldPos = this.getFieldIndex(decomp.head);
if (fieldPos == -1) {
try {
fieldPos = Integer.parseInt(decomp.head);
} catch (NumberFormatException ex) {
throw new InvalidFieldReferenceException("Tried to select field \"" + decomp.head
+ "\" on " + this.toString());
}
}
if (decomp.tail == null) {
return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(fieldPos, this);
} else {
FieldAccessor<?, F> innerAccessor = getTypeAt(fieldPos).getFieldAccessor(decomp.tail, config);
return new FieldAccessor.RecursiveTupleFieldAccessor<>(fieldPos, innerAccessor);
}
}


@Override
public boolean equals(Object obj) {
if (obj instanceof TupleTypeInfoBase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -167,7 +166,7 @@ public void testFailTupleIndexOutOfBounds() {
.sortLocalOutput(5, Order.DESCENDING);
}

@Test(expected = InvalidFieldReferenceException.class)
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailTupleInv() {

final ExecutionEnvironment env = ExecutionEnvironment
Expand Down Expand Up @@ -285,7 +284,7 @@ public void testFailPojoIdx() {
.sortLocalOutput(1, Order.DESCENDING);
}

@Test(expected = InvalidFieldReferenceException.class)
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailPojoInvalidField() {

final ExecutionEnvironment env = ExecutionEnvironment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void testFullOuter7() {
.with(new DummyJoin());
}

@Test(expected = InvalidFieldReferenceException.class)
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFullOuter8() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -137,7 +136,7 @@ public void testLeftOuter7() {
.with(new DummyJoin());
}

@Test(expected = InvalidFieldReferenceException.class)
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testLeftOuter8() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void testRightOuter7() {
.with(new DummyJoin());
}

@Test(expected = InvalidFieldReferenceException.class)
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testRightOuter8() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
Expand Down
Loading

0 comments on commit 870e219

Please sign in to comment.