Skip to content

Commit

Permalink
Refactor AbstractTupleStreamBenchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Oct 11, 2012
1 parent b383815 commit 55e0865
Show file tree
Hide file tree
Showing 20 changed files with 511 additions and 681 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
package com.facebook.presto.block;

import com.facebook.presto.Range;
import com.facebook.presto.SizeOf;
import com.facebook.presto.Tuple;
import com.facebook.presto.TupleInfo;
import com.facebook.presto.operator.inlined.InlinedOperator;
import com.facebook.presto.operator.inlined.InlinedOperatorHook;
import com.facebook.presto.operator.inlined.StatsInlinedOperator;
import com.facebook.presto.slice.Slice;
import com.facebook.presto.slice.SliceOutput;
import com.google.common.base.Throwables;
import io.airlift.json.ObjectMapperProvider;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;

import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

public class StatsCollectingTupleStreamSerde
implements TupleStreamSerde
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();

private final TupleStreamSerde tupleStreamSerde;

public StatsCollectingTupleStreamSerde(TupleStreamSerde tupleStreamSerde)
Expand Down Expand Up @@ -58,66 +50,34 @@ public StatsAnnotatedTupleStreamDeserializer(TupleStreamDeserializer tupleStream
}

@Override
public StatsAnnotatedTupleStream deserialize(Slice slice)
public TupleStream deserialize(Slice slice)
{
checkNotNull(slice, "slice is null");
int footerLength = slice.getInt(slice.length() - SizeOf.SIZE_OF_INT);
int footerOffset = slice.length() - footerLength - SizeOf.SIZE_OF_INT;
try {
Stats stats = OBJECT_MAPPER.readValue(slice.slice(footerOffset, footerLength).input(), Stats.class);
return new StatsAnnotatedTupleStream(tupleStreamDeserializer.deserialize(slice.slice(0, footerOffset)), stats);
} catch (IOException e) {
throw Throwables.propagate(e);
}
return tupleStreamDeserializer.deserialize(slice.slice(0, footerOffset));
}
}

public static class StatsAnnotatedTupleStream
implements TupleStream
{
private final TupleStream tupleStream;
private final Stats stats;

private StatsAnnotatedTupleStream(TupleStream tupleStream, Stats stats)
// TODO: how do we expose the stats data to other components?
public StatsInlinedOperator.Stats deserializeStats(Slice slice)
{
this.tupleStream = checkNotNull(tupleStream, "tupleStream is null");
this.stats = checkNotNull(stats, "stats is null");
}

@Override
public TupleInfo getTupleInfo()
{
return tupleStream.getTupleInfo();
}

@Override
public Range getRange()
{
return stats.getPositionRange();
}

@Override
public Cursor cursor(QuerySession session)
{
return tupleStream.cursor(session);
}

public Stats getStats()
{
return stats;
}

// HACK: for testing purposes
public TupleStream getUnderlyingTupleStream()
{
return tupleStream;
checkNotNull(slice, "slice is null");
int footerLength = slice.getInt(slice.length() - SizeOf.SIZE_OF_INT);
int footerOffset = slice.length() - footerLength - SizeOf.SIZE_OF_INT;
return StatsInlinedOperator.resultsAsStats(TupleStreamSerdes.deserialize(TupleStreamSerdes.Encoding.RAW.createSerde(), slice.slice(footerOffset, footerLength)));
}
}

