Skip to content

Commit

Permalink
Remove redundant windowing information from the BeamRecord itself
Browse files Browse the repository at this point in the history
element window information `BoundedWindow` is added in `BeamSqlExpression`.
  • Loading branch information
mingmxu committed Sep 11, 2017
1 parent 2fc6f24 commit 5b1e4a5
Show file tree
Hide file tree
Showing 73 changed files with 366 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
@Experimental
public class BeamRecordCoder extends CustomCoder<BeamRecord> {
private static final BitSetCoder nullListCoder = BitSetCoder.of();
private static final InstantCoder instantCoder = InstantCoder.of();

private BeamRecordType recordType;
private List<Coder> coderArray;
Expand Down Expand Up @@ -64,9 +63,6 @@ public void encode(BeamRecord value, OutputStream outStream)

coderArray.get(idx).encode(value.getFieldValue(idx), outStream);
}

instantCoder.encode(value.getWindowStart(), outStream);
instantCoder.encode(value.getWindowEnd(), outStream);
}

@Override
Expand All @@ -82,9 +78,6 @@ public BeamRecord decode(InputStream inStream) throws CoderException, IOExceptio
record.addField(idx, coderArray.get(idx).decode(inStream));
}

record.setWindowStart(instantCoder.decode(inStream));
record.setWindowEnd(instantCoder.decode(inStream));

return record;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Instant;

/**
* {@link org.apache.beam.sdk.values.BeamRecord}, self-described with
Expand All @@ -42,9 +38,6 @@ public class BeamRecord implements Serializable {
private BitSet nullFields;
private BeamRecordType dataType;

private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

public BeamRecord(BeamRecordType dataType) {
this.dataType = dataType;
this.nullFields = new BitSet(dataType.size());
Expand All @@ -62,17 +55,6 @@ public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
}
}

public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow window){
windowStart = upstreamRecord.windowStart;
windowEnd = upstreamRecord.windowEnd;

if (window instanceof IntervalWindow) {
IntervalWindow iWindow = (IntervalWindow) window;
windowStart = iWindow.start();
windowEnd = iWindow.end();
}
}

public void addField(String fieldName, Object fieldValue) {
addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
}
Expand Down Expand Up @@ -211,26 +193,10 @@ public boolean isNull(int idx) {
return nullFields.get(idx);
}

public Instant getWindowStart() {
return windowStart;
}

public Instant getWindowEnd() {
return windowEnd;
}

public void setWindowStart(Instant windowStart) {
this.windowStart = windowStart;
}

public void setWindowEnd(Instant windowEnd) {
this.windowEnd = windowEnd;
}

@Override
public String toString() {
return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+ dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+ dataType + "]";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;

/**
Expand All @@ -34,10 +35,10 @@ public interface BeamSqlExpressionExecutor extends Serializable {
void prepare();

/**
* apply transformation to input record {@link BeamRecord}.
* apply transformation to input record {@link BeamRecord} with {@link BoundedWindow}.
*
*/
List<Object> execute(BeamRecord inputRow);
List<Object> execute(BeamRecord inputRow, BoundedWindow window);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
Expand Down Expand Up @@ -427,10 +428,10 @@ public void prepare() {
}

