diff --git a/src/main/java/com/facebook/presto/block/StatsCollectingTupleStreamSerde.java b/src/main/java/com/facebook/presto/block/StatsCollectingTupleStreamSerde.java index 8c5d2395529fc..2fd9ef05cd980 100644 --- a/src/main/java/com/facebook/presto/block/StatsCollectingTupleStreamSerde.java +++ b/src/main/java/com/facebook/presto/block/StatsCollectingTupleStreamSerde.java @@ -1,26 +1,18 @@ package com.facebook.presto.block; -import com.facebook.presto.Range; import com.facebook.presto.SizeOf; -import com.facebook.presto.Tuple; -import com.facebook.presto.TupleInfo; +import com.facebook.presto.operator.inlined.InlinedOperator; +import com.facebook.presto.operator.inlined.InlinedOperatorHook; +import com.facebook.presto.operator.inlined.StatsInlinedOperator; import com.facebook.presto.slice.Slice; import com.facebook.presto.slice.SliceOutput; -import com.google.common.base.Throwables; -import io.airlift.json.ObjectMapperProvider; -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.map.ObjectMapper; -import java.io.IOException; - -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; public class StatsCollectingTupleStreamSerde implements TupleStreamSerde { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get(); - private final TupleStreamSerde tupleStreamSerde; public StatsCollectingTupleStreamSerde(TupleStreamSerde tupleStreamSerde) @@ -58,66 +50,34 @@ public StatsAnnotatedTupleStreamDeserializer(TupleStreamDeserializer tupleStream } @Override - public StatsAnnotatedTupleStream deserialize(Slice slice) + public TupleStream deserialize(Slice slice) { checkNotNull(slice, "slice is null"); int footerLength = slice.getInt(slice.length() - SizeOf.SIZE_OF_INT); int footerOffset = slice.length() - footerLength - SizeOf.SIZE_OF_INT; - try { - Stats stats = OBJECT_MAPPER.readValue(slice.slice(footerOffset, footerLength).input(), Stats.class); - return new StatsAnnotatedTupleStream(tupleStreamDeserializer.deserialize(slice.slice(0, footerOffset)), stats); - } catch (IOException e) { - throw Throwables.propagate(e); - } + return tupleStreamDeserializer.deserialize(slice.slice(0, footerOffset)); } - } - public static class StatsAnnotatedTupleStream - implements TupleStream - { - private final TupleStream tupleStream; - private final Stats stats; - - private StatsAnnotatedTupleStream(TupleStream tupleStream, Stats stats) + // TODO: how do we expose the stats data to other components? 
+ public StatsInlinedOperator.Stats deserializeStats(Slice slice) { - this.tupleStream = checkNotNull(tupleStream, "tupleStream is null"); - this.stats = checkNotNull(stats, "stats is null"); - } - - @Override - public TupleInfo getTupleInfo() - { - return tupleStream.getTupleInfo(); - } - - @Override - public Range getRange() - { - return stats.getPositionRange(); - } - - @Override - public Cursor cursor(QuerySession session) - { - return tupleStream.cursor(session); - } - - public Stats getStats() - { - return stats; - } - - // HACK: for testing purposes - public TupleStream getUnderlyingTupleStream() - { - return tupleStream; + checkNotNull(slice, "slice is null"); + int footerLength = slice.getInt(slice.length() - SizeOf.SIZE_OF_INT); + int footerOffset = slice.length() - footerLength - SizeOf.SIZE_OF_INT; + return StatsInlinedOperator.resultsAsStats(TupleStreamSerdes.deserialize(TupleStreamSerdes.Encoding.RAW.createSerde(), slice.slice(footerOffset, footerLength))); } } private static class StatsCollectingTupleStreamWriter implements TupleStreamWriter { - private final StatsMerger statsMerger = new StatsMerger(); + private final InlinedOperator statsInlinedOperator = new StatsInlinedOperator() { + @Override + public void finished() + { + // Do nothing. We don't want this operator to ever be marked as finished + } + }; private final SliceOutput sliceOutput; private final TupleStreamWriter delegate; @@ -131,9 +91,7 @@ private StatsCollectingTupleStreamWriter(SliceOutput sliceOutput, TupleStreamWri public StatsCollectingTupleStreamWriter append(TupleStream tupleStream) { checkNotNull(tupleStream, "tupleStream is null"); - StatsCollectingTupleStream statsCollectingTupleStream = new StatsCollectingTupleStream(tupleStream); - delegate.append(statsCollectingTupleStream); - statsMerger.merge(statsCollectingTupleStream.getStats()); + delegate.append(new InlinedOperatorHook(tupleStream, statsInlinedOperator)); return this; } @@ -141,250 +99,12 @@ public StatsCollectingTupleStreamWriter append(TupleStream tupleStream) public void finish() { delegate.finish(); - try { - int startingIndex = sliceOutput.size(); - OBJECT_MAPPER.writeValue(sliceOutput, statsMerger.build()); - int endingIndex = sliceOutput.size(); - checkState(endingIndex > startingIndex); - sliceOutput.writeInt(endingIndex - startingIndex); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - } - - /** - * Stats can ONLY be retrieved from this stream when one of its cursors has been run to completion - */ - private static class StatsCollectingTupleStream - implements TupleStream - { - private final TupleStream tupleStream; - private StatsBuilder statsBuilder; - - private StatsCollectingTupleStream(TupleStream tupleStream) - { - this.tupleStream = checkNotNull(tupleStream, "tupleStream is null"); - } - - @Override - public TupleInfo getTupleInfo() - { - return tupleStream.getTupleInfo(); - } - - @Override - public Range getRange() - { - return tupleStream.getRange(); - } - - @Override - public Cursor cursor(QuerySession querySession) - { - if (statsBuilder == null) { - statsBuilder = new StatsBuilder(); - return new StatsCollectingCursor(tupleStream.cursor(querySession), statsBuilder); - } - else { - // Stats should already have been collected - return tupleStream.cursor(querySession); - } - } - - public Stats getStats() - { - checkState(statsBuilder != null, "no cursor was ever used"); - checkState(statsBuilder.isFinished(), "did not collect all stats"); - return statsBuilder.build(); - } - } - - private static 
class StatsCollectingCursor - extends ForwardingCursor - { - private final StatsBuilder statsBuilder; - private long measuredPosition = -1; - - private StatsCollectingCursor(Cursor cursor, StatsBuilder statsBuilder) - { - super(checkNotNull(cursor, "cursor is null")); - this.statsBuilder = checkNotNull(statsBuilder, "statsBuilder is null"); - } - - @Override - public AdvanceResult advanceNextValue() - { - AdvanceResult result = getDelegate().advanceNextValue(); - processCurrentValueIfNecessary(result); - return result; - } - - @Override - public AdvanceResult advanceNextPosition() - { - AdvanceResult result = getDelegate().advanceNextPosition(); - processCurrentValueIfNecessary(result); - return result; - } - - @Override - public AdvanceResult advanceToPosition(long position) - { - // We should always have processed as much as the current value end position - while (position > getDelegate().getCurrentValueEndPosition()) { - AdvanceResult result = getDelegate().advanceNextValue(); - processCurrentValueIfNecessary(result); - if (result == AdvanceResult.MUST_YIELD || result == AdvanceResult.FINISHED) { - return result; - } - } - // All intermediate values and all positions of the current value should already be processed - return (position == getDelegate().getPosition()) ? AdvanceResult.SUCCESS : getDelegate().advanceToPosition(position); - } - - private void processCurrentValueIfNecessary(AdvanceResult advanceResult) - { - switch (advanceResult) { - case SUCCESS: - if (getDelegate().getPosition() > measuredPosition) { - statsBuilder.process(getDelegate().getTuple(), Range.create(getDelegate().getPosition(), getDelegate().getCurrentValueEndPosition())); - measuredPosition = getDelegate().getCurrentValueEndPosition(); - } - break; - case MUST_YIELD: - // No advancement - break; - case FINISHED: - statsBuilder.markFinished(); - break; - default: - throw new AssertionError("Missing advance state"); - } - } - } - - private static class StatsMerger - { - private long rowCount; - private long runsCount; - private Range range; - - public StatsMerger merge(Stats stats) - { - checkNotNull(stats, "stats is null"); - rowCount += stats.getRowCount(); - runsCount += stats.getRunsCount(); - range = (range == null) ? stats.getPositionRange() : range.outerBound(stats.getPositionRange()); - return this; - } - - public Stats build() - { - return new Stats(rowCount, runsCount, range.getStart(), range.getEnd()); - } - } - - private static class StatsBuilder - { - private long rowCount; - private long runsCount; - private Range range; - private Tuple lastTuple; - private boolean finished; - - public StatsBuilder process(Tuple tuple, Range tupleRange) - { - checkNotNull(tuple, "tuple is null"); - checkNotNull(tupleRange, "tupleRange is null"); - if (lastTuple == null) { - lastTuple = tuple; - } - else if (!lastTuple.equals(tuple)) { - runsCount++; - lastTuple = tuple; - } - range = (range == null) ? tupleRange : range.outerBound(tupleRange); - rowCount += tupleRange.length(); - return this; - } - - public StatsBuilder markFinished() - { - finished = true; - return this; - } - - public boolean isFinished() - { - return finished; - } - - public Stats build() - { - return new Stats(rowCount, runsCount + 1, range.getStart(), range.getEnd()); - } - } - - /** - * Serializable and deserializable with Jackson JSON Processor - * TODO: figure out how to manage introduction of new stats after already having old data (versioning?) 
- */ - public static class Stats - { - private final long rowCount; - private final long runsCount; - private final Range positionRange; - - @JsonCreator - private Stats( - @JsonProperty("rowCount") long rowCount, - @JsonProperty("runsCount") long runsCount, - @JsonProperty("startPosition") long startPosition, - @JsonProperty("endPosition") long endPosition - ) - { - checkArgument(rowCount >= 0, "row count cannot be negative"); - checkArgument(runsCount >= 0, "runs count cannot be negative"); - checkArgument(startPosition >= 0, "start position cannot be negative"); - checkArgument(endPosition >= 0, "end position cannot be negative"); - this.rowCount = rowCount; - this.runsCount = runsCount; - this.positionRange = Range.create(startPosition, endPosition); - } - - @JsonProperty - public long getRowCount() - { - return rowCount; - } - - @JsonProperty - public long getRunsCount() - { - return runsCount; - } - - @JsonProperty - public long getStartPosition() - { - return positionRange.getStart(); - } - - @JsonProperty - public long getEndPosition() - { - return positionRange.getEnd(); - } - - public long getAverageRunLength() - { - return rowCount / Math.max(runsCount, 1); - } - - public Range getPositionRange() - { - return positionRange; + int startingIndex = sliceOutput.size(); + // TODO: add a better way of serializing the stats that is less fragile + TupleStreamSerdes.serialize(TupleStreamSerdes.Encoding.RAW.createSerde(), statsInlinedOperator.getResult(), sliceOutput); + int endingIndex = sliceOutput.size(); + checkState(endingIndex > startingIndex); + sliceOutput.writeInt(endingIndex - startingIndex); } } } diff --git a/src/main/java/com/facebook/presto/block/TupleStreamSerdes.java b/src/main/java/com/facebook/presto/block/TupleStreamSerdes.java index b7d71b4c44509..70e15c5376f3a 100644 --- a/src/main/java/com/facebook/presto/block/TupleStreamSerdes.java +++ b/src/main/java/com/facebook/presto/block/TupleStreamSerdes.java @@ -9,7 +9,6 @@ import java.util.Map; -import static com.facebook.presto.block.StatsCollectingTupleStreamSerde.StatsAnnotatedTupleStream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -29,54 +28,6 @@ public static TupleStream deserialize(TupleStreamSerde serde, Slice slice) return serde.createDeserializer().deserialize(slice); } - /** - * Should only be used when paired with createDefaultDeserializer(...) - */ - public static DefaultTupleStreamSerializer createDefaultSerializer(TupleStreamSerde coreSerde) - { - return new DefaultTupleStreamSerializer(coreSerde); - } - - /** - * Should only be used when paired with createDefaultSerializer(...) 
- */ - public static DefaultTupleStreamDeserializer createDefaultDeserializer() - { - return DefaultTupleStreamDeserializer.INSTANCE; - } - - public static class DefaultTupleStreamSerializer - implements TupleStreamSerializer - { - private final TupleStreamSerializer defaultSerializer; - - public DefaultTupleStreamSerializer(TupleStreamSerde coreSerde) - { - defaultSerializer = new StatsCollectingTupleStreamSerde(new SelfDescriptiveSerde(checkNotNull(coreSerde, "coreSerde is null"))).createSerializer(); - } - - @Override - public TupleStreamWriter createTupleStreamWriter(SliceOutput sliceOutput) - { - return defaultSerializer.createTupleStreamWriter(checkNotNull(sliceOutput, "sliceOutput is null")); - } - - } - - public static class DefaultTupleStreamDeserializer - implements TupleStreamDeserializer - { - private static final DefaultTupleStreamDeserializer INSTANCE = new DefaultTupleStreamDeserializer(); - - private static final StatsCollectingTupleStreamSerde.StatsAnnotatedTupleStreamDeserializer DESERIALIZER = new StatsCollectingTupleStreamSerde.StatsAnnotatedTupleStreamDeserializer(SelfDescriptiveSerde.DESERIALIZER); - - @Override - public StatsAnnotatedTupleStream deserialize(Slice slice) - { - return DESERIALIZER.deserialize(slice); - } - } - public static enum Encoding { RAW("raw") diff --git a/src/main/java/com/facebook/presto/operator/inlined/StatsInlinedOperator.java b/src/main/java/com/facebook/presto/operator/inlined/StatsInlinedOperator.java index f35316ffad457..2b7b596b229a9 100644 --- a/src/main/java/com/facebook/presto/operator/inlined/StatsInlinedOperator.java +++ b/src/main/java/com/facebook/presto/operator/inlined/StatsInlinedOperator.java @@ -20,19 +20,19 @@ public enum Fields MAX_POSITION(3), AVG_RUN_LENGTH(4),; - private final int fieldIndex; + private final int columnIndex; - private Fields(int fieldIndex) + private Fields(int columnIndex) { - this.fieldIndex = fieldIndex; + this.columnIndex = columnIndex; } public int getFieldIndex() { - return fieldIndex; + return columnIndex; } } - + private static final TupleInfo RESULT_TUPLE_INFO = new TupleInfo( TupleInfo.Type.FIXED_INT_64, // Row count TupleInfo.Type.FIXED_INT_64, // Runs count @@ -76,7 +76,7 @@ public boolean isFinished() { return finished; } - + @Override public TupleInfo getTupleInfo() { @@ -101,4 +101,60 @@ public TupleStream getResult() .append(rowCount / (runsCount + 1)) // Average run length .build(); } + + public static Stats resultsAsStats(TupleStream tupleStream) + { + Cursor cursor = tupleStream.cursor(new QuerySession()); + cursor.advanceNextPosition(); + return new Stats( + cursor.getLong(Fields.ROW_COUNT.getFieldIndex()), + cursor.getLong(Fields.RUNS_COUNT.getFieldIndex()), + cursor.getLong(Fields.MIN_POSITION.getFieldIndex()), + cursor.getLong(Fields.MAX_POSITION.getFieldIndex()), + cursor.getLong(Fields.AVG_RUN_LENGTH.getFieldIndex()) + ); + } + + public static class Stats + { + private final long rowCount; + private final long runsCount; + private final long minPosition; + private final long maxPosition; + private final long avgRunLength; + + public Stats(long rowCount, long runsCount, long minPosition, long maxPosition, long avgRunLength) + { + this.rowCount = rowCount; + this.runsCount = runsCount; + this.minPosition = minPosition; + this.maxPosition = maxPosition; + this.avgRunLength = avgRunLength; + } + + public long getRowCount() + { + return rowCount; + } + + public long getRunsCount() + { + return runsCount; + } + + public long getMinPosition() + { + return minPosition; + } + + public 
long getMaxPosition() + { + return maxPosition; + } + + public long getAvgRunLength() + { + return avgRunLength; + } + } } diff --git a/src/test/java/com/facebook/presto/benchmark/AbstractTupleStreamBenchmark.java b/src/test/java/com/facebook/presto/benchmark/AbstractTupleStreamBenchmark.java index 22d110898e22b..517ecede82c72 100644 --- a/src/test/java/com/facebook/presto/benchmark/AbstractTupleStreamBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/AbstractTupleStreamBenchmark.java @@ -1,10 +1,10 @@ package com.facebook.presto.benchmark; import com.facebook.presto.block.*; +import com.facebook.presto.operator.inlined.StatsInlinedOperator; import com.facebook.presto.slice.Slice; import com.facebook.presto.slice.Slices; -import com.facebook.presto.tpch.TpchDataProvider; -import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.*; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -12,13 +12,14 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; -import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + /** * Abstract template for benchmarks that want to test the performance of a single TupleStream. * @@ -30,10 +31,11 @@ * - The first column to be requested will be used to represent the count of the number of input rows */ public abstract class AbstractTupleStreamBenchmark - extends AbstractBenchmark + extends AbstractBenchmark { + private static final TpchDataProvider TPCH_DATA_PROVIDER = new CachingTpchDataProvider(new GeneratingTpchDataProvider()); + private final TpchDataProvider tpchDataProvider; - private List dataSources = new ArrayList<>(); protected AbstractTupleStreamBenchmark(String benchmarkName, int warmupIterations, int measuredIterations, TpchDataProvider tpchDataProvider) { @@ -43,7 +45,7 @@ protected AbstractTupleStreamBenchmark(String benchmarkName, int warmupIteration protected AbstractTupleStreamBenchmark(String benchmarkName, int warmupIterations, int measuredIterations) { - this(benchmarkName, warmupIterations, measuredIterations, BenchmarkSuite.TPCH_DATA_PROVIDER); + this(benchmarkName, warmupIterations, measuredIterations, TPCH_DATA_PROVIDER); } @Override @@ -52,48 +54,17 @@ protected String getDefaultResult() return "input_rows_per_second"; } - /** - * Loads a specific TPCH column to be used for benchmarking. Requested column order will match - * the arguments provided to createBenchmarkedTupleStream(...) - * - * @param column - TPCH column - * @param encoding - Encoding to use for this column - */ - protected void loadColumnFile(TpchSchema.Column column, TupleStreamSerdes.Encoding encoding) - { - Preconditions.checkNotNull(column, "column is null"); - Preconditions.checkNotNull(encoding, "encoding is null"); - try { - File columnFile = tpchDataProvider.getColumnFile(column, encoding); - dataSources.add(new TupleStreamDataSource(columnFile)); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - - /** - * Creates the TupleStream to be benchmarked given the list of previously requested columns - * as TupleStreams (in the order they were requested). 
- * - * @param inputTupleStreams - Column decoded TupleStreams - * @return TupleStream to be benchmarked - */ - protected abstract TupleStream createBenchmarkedTupleStream(List inputTupleStreams); + protected abstract TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider); @Override protected Map runOnce() { - Preconditions.checkState(!dataSources.isEmpty(), "No data sources requested!"); - long start = System.nanoTime(); - ImmutableList.Builder builder = ImmutableList.builder(); - for (TupleStreamDataSource dataSource : dataSources) { - builder.add(TupleStreamSerdes.createDefaultDeserializer().deserialize(dataSource.getSlice())); - } - ImmutableList inputTupleStreams = builder.build(); + MetricRecordingTpchDataProvider metricRecordingTpchDataProvider = new MetricRecordingTpchDataProvider(tpchDataProvider); + StatsTpchTupleStreamProvider statsTpchTupleStreamProvider = new StatsTpchTupleStreamProvider(metricRecordingTpchDataProvider); - TupleStream tupleStream = createBenchmarkedTupleStream(inputTupleStreams); + TupleStream tupleStream = createBenchmarkedTupleStream(statsTpchTupleStreamProvider); Cursor cursor = tupleStream.cursor(new QuerySession()); long outputRows = 0; @@ -101,51 +72,61 @@ protected Map runOnce() outputRows += cursor.getCurrentValueEndPosition() - cursor.getPosition() + 1; } - Duration duration = Duration.nanosSince(start); + Duration totalDuration = Duration.nanosSince(start); + Duration dataGenerationDuration = metricRecordingTpchDataProvider.getDataFetchElapsedTime(); + checkState(totalDuration.compareTo(dataGenerationDuration) >= 0, "total time should be at least as large as data generation time"); - long elapsedMillis = (long) duration.convertTo(TimeUnit.MILLISECONDS); - double elapsedSeconds = duration.convertTo(TimeUnit.SECONDS); + // Compute the benchmark execution time without factoring in the time to generate the data source + double executionMillis = totalDuration.convertTo(TimeUnit.MILLISECONDS) - dataGenerationDuration.toMillis(); + double executionSeconds = executionMillis / TimeUnit.SECONDS.toMillis(1); - double inputBytes = 0; - for (TupleStreamDataSource dataSource : dataSources) { - inputBytes += dataSource.getFile().length(); - } - DataSize totalDataSize = new DataSize(inputBytes, DataSize.Unit.BYTE); + DataSize totalDataSize = metricRecordingTpchDataProvider.getCumulativeDataSize(); + + checkState(!statsTpchTupleStreamProvider.getStats().isEmpty(), "no columns were fetched"); + // Use the first column fetched as the indicator of the number of rows + long inputRows = statsTpchTupleStreamProvider.getStats().get(0).getRowCount(); return ImmutableMap.builder() - .put("elapsed_millis", elapsedMillis) - .put("input_rows", inputTupleStreams.get(0).getStats().getRowCount()) - .put("input_rows_per_second", (long) (inputTupleStreams.get(0).getStats().getRowCount() / elapsedSeconds)) + .put("elapsed_millis", (long) executionMillis) + .put("input_rows", inputRows) + .put("input_rows_per_second", (long) (inputRows / executionSeconds)) .put("output_rows", outputRows) - .put("output_rows_per_second", (long) (outputRows / elapsedSeconds)) + .put("output_rows_per_second", (long) (outputRows / executionSeconds)) .put("input_megabytes", (long) totalDataSize.getValue(DataSize.Unit.MEGABYTE)) - .put("input_megabytes_per_second", (long) (totalDataSize.getValue(DataSize.Unit.MEGABYTE) / elapsedSeconds)) + .put("input_megabytes_per_second", (long) (totalDataSize.getValue(DataSize.Unit.MEGABYTE) / executionSeconds)) .build(); } - private static 
class TupleStreamDataSource + private static class StatsTpchTupleStreamProvider + implements TpchTupleStreamProvider { - private final File file; - private final Slice slice; + private final TpchDataProvider tpchDataProvider; + private final ImmutableList.Builder statsBuilder = ImmutableList.builder(); + + private StatsTpchTupleStreamProvider(TpchDataProvider tpchDataProvider) + { + this.tpchDataProvider = checkNotNull(tpchDataProvider, "tpchDataProvider is null"); + } - private TupleStreamDataSource(File file) + @Override + public TupleStream getTupleStream(TpchSchema.Column column, TupleStreamSerdes.Encoding encoding) { - this.file = file; + checkNotNull(column, "column is null"); + checkNotNull(encoding, "encoding is null"); + // Wrap the encoding with stats collection + StatsCollectingTupleStreamSerde serde = new StatsCollectingTupleStreamSerde(encoding.createSerde()); try { - this.slice = Slices.mapFileReadOnly(file); + Slice slice = Slices.mapFileReadOnly(tpchDataProvider.getColumnFile(column, serde.createSerializer(), encoding.getName())); + statsBuilder.add(serde.createDeserializer().deserializeStats(slice)); + return serde.createDeserializer().deserialize(slice); } catch (IOException e) { throw Throwables.propagate(e); } } - - public File getFile() - { - return file; - } - - public Slice getSlice() + + public List getStats() { - return slice; + return statsBuilder.build(); } } } diff --git a/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java b/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java index 9fdfd50f98b17..5cf87d33ad4a6 100644 --- a/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java +++ b/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java @@ -1,6 +1,5 @@ package com.facebook.presto.benchmark; -import com.facebook.presto.tpch.TpchDataProvider; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; @@ -15,7 +14,6 @@ public class BenchmarkSuite { - public static final TpchDataProvider TPCH_DATA_PROVIDER = new TpchDataProvider(); public static final List BENCHMARKS = ImmutableList.of( new BinaryOperatorBenchmark(), new CountAggregationBenchmark(), diff --git a/src/test/java/com/facebook/presto/benchmark/BinaryOperatorBenchmark.java b/src/test/java/com/facebook/presto/benchmark/BinaryOperatorBenchmark.java index 33f2421685d77..f4da5aaed291c 100644 --- a/src/test/java/com/facebook/presto/benchmark/BinaryOperatorBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/BinaryOperatorBenchmark.java @@ -8,6 +8,7 @@ import com.facebook.presto.operator.HashAggregationOperator; import com.facebook.presto.operator.UncompressedBinaryOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import java.util.List; @@ -20,19 +21,15 @@ public BinaryOperatorBenchmark() } @Override - protected void setUp() - { - loadColumnFile(TpchSchema.LineItem.ORDERKEY, TupleStreamSerdes.Encoding.RAW); - loadColumnFile(TpchSchema.LineItem.PARTKEY, TupleStreamSerdes.Encoding.RAW); - loadColumnFile(TpchSchema.LineItem.LINESTATUS, TupleStreamSerdes.Encoding.RAW); - } - - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { return new HashAggregationOperator( - new GroupByOperator(inputTupleStreams.get(2)), - new UncompressedBinaryOperator(inputTupleStreams.get(0), inputTupleStreams.get(1), new SubtractionOperation()), + new 
GroupByOperator(inputStreamProvider.getTupleStream(TpchSchema.LineItem.LINESTATUS, TupleStreamSerdes.Encoding.RAW)), + new UncompressedBinaryOperator( + inputStreamProvider.getTupleStream(TpchSchema.LineItem.ORDERKEY, TupleStreamSerdes.Encoding.RAW), + inputStreamProvider.getTupleStream(TpchSchema.LineItem.PARTKEY, TupleStreamSerdes.Encoding.RAW), + new SubtractionOperation() + ), SumAggregation.PROVIDER ); } diff --git a/src/test/java/com/facebook/presto/benchmark/CountAggregationBenchmark.java b/src/test/java/com/facebook/presto/benchmark/CountAggregationBenchmark.java index 855d29bfc7d70..b1940aca33a74 100644 --- a/src/test/java/com/facebook/presto/benchmark/CountAggregationBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/CountAggregationBenchmark.java @@ -4,7 +4,9 @@ import com.facebook.presto.block.TupleStream; import com.facebook.presto.block.TupleStreamSerdes; import com.facebook.presto.operator.AggregationOperator; +import com.facebook.presto.operator.GroupByOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import java.util.List; @@ -17,15 +19,12 @@ public CountAggregationBenchmark() } @Override - protected void setUp() + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { - loadColumnFile(TpchSchema.Orders.ORDERKEY, TupleStreamSerdes.Encoding.RAW); - } - - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) - { - return new AggregationOperator(inputTupleStreams.get(0), CountAggregation.PROVIDER); + return new AggregationOperator( + inputStreamProvider.getTupleStream(TpchSchema.Orders.ORDERKEY, TupleStreamSerdes.Encoding.RAW), + CountAggregation.PROVIDER + ); } public static void main(String[] args) diff --git a/src/test/java/com/facebook/presto/benchmark/DicRleGroupByBenchmark.java b/src/test/java/com/facebook/presto/benchmark/DicRleGroupByBenchmark.java index 57ede80dd3298..fd28e8badc8d4 100644 --- a/src/test/java/com/facebook/presto/benchmark/DicRleGroupByBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/DicRleGroupByBenchmark.java @@ -2,8 +2,10 @@ import com.facebook.presto.block.TupleStream; import com.facebook.presto.block.TupleStreamSerdes; +import com.facebook.presto.operator.ApplyPredicateOperator; import com.facebook.presto.operator.GroupByOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import java.util.List; @@ -15,17 +17,12 @@ public DicRleGroupByBenchmark() super("groupby_dic_rle", 10, 50); } - - @Override - protected void setUp() - { - loadColumnFile(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.DICTIONARY_RLE); - } - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { - return new GroupByOperator(inputTupleStreams.get(0)); + return new GroupByOperator( + inputStreamProvider.getTupleStream(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.DICTIONARY_RLE) + ); } public static void main(String[] args) diff --git a/src/test/java/com/facebook/presto/benchmark/DictionaryAggregationBenchmark.java b/src/test/java/com/facebook/presto/benchmark/DictionaryAggregationBenchmark.java index d6cac625967e8..8236083742a84 100644 --- a/src/test/java/com/facebook/presto/benchmark/DictionaryAggregationBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/DictionaryAggregationBenchmark.java 
@@ -1,35 +1,27 @@ package com.facebook.presto.benchmark; import com.facebook.presto.aggregation.DoubleSumAggregation; -import com.facebook.presto.block.*; +import com.facebook.presto.block.TupleStream; +import com.facebook.presto.block.TupleStreamSerdes; import com.facebook.presto.block.dictionary.DictionaryEncodedTupleStream; import com.facebook.presto.operator.DictionaryAggregationOperator; import com.facebook.presto.tpch.TpchSchema; - -import java.util.List; +import com.facebook.presto.tpch.TpchTupleStreamProvider; public class DictionaryAggregationBenchmark extends AbstractTupleStreamBenchmark { public DictionaryAggregationBenchmark() { - super("dictionary_agg", 10, 100); - } - - @Override - protected void setUp() - { - loadColumnFile(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.DICTIONARY_RLE); - loadColumnFile(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW); + super("dictionary_agg", 10, 50); } - + @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { return new DictionaryAggregationOperator( - // Terrible hack until we can figure out how to propagate the DictionaryEncodedTupleStream more cleanly! - (DictionaryEncodedTupleStream) ((StatsCollectingTupleStreamSerde.StatsAnnotatedTupleStream) inputTupleStreams.get(0)).getUnderlyingTupleStream(), - inputTupleStreams.get(1), + (DictionaryEncodedTupleStream) inputStreamProvider.getTupleStream(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.DICTIONARY_RLE), + inputStreamProvider.getTupleStream(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW), DoubleSumAggregation.PROVIDER ); } diff --git a/src/test/java/com/facebook/presto/benchmark/PredicateFilterBenchmark.java b/src/test/java/com/facebook/presto/benchmark/PredicateFilterBenchmark.java index 1a651bb5bd4a9..891202c26abd9 100644 --- a/src/test/java/com/facebook/presto/benchmark/PredicateFilterBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/PredicateFilterBenchmark.java @@ -2,14 +2,12 @@ import com.facebook.presto.block.Cursor; import com.facebook.presto.block.TupleStream; +import com.facebook.presto.block.TupleStreamSerdes; import com.facebook.presto.operator.ApplyPredicateOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import com.google.common.base.Predicate; -import java.util.List; - -import static com.facebook.presto.block.TupleStreamSerdes.Encoding; - public class PredicateFilterBenchmark extends AbstractTupleStreamBenchmark { @@ -19,15 +17,12 @@ public PredicateFilterBenchmark() } @Override - protected void setUp() + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { - loadColumnFile(TpchSchema.Orders.TOTALPRICE, Encoding.RAW); - } - - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) - { - return new ApplyPredicateOperator(inputTupleStreams.get(0), new DoubleFilter(50000.00)); + return new ApplyPredicateOperator( + inputStreamProvider.getTupleStream(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW), + new DoubleFilter(50000.00) + ); } public static class DoubleFilter implements Predicate { diff --git a/src/test/java/com/facebook/presto/benchmark/RawStreamingBenchmark.java b/src/test/java/com/facebook/presto/benchmark/RawStreamingBenchmark.java index 0e9eb987fb806..1b76fae463b65 100644 --- 
a/src/test/java/com/facebook/presto/benchmark/RawStreamingBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/RawStreamingBenchmark.java @@ -3,8 +3,7 @@ import com.facebook.presto.block.TupleStream; import com.facebook.presto.block.TupleStreamSerdes; import com.facebook.presto.tpch.TpchSchema; - -import java.util.List; +import com.facebook.presto.tpch.TpchTupleStreamProvider; public class RawStreamingBenchmark extends AbstractTupleStreamBenchmark @@ -15,15 +14,9 @@ public RawStreamingBenchmark() } @Override - protected void setUp() - { - loadColumnFile(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW); - } - - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { - return inputTupleStreams.get(0); + return inputStreamProvider.getTupleStream(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW); } public static void main(String[] args) diff --git a/src/test/java/com/facebook/presto/benchmark/RleHashAggregationBenchmark.java b/src/test/java/com/facebook/presto/benchmark/RleHashAggregationBenchmark.java index 140b6a6416051..e5ab913c84af7 100644 --- a/src/test/java/com/facebook/presto/benchmark/RleHashAggregationBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/RleHashAggregationBenchmark.java @@ -1,11 +1,15 @@ package com.facebook.presto.benchmark; import com.facebook.presto.aggregation.DoubleSumAggregation; +import com.facebook.presto.aggregation.SumAggregation; import com.facebook.presto.block.TupleStream; import com.facebook.presto.block.TupleStreamSerdes; +import com.facebook.presto.operation.SubtractionOperation; import com.facebook.presto.operator.GroupByOperator; import com.facebook.presto.operator.HashAggregationOperator; +import com.facebook.presto.operator.UncompressedBinaryOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import java.util.List; @@ -18,22 +22,18 @@ public RleHashAggregationBenchmark() } @Override - protected void setUp() - { - loadColumnFile(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.RLE); - loadColumnFile(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW); - } - - @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { return new HashAggregationOperator( - new GroupByOperator(inputTupleStreams.get(0)), - inputTupleStreams.get(1), + new GroupByOperator( + inputStreamProvider.getTupleStream(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.RLE) + ), + inputStreamProvider.getTupleStream(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW), DoubleSumAggregation.PROVIDER ); } + public static void main(String[] args) { new RleHashAggregationBenchmark().runBenchmark( diff --git a/src/test/java/com/facebook/presto/benchmark/RlePipelinedAggregationBenchmark.java b/src/test/java/com/facebook/presto/benchmark/RlePipelinedAggregationBenchmark.java index 81aeb29895219..25b528a0e6f72 100644 --- a/src/test/java/com/facebook/presto/benchmark/RlePipelinedAggregationBenchmark.java +++ b/src/test/java/com/facebook/presto/benchmark/RlePipelinedAggregationBenchmark.java @@ -4,8 +4,10 @@ import com.facebook.presto.block.TupleStream; import com.facebook.presto.block.TupleStreamSerdes; import com.facebook.presto.operator.GroupByOperator; +import 
com.facebook.presto.operator.HashAggregationOperator; import com.facebook.presto.operator.PipelinedAggregationOperator; import com.facebook.presto.tpch.TpchSchema; +import com.facebook.presto.tpch.TpchTupleStreamProvider; import java.util.List; @@ -16,20 +18,15 @@ public RlePipelinedAggregationBenchmark() { super("pipelined_agg_rle", 5, 20); } - - @Override - protected void setUp() - { - loadColumnFile(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.RLE); - loadColumnFile(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW); - } - + @Override - protected TupleStream createBenchmarkedTupleStream(List inputTupleStreams) + protected TupleStream createBenchmarkedTupleStream(TpchTupleStreamProvider inputStreamProvider) { return new PipelinedAggregationOperator( - new GroupByOperator(inputTupleStreams.get(0)), - inputTupleStreams.get(1), + new GroupByOperator( + inputStreamProvider.getTupleStream(TpchSchema.Orders.ORDERSTATUS, TupleStreamSerdes.Encoding.RLE) + ), + inputStreamProvider.getTupleStream(TpchSchema.Orders.TOTALPRICE, TupleStreamSerdes.Encoding.RAW), DoubleSumAggregation.PROVIDER ); } diff --git a/src/test/java/com/facebook/presto/block/TestStatsCollectingTupleStreamSerde.java b/src/test/java/com/facebook/presto/block/TestStatsCollectingTupleStreamSerde.java index 7b0209653ed53..87d4125e3a48e 100644 --- a/src/test/java/com/facebook/presto/block/TestStatsCollectingTupleStreamSerde.java +++ b/src/test/java/com/facebook/presto/block/TestStatsCollectingTupleStreamSerde.java @@ -2,13 +2,12 @@ import com.facebook.presto.Range; import com.facebook.presto.block.uncompressed.UncompressedSerde; +import com.facebook.presto.operator.inlined.StatsInlinedOperator; import com.facebook.presto.slice.DynamicSliceOutput; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static com.facebook.presto.block.StatsCollectingTupleStreamSerde.*; - public class TestStatsCollectingTupleStreamSerde { private DynamicSliceOutput sliceOutput; @@ -29,11 +28,14 @@ public void testSanity() throws Exception .createTupleStreamWriter(sliceOutput) .append(tupleStream) .finish(); - StatsAnnotatedTupleStream statsAnnotatedTupleStream = serde.createDeserializer().deserialize(sliceOutput.slice()); - Blocks.assertTupleStreamEquals(statsAnnotatedTupleStream, tupleStream); - Assert.assertEquals(statsAnnotatedTupleStream.getStats().getPositionRange(), Range.create(0, 7)); - Assert.assertEquals(statsAnnotatedTupleStream.getStats().getRowCount(), 8); - Assert.assertEquals(statsAnnotatedTupleStream.getStats().getRunsCount(), 4); - Assert.assertEquals(statsAnnotatedTupleStream.getStats().getAverageRunLength(), 2); + TupleStream resultTupleStream = serde.createDeserializer().deserialize(sliceOutput.slice()); + StatsInlinedOperator.Stats stats = serde.createDeserializer().deserializeStats(sliceOutput.slice()); + Blocks.assertTupleStreamEquals(resultTupleStream, tupleStream); + + Assert.assertEquals(stats.getAvgRunLength() , 2); + Assert.assertEquals(stats.getMinPosition() , 0); + Assert.assertEquals(stats.getMaxPosition() , 7); + Assert.assertEquals(stats.getRowCount() , 8); + Assert.assertEquals(stats.getRunsCount() , 4); } } diff --git a/src/test/java/com/facebook/presto/operator/inlined/TestStatsInlinedOperator.java b/src/test/java/com/facebook/presto/operator/inlined/TestStatsInlinedOperator.java index f124c142ed362..443a3bbf258fc 100644 --- a/src/test/java/com/facebook/presto/operator/inlined/TestStatsInlinedOperator.java +++ 
b/src/test/java/com/facebook/presto/operator/inlined/TestStatsInlinedOperator.java @@ -31,6 +31,7 @@ public void testInitFail() throws Exception statsInlinedOperator.process(Cursors.asTupleStreamPosition(cursor)); } + @Test public void testPartial() throws Exception { cursor.advanceNextPosition(); @@ -45,6 +46,7 @@ public void testPartial() throws Exception Assert.assertEquals(statsInlinedOperator.getRange(), Range.create(0, 0)); } + @Test public void testFull() throws Exception { cursor.advanceNextValue(); diff --git a/src/test/java/com/facebook/presto/tpch/CachingTpchDataProvider.java b/src/test/java/com/facebook/presto/tpch/CachingTpchDataProvider.java new file mode 100644 index 0000000000000..1bc6bf554f525 --- /dev/null +++ b/src/test/java/com/facebook/presto/tpch/CachingTpchDataProvider.java @@ -0,0 +1,73 @@ +package com.facebook.presto.tpch; + +import com.facebook.presto.block.TupleStreamSerializer; +import com.google.common.base.Preconditions; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class CachingTpchDataProvider + implements TpchDataProvider +{ + private final Map localFileCache = new HashMap<>(); + private final TpchDataProvider delegate; + + public CachingTpchDataProvider(TpchDataProvider delegate) + { + this.delegate = checkNotNull(delegate, "delegate is null"); + } + + @Override + public File getColumnFile(TpchSchema.Column column, TupleStreamSerializer serializer, String serdeName) + { + Preconditions.checkNotNull(column, "column is null"); + Preconditions.checkNotNull(serializer, "serializer is null"); + Preconditions.checkNotNull(serdeName, "serdeName is null"); + + // Hack: Use the serdeName as the unique identifier of the serializer + TpchColumnRequest columnRequest = new TpchColumnRequest(column, serdeName); + File file = localFileCache.get(columnRequest); + if (file == null) { + file = delegate.getColumnFile(column, serializer, serdeName); + localFileCache.put(columnRequest, file); + } + return file; + } + + private static final class TpchColumnRequest + { + private final TpchSchema.Column column; + private final String serdeName; + + private TpchColumnRequest(TpchSchema.Column column, String serdeName) + { + this.column = checkNotNull(column, "column is null"); + this.serdeName = checkNotNull(serdeName, "serdeName is null"); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof TpchColumnRequest)) return false; + + TpchColumnRequest that = (TpchColumnRequest) o; + + if (!column.equals(that.column)) return false; + if (!serdeName.equals(that.serdeName)) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = column.hashCode(); + result = 31 * result + serdeName.hashCode(); + return result; + } + } +} diff --git a/src/test/java/com/facebook/presto/tpch/GeneratingTpchDataProvider.java b/src/test/java/com/facebook/presto/tpch/GeneratingTpchDataProvider.java new file mode 100644 index 0000000000000..6019e826b574e --- /dev/null +++ b/src/test/java/com/facebook/presto/tpch/GeneratingTpchDataProvider.java @@ -0,0 +1,170 @@ +package com.facebook.presto.tpch; + +import com.facebook.presto.block.TupleStreamSerializer; +import com.facebook.presto.ingest.BlockDataImporter; +import com.facebook.presto.ingest.BlockExtractor; +import com.facebook.presto.ingest.DelimitedBlockExtractor; +import com.google.common.base.Charsets; +import com.google.common.base.Splitter; +import 
com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.google.common.io.InputSupplier; +import com.google.common.io.Resources; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.jar.JarFile; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Extracts TPCH data into serialized column file formats. + * It will also cache the extracted columns in the local file system to help mitigate the cost of the operation. + */ +public class GeneratingTpchDataProvider + implements TpchDataProvider +{ + private final TableInputSupplierFactory tableInputSupplierFactory; + private final File cacheDirectory; + + public GeneratingTpchDataProvider(TableInputSupplierFactory tableInputSupplierFactory, File cacheDirectory) + { + checkNotNull(tableInputSupplierFactory, "tableInputStreamProvider is null"); + checkNotNull(cacheDirectory, "cacheDirectory is null"); + checkArgument(!cacheDirectory.exists() || cacheDirectory.isDirectory(), "cacheDirectory must be a directory"); + this.tableInputSupplierFactory = tableInputSupplierFactory; + this.cacheDirectory = cacheDirectory; + } + + public GeneratingTpchDataProvider(TableInputSupplierFactory tableInputSupplierFactory, String cacheDirectoryName) + { + this(tableInputSupplierFactory, new File(checkNotNull(cacheDirectoryName, "cacheDirectoryName is null"))); + } + + public GeneratingTpchDataProvider(String cacheDirectoryName) + { + this(autoSelectTableInputStreamProvider(), cacheDirectoryName); + } + + public GeneratingTpchDataProvider() + { + this(System.getProperty("tpchCacheDir", "/tmp/tpchdatacache")); + } + + private interface TableInputSupplierFactory + { + InputSupplier getInputSupplier(String tableName); + } + + private static class JarTableInputSupplierFactory + implements TableInputSupplierFactory + { + private final String jarFileName; + + private JarTableInputSupplierFactory(String jarFileName) + { + this.jarFileName = checkNotNull(jarFileName, "jarFileName is null"); + } + + @Override + public InputSupplier getInputSupplier(final String tableName) + { + checkNotNull(tableName, "tableFileName is null"); + return new InputSupplier() { + @Override + public InputStream getInput() throws IOException + { + try { + JarFile jarFile = new JarFile(jarFileName); + return jarFile.getInputStream(jarFile.getJarEntry(createTableFileName(tableName))); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + }; + } + } + + private static class ResourcesTableInputSupplierFactory + implements TableInputSupplierFactory + { + @Override + public InputSupplier getInputSupplier(String tableName) + { + checkNotNull(tableName, "tableFileName is null"); + return Resources.newInputStreamSupplier(Resources.getResource(createTableFileName(tableName))); + } + } + + private static TableInputSupplierFactory autoSelectTableInputStreamProvider() { + // First check if a data jar file has been manually specified + final String tpchDataJarFileOverride = System.getProperty("tpchDataJar"); + if (tpchDataJarFileOverride != null) { + return new JarTableInputSupplierFactory(tpchDataJarFileOverride); + } + // Otherwise fall back to the default in resources if one is available + else { + return new ResourcesTableInputSupplierFactory(); + } + } + + // 
TODO: make this work for columns with more than one file + @Override + public File getColumnFile(final TpchSchema.Column column, TupleStreamSerializer serializer, String serdeName) + { + checkNotNull(column, "column is null"); + checkNotNull(serializer, "serializer is null"); + checkNotNull(serdeName, "serdeName is null"); + + try { + String hash = ByteStreams.hash(tableInputSupplierFactory.getInputSupplier(column.getTableName()), Hashing.md5()).toString(); + + File cachedFile = new File(new File(cacheDirectory, column.getTableName() + "-" + hash), createFileName(column, serdeName)); + if (cachedFile.exists()) { + return cachedFile; + } + + Files.createParentDirs(cachedFile); + + BlockExtractor blockExtractor = new DelimitedBlockExtractor( + Splitter.on('|'), + ImmutableList.of(new DelimitedBlockExtractor.ColumnDefinition(column.getIndex(), column.getType())) + ); + BlockDataImporter importer = new BlockDataImporter( + blockExtractor, + ImmutableList.of( + new BlockDataImporter.ColumnImportSpec( + serializer, + Files.newOutputStreamSupplier(cachedFile))) + ); + importer.importFrom( + new InputSupplier() { + @Override + public InputStreamReader getInput() throws IOException + { + return new InputStreamReader(tableInputSupplierFactory.getInputSupplier(column.getTableName()).getInput(), Charsets.UTF_8); + } + } + ); + return cachedFile; + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private static String createTableFileName(String tableName) + { + return tableName + ".tbl"; + } + + private static String createFileName(TpchSchema.Column column, String serdeName) + { + return String.format("column%d.%s_%s.data", column.getIndex(), column.getType().getName(), serdeName); + } +} diff --git a/src/test/java/com/facebook/presto/tpch/MetricRecordingTpchDataProvider.java b/src/test/java/com/facebook/presto/tpch/MetricRecordingTpchDataProvider.java new file mode 100644 index 0000000000000..e3ced45898441 --- /dev/null +++ b/src/test/java/com/facebook/presto/tpch/MetricRecordingTpchDataProvider.java @@ -0,0 +1,48 @@ +package com.facebook.presto.tpch; + +import com.facebook.presto.block.TupleStreamSerializer; +import com.google.common.base.Preconditions; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +public class MetricRecordingTpchDataProvider + implements TpchDataProvider +{ + private final TpchDataProvider tpchDataProvider; + private long dataFetchElapsedMillis; + private long cumulativeDataByteSize; + + public MetricRecordingTpchDataProvider(TpchDataProvider tpchDataProvider) + { + this.tpchDataProvider = Preconditions.checkNotNull(tpchDataProvider, "tpchDataProvider is null"); + } + + @Override + public File getColumnFile(TpchSchema.Column column, TupleStreamSerializer serializer, String serdeName) + { + Preconditions.checkNotNull(column, "column is null"); + Preconditions.checkNotNull(serializer, "serializer is null"); + Preconditions.checkNotNull(serdeName, "serdeName is null"); + long start = System.nanoTime(); + try { + File file = tpchDataProvider.getColumnFile(column, serializer, serdeName); + cumulativeDataByteSize += file.length(); + return file; + } finally { + dataFetchElapsedMillis += Duration.nanosSince(start).toMillis(); + } + } + + public Duration getDataFetchElapsedTime() + { + return new Duration(dataFetchElapsedMillis, TimeUnit.MILLISECONDS); + } + + public DataSize getCumulativeDataSize() + { + return new DataSize(cumulativeDataByteSize, DataSize.Unit.BYTE); + } +} diff 
--git a/src/test/java/com/facebook/presto/tpch/TpchDataProvider.java b/src/test/java/com/facebook/presto/tpch/TpchDataProvider.java index f39d72cddbf09..54908b8540de7 100644 --- a/src/test/java/com/facebook/presto/tpch/TpchDataProvider.java +++ b/src/test/java/com/facebook/presto/tpch/TpchDataProvider.java @@ -1,160 +1,10 @@ package com.facebook.presto.tpch; -import com.facebook.presto.block.TupleStreamSerdes; -import com.facebook.presto.ingest.BlockDataImporter; -import com.facebook.presto.ingest.BlockExtractor; -import com.facebook.presto.ingest.DelimitedBlockExtractor; -import com.google.common.base.Charsets; -import com.google.common.base.Splitter; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; -import com.google.common.io.ByteStreams; -import com.google.common.io.Files; -import com.google.common.io.InputSupplier; -import com.google.common.io.Resources; +import com.facebook.presto.block.TupleStreamSerializer; import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.jar.JarFile; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -public class TpchDataProvider +public interface TpchDataProvider { - private final TableInputSupplierFactory tableInputSupplierFactory; - private final File cacheDirectory; - - public TpchDataProvider(TableInputSupplierFactory tableInputSupplierFactory, File cacheDirectory) - { - checkNotNull(tableInputSupplierFactory, "tableInputStreamProvider is null"); - checkNotNull(cacheDirectory, "cacheDirectory is null"); - checkArgument(!cacheDirectory.exists() || cacheDirectory.isDirectory(), "cacheDirectory must be a directory"); - this.tableInputSupplierFactory = tableInputSupplierFactory; - this.cacheDirectory = cacheDirectory; - } - - public TpchDataProvider(TableInputSupplierFactory tableInputSupplierFactory, String cacheDirectoryName) - { - this(tableInputSupplierFactory, new File(checkNotNull(cacheDirectoryName, "cacheDirectoryName is null"))); - } - - public TpchDataProvider(String cacheDirectoryName) - { - this(autoSelectTableInputStreamProvider(), cacheDirectoryName); - } - - public TpchDataProvider() - { - this(System.getProperty("tpchCacheDir", "/tmp/tpchdatacache")); - } - - private interface TableInputSupplierFactory - { - InputSupplier getInputSupplier(String tableName); - } - - private static class JarTableInputSupplierFactory - implements TableInputSupplierFactory - { - private final String jarFileName; - - private JarTableInputSupplierFactory(String jarFileName) - { - this.jarFileName = checkNotNull(jarFileName, "jarFileName is null"); - } - - @Override - public InputSupplier getInputSupplier(final String tableName) - { - checkNotNull(tableName, "tableFileName is null"); - return new InputSupplier() { - @Override - public InputStream getInput() throws IOException - { - try { - JarFile jarFile = new JarFile(jarFileName); - return jarFile.getInputStream(jarFile.getJarEntry(createTableFileName(tableName))); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - }; - } - } - - private static class ResourcesTableInputSupplierFactory - implements TableInputSupplierFactory - { - @Override - public InputSupplier getInputSupplier(String tableName) - { - checkNotNull(tableName, "tableFileName is null"); - return Resources.newInputStreamSupplier(Resources.getResource(createTableFileName(tableName))); - } - 
} - - private static TableInputSupplierFactory autoSelectTableInputStreamProvider() { - // First check if a data jar file has been manually specified - final String tpchDataJarFileOverride = System.getProperty("tpchDataJar"); - if (tpchDataJarFileOverride != null) { - return new JarTableInputSupplierFactory(tpchDataJarFileOverride); - } - // Otherwise fall back to the default in resources if one is available - else { - return new ResourcesTableInputSupplierFactory(); - } - } - - // TODO: make this work for columns with more than one file - public File getColumnFile(final TpchSchema.Column column, TupleStreamSerdes.Encoding encoding) throws IOException - { - checkNotNull(column, "column is null"); - checkNotNull(encoding, "encoding is null"); - - String hash = ByteStreams.hash(tableInputSupplierFactory.getInputSupplier(column.getTableName()), Hashing.md5()).toString(); - - File cachedFile = new File(new File(cacheDirectory, column.getTableName() + "-" + hash), createFileName(column, encoding)); - if (cachedFile.exists()) { - return cachedFile; - } - - Files.createParentDirs(cachedFile); - - BlockExtractor blockExtractor = new DelimitedBlockExtractor( - Splitter.on('|'), - ImmutableList.of(new DelimitedBlockExtractor.ColumnDefinition(column.getIndex(), column.getType())) - ); - BlockDataImporter importer = new BlockDataImporter( - blockExtractor, - ImmutableList.of( - new BlockDataImporter.ColumnImportSpec( - // The TPCH data will use default stats annotated and self ID'ed serde - TupleStreamSerdes.createDefaultSerializer(encoding.createSerde()), - Files.newOutputStreamSupplier(cachedFile))) - ); - importer.importFrom( - new InputSupplier() { - @Override - public InputStreamReader getInput() throws IOException - { - return new InputStreamReader(tableInputSupplierFactory.getInputSupplier(column.getTableName()).getInput(), Charsets.UTF_8); - } - } - ); - return cachedFile; - } - - private static String createTableFileName(String tableName) - { - return tableName + ".tbl"; - } - - private static String createFileName(TpchSchema.Column column, TupleStreamSerdes.Encoding encoding) - { - return String.format("column%d.%s_%s.data", column.getIndex(), column.getType().getName(), encoding.getName()); - } + File getColumnFile(TpchSchema.Column column, TupleStreamSerializer serializer, String serdeName); } diff --git a/src/test/java/com/facebook/presto/tpch/TpchTupleStreamProvider.java b/src/test/java/com/facebook/presto/tpch/TpchTupleStreamProvider.java new file mode 100644 index 0000000000000..cc0080b621f9c --- /dev/null +++ b/src/test/java/com/facebook/presto/tpch/TpchTupleStreamProvider.java @@ -0,0 +1,9 @@ +package com.facebook.presto.tpch; + +import com.facebook.presto.block.TupleStream; +import com.facebook.presto.block.TupleStreamSerdes; + +public interface TpchTupleStreamProvider +{ + TupleStream getTupleStream(TpchSchema.Column column, TupleStreamSerdes.Encoding encoding); +}
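A minimal round-trip sketch of the reworked StatsCollectingTupleStreamSerde, assuming an already-built TupleStream named tupleStream, RAW as the core encoding, and an arbitrary initial size for the DynamicSliceOutput; it mirrors TestStatsCollectingTupleStreamSerde above. The writer emits the delegate-encoded payload, then the RAW-encoded StatsInlinedOperator result, then a trailing int with the footer length, which is how deserialize() and deserializeStats() locate their slices.

    DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024);
    StatsCollectingTupleStreamSerde serde =
            new StatsCollectingTupleStreamSerde(TupleStreamSerdes.Encoding.RAW.createSerde());

    // Write: [ delegate-encoded tuple stream ][ RAW-encoded stats block ][ int footerLength ]
    serde.createSerializer()
            .createTupleStreamWriter(sliceOutput)
            .append(tupleStream)
            .finish();

    // Read: the data stream and the stats footer are recovered independently from the same slice
    Slice slice = sliceOutput.slice();
    TupleStream result = serde.createDeserializer().deserialize(slice);
    StatsInlinedOperator.Stats stats = serde.createDeserializer().deserializeStats(slice);
    long rowCount = stats.getRowCount();
    long averageRunLength = stats.getAvgRunLength();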
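Because StatsTpchTupleStreamProvider is private to AbstractTupleStreamBenchmark, code outside the benchmark harness would supply its own implementation of the new TpchTupleStreamProvider interface. The following is a sketch only; SimpleTpchTupleStreamProvider is a hypothetical name, and it follows the same pattern as the private provider above without the stats bookkeeping.

    package com.facebook.presto.tpch;

    import com.facebook.presto.block.StatsCollectingTupleStreamSerde;
    import com.facebook.presto.block.TupleStream;
    import com.facebook.presto.block.TupleStreamSerdes;
    import com.facebook.presto.slice.Slice;
    import com.facebook.presto.slice.Slices;
    import com.google.common.base.Throwables;

    import java.io.IOException;

    import static com.google.common.base.Preconditions.checkNotNull;

    public class SimpleTpchTupleStreamProvider
            implements TpchTupleStreamProvider
    {
        private final TpchDataProvider tpchDataProvider;

        public SimpleTpchTupleStreamProvider(TpchDataProvider tpchDataProvider)
        {
            this.tpchDataProvider = checkNotNull(tpchDataProvider, "tpchDataProvider is null");
        }

        @Override
        public TupleStream getTupleStream(TpchSchema.Column column, TupleStreamSerdes.Encoding encoding)
        {
            checkNotNull(column, "column is null");
            checkNotNull(encoding, "encoding is null");
            // Wrap the requested encoding with stats collection so the column file
            // carries the stats footer alongside the encoded data
            StatsCollectingTupleStreamSerde serde = new StatsCollectingTupleStreamSerde(encoding.createSerde());
            try {
                Slice slice = Slices.mapFileReadOnly(
                        tpchDataProvider.getColumnFile(column, serde.createSerializer(), encoding.getName()));
                return serde.createDeserializer().deserialize(slice);
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    }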
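The data-provider decorators are meant to be stacked the way runOnce() does it: generation at the bottom, caching above it, and metric recording on top, so a benchmark can subtract data-fetch cost from its measured time. A sketch under the assumptions that the surrounding method declares throws IOException and that the ORDERKEY/RAW column choice is only illustrative.

    // Generation is the slow path; the caching layer reuses previously generated column files
    TpchDataProvider tpchDataProvider = new CachingTpchDataProvider(new GeneratingTpchDataProvider());
    // The recording layer tracks fetch time and cumulative bytes served
    MetricRecordingTpchDataProvider metricRecordingProvider = new MetricRecordingTpchDataProvider(tpchDataProvider);

    StatsCollectingTupleStreamSerde serde =
            new StatsCollectingTupleStreamSerde(TupleStreamSerdes.Encoding.RAW.createSerde());
    File columnFile = metricRecordingProvider.getColumnFile(
            TpchSchema.Orders.ORDERKEY, serde.createSerializer(), TupleStreamSerdes.Encoding.RAW.getName());
    StatsInlinedOperator.Stats stats = serde.createDeserializer().deserializeStats(Slices.mapFileReadOnly(columnFile));

    // Values a benchmark would report: data-fetch time to subtract, input size, and input rows
    Duration dataFetchTime = metricRecordingProvider.getDataFetchElapsedTime();
    DataSize inputDataSize = metricRecordingProvider.getCumulativeDataSize();
    long inputRows = stats.getRowCount();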