private static class StatsCollectingTupleStreamWriter
implements TupleStreamWriter
{
private final StatsMerger statsMerger = new StatsMerger();
private final InlinedOperator statsInlinedOperator = new StatsInlinedOperator() {
@Override
public void finished()
{
// Do nothing. We don't want this operator to ever be marked as finished
}
};
private final SliceOutput sliceOutput;
private final TupleStreamWriter delegate;

Expand All @@ -131,260 +91,20 @@ private StatsCollectingTupleStreamWriter(SliceOutput sliceOutput, TupleStreamWri
public StatsCollectingTupleStreamWriter append(TupleStream tupleStream)
{
checkNotNull(tupleStream, "tupleStream is null");
StatsCollectingTupleStream statsCollectingTupleStream = new StatsCollectingTupleStream(tupleStream);
delegate.append(statsCollectingTupleStream);
statsMerger.merge(statsCollectingTupleStream.getStats());
delegate.append(new InlinedOperatorHook(tupleStream, statsInlinedOperator));
return this;
}

@Override
public void finish()
{
delegate.finish();
try {
int startingIndex = sliceOutput.size();
OBJECT_MAPPER.writeValue(sliceOutput, statsMerger.build());
int endingIndex = sliceOutput.size();
checkState(endingIndex > startingIndex);
sliceOutput.writeInt(endingIndex - startingIndex);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
}

/**
* Stats can ONLY be retrieved from this stream when one of its cursors has been run to completion
*/
private static class StatsCollectingTupleStream
implements TupleStream
{
private final TupleStream tupleStream;
private StatsBuilder statsBuilder;

private StatsCollectingTupleStream(TupleStream tupleStream)
{
this.tupleStream = checkNotNull(tupleStream, "tupleStream is null");
}

@Override
public TupleInfo getTupleInfo()
{
return tupleStream.getTupleInfo();
}

@Override
public Range getRange()
{
return tupleStream.getRange();
}

@Override
public Cursor cursor(QuerySession querySession)
{
if (statsBuilder == null) {
statsBuilder = new StatsBuilder();
return new StatsCollectingCursor(tupleStream.cursor(querySession), statsBuilder);
}
else {
// Stats should already have been collected
return tupleStream.cursor(querySession);
}
}

public Stats getStats()
{
checkState(statsBuilder != null, "no cursor was ever used");
checkState(statsBuilder.isFinished(), "did not collect all stats");
return statsBuilder.build();
}
}

private static class StatsCollectingCursor
extends ForwardingCursor
{
private final StatsBuilder statsBuilder;
private long measuredPosition = -1;

private StatsCollectingCursor(Cursor cursor, StatsBuilder statsBuilder)
{
super(checkNotNull(cursor, "cursor is null"));
this.statsBuilder = checkNotNull(statsBuilder, "statsBuilder is null");
}

@Override
public AdvanceResult advanceNextValue()
{
AdvanceResult result = getDelegate().advanceNextValue();
processCurrentValueIfNecessary(result);
return result;
}

@Override
public AdvanceResult advanceNextPosition()
{
AdvanceResult result = getDelegate().advanceNextPosition();
processCurrentValueIfNecessary(result);
return result;
}

@Override
public AdvanceResult advanceToPosition(long position)
{
// We should always have processed as much as the current value end position
while (position > getDelegate().getCurrentValueEndPosition()) {
AdvanceResult result = getDelegate().advanceNextValue();
processCurrentValueIfNecessary(result);
if (result == AdvanceResult.MUST_YIELD || result == AdvanceResult.FINISHED) {
return result;
}
}
// All intermediate values and all positions of the current value should already be processed
return (position == getDelegate().getPosition()) ? AdvanceResult.SUCCESS : getDelegate().advanceToPosition(position);
}

private void processCurrentValueIfNecessary(AdvanceResult advanceResult)
{
switch (advanceResult) {
case SUCCESS:
if (getDelegate().getPosition() > measuredPosition) {
statsBuilder.process(getDelegate().getTuple(), Range.create(getDelegate().getPosition(), getDelegate().getCurrentValueEndPosition()));
measuredPosition = getDelegate().getCurrentValueEndPosition();
}
break;
case MUST_YIELD:
// No advancement
break;
case FINISHED:
statsBuilder.markFinished();
break;
default:
throw new AssertionError("Missing advance state");
}
}
}

private static class StatsMerger
{
private long rowCount;
private long runsCount;
private Range range;

public StatsMerger merge(Stats stats)
{
checkNotNull(stats, "stats is null");
rowCount += stats.getRowCount();
runsCount += stats.getRunsCount();
range = (range == null) ? stats.getPositionRange() : range.outerBound(stats.getPositionRange());
return this;
}

public Stats build()
{
return new Stats(rowCount, runsCount, range.getStart(), range.getEnd());
}
}

private static class StatsBuilder
{
private long rowCount;
private long runsCount;
private Range range;
private Tuple lastTuple;
private boolean finished;

public StatsBuilder process(Tuple tuple, Range tupleRange)
{
checkNotNull(tuple, "tuple is null");
checkNotNull(tupleRange, "tupleRange is null");
if (lastTuple == null) {
lastTuple = tuple;
}
else if (!lastTuple.equals(tuple)) {
runsCount++;
lastTuple = tuple;
}
range = (range == null) ? tupleRange : range.outerBound(tupleRange);
rowCount += tupleRange.length();
return this;
}

public StatsBuilder markFinished()
{
finished = true;
return this;
}

public boolean isFinished()
{
return finished;
}

public Stats build()
{
return new Stats(rowCount, runsCount + 1, range.getStart(), range.getEnd());
}
}

/**
* Serializable and deserializable with Jackson JSON Processor
* TODO: figure out how to manage introduction of new stats after already having old data (versioning?)
*/
public static class Stats
{
private final long rowCount;
private final long runsCount;
private final Range positionRange;

@JsonCreator
private Stats(
@JsonProperty("rowCount") long rowCount,
@JsonProperty("runsCount") long runsCount,
@JsonProperty("startPosition") long startPosition,
@JsonProperty("endPosition") long endPosition
)
{
checkArgument(rowCount >= 0, "row count cannot be negative");
checkArgument(runsCount >= 0, "runs count cannot be negative");
checkArgument(startPosition >= 0, "start position cannot be negative");
checkArgument(endPosition >= 0, "end position cannot be negative");
this.rowCount = rowCount;
this.runsCount = runsCount;
this.positionRange = Range.create(startPosition, endPosition);
}

@JsonProperty
public long getRowCount()
{
return rowCount;
}

@JsonProperty
public long getRunsCount()
{
return runsCount;
}

@JsonProperty
public long getStartPosition()
{
return positionRange.getStart();
}

@JsonProperty
public long getEndPosition()
{
return positionRange.getEnd();
}

public long getAverageRunLength()
{
return rowCount / Math.max(runsCount, 1);
}

public Range getPositionRange()
{
return positionRange;
int startingIndex = sliceOutput.size();
// TODO: add a better way of serializing the stats that is less fragile
TupleStreamSerdes.serialize(TupleStreamSerdes.Encoding.RAW.createSerde(), statsInlinedOperator.getResult(), sliceOutput);
int endingIndex = sliceOutput.size();
checkState(endingIndex > startingIndex);
sliceOutput.writeInt(endingIndex - startingIndex);
}
}
}
Loading

0 comments on commit 55e0865

Please sign in to comment.