@Override
public List<Object> execute(BeamRecord inputRow) {
public List<Object> execute(BeamRecord inputRow, BoundedWindow window) {
List<Object> results = new ArrayList<>();
for (BeamSqlExpression exp : exps) {
results.add(exp.evaluate(inputRow).getValue());
results.add(exp.evaluate(inputRow, window).getValue());
}
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;

import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;

Expand Down Expand Up @@ -48,16 +49,16 @@ public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
return true;
}

@Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
@Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
for (int i = 0; i < operands.size() - 1; i += 2) {
if (opValueEvaluated(i, inputRow)) {
if (opValueEvaluated(i, inputRow, window)) {
return BeamSqlPrimitive.of(
outputType,
opValueEvaluated(i + 1, inputRow)
opValueEvaluated(i + 1, inputRow, window)
);
}
}
return BeamSqlPrimitive.of(outputType,
opValueEvaluated(operands.size() - 1, inputRow));
opValueEvaluated(operands.size() - 1, inputRow, window));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -71,40 +72,40 @@ public boolean accept() {
}

@Override
public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
SqlTypeName castOutputType = getOutputType();
switch (castOutputType) {
case INTEGER:
return BeamSqlPrimitive
.of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
.of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow, window)));
case DOUBLE:
return BeamSqlPrimitive
.of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
SqlFunctions.toDouble(opValueEvaluated(index, inputRow, window)));
case SMALLINT:
return BeamSqlPrimitive
.of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
SqlFunctions.toShort(opValueEvaluated(index, inputRow, window)));
case TINYINT:
return BeamSqlPrimitive
.of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
SqlFunctions.toByte(opValueEvaluated(index, inputRow, window)));
case BIGINT:
return BeamSqlPrimitive
.of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
.of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow, window)));
case DECIMAL:
return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow, window)));
case FLOAT:
return BeamSqlPrimitive
.of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
.of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow, window)));
case CHAR:
case VARCHAR:
return BeamSqlPrimitive
.of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
.of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow, window).toString());
case DATE:
return BeamSqlPrimitive
.of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
return BeamSqlPrimitive.of(SqlTypeName.DATE,
toDate(opValueEvaluated(index, inputRow, window), outputDateFormat));
case TIMESTAMP:
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
toTimeStamp(opValueEvaluated(index, inputRow, window), outputTimestampFormat));
}
throw new UnsupportedOperationException(
String.format("Cast to type %s not supported", castOutputType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
Expand Down Expand Up @@ -49,8 +50,8 @@ public SqlTypeName opType(int idx) {
return op(idx).getOutputType();
}

public <T> T opValueEvaluated(int idx, BeamRecord row) {
return (T) op(idx).evaluate(row).getValue();
public <T> T opValueEvaluated(int idx, BeamRecord row, BoundedWindow window) {
return (T) op(idx).evaluate(row, window).getValue();
}

/**
Expand All @@ -59,10 +60,10 @@ public <T> T opValueEvaluated(int idx, BeamRecord row) {
public abstract boolean accept();

/**
* Apply input record {@link BeamRecord} to this expression,
* Apply input record {@link BeamRecord} with {@link BoundedWindow} to this expression,
* the output value is wrapped with {@link BeamSqlPrimitive}.
*/
public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow);
public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window);

public List<BeamSqlExpression> getOperands() {
return operands;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;

Expand All @@ -37,7 +38,7 @@ public boolean accept() {
}

@Override
public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;

/**
* {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
* It holds the value, and return it directly during {@link #evaluate(BeamRecord)}.
* It holds the value, and return it directly during {@link #evaluate(BeamRecord, BoundedWindow)}.
*
*/
public class BeamSqlPrimitive<T> extends BeamSqlExpression {
Expand Down Expand Up @@ -145,7 +146,7 @@ public boolean accept() {
}

@Override
public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow) {
public BeamSqlPrimitive<T> evaluate(BeamRecord inputRow, BoundedWindow window) {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;

Expand All @@ -41,13 +42,13 @@ public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeNam
&& SqlTypeName.DATETIME_TYPES.contains(opType(0));
}

@Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
@Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
if (opType(0) == SqlTypeName.TIME) {
GregorianCalendar date = opValueEvaluated(0, inputRow);
GregorianCalendar date = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());

} else {
Date date = opValueEvaluated(0, inputRow);
Date date = opValueEvaluated(0, inputRow, window);
return BeamSqlPrimitive.of(outputType, date.getTime());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;

Expand Down Expand Up @@ -51,14 +52,14 @@ public boolean accept() {
}

@Override
public BeamSqlPrimitive evaluate(BeamRecord inputRow) {
public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) {
if (method == null) {
reConstructMethod();
}
try {
List<Object> paras = new ArrayList<>();
for (BeamSqlExpression e : getOperands()) {
paras.add(e.evaluate(inputRow).getValue());
paras.add(e.evaluate(inputRow, window).getValue());
}

return BeamSqlPrimitive.of(getOutputType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;

import java.util.Date;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;

Expand All @@ -34,9 +36,13 @@ public boolean accept() {
}

@Override
public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
new Date(inputRow.getWindowEnd().getMillis()));
public BeamSqlPrimitive<Date> evaluate(BeamRecord inputRow, BoundedWindow window) {
if (window instanceof IntervalWindow) {
return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).end().toDate());
} else {
throw new UnsupportedOperationException(
"Cannot run HOP_END|TUMBLE_END|SESSION_END on GlobalWindow.");
}
}

}
Loading

0 comments on commit 5b1e4a5

Please sign in to comment.