diff --git a/benchmarks/src/main/java/io/druid/benchmark/CompressedIndexedIntsBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/CompressedIndexedIntsBenchmark.java index 8d97705ca11b..9a5bf336e65e 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/CompressedIndexedIntsBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/CompressedIndexedIntsBenchmark.java @@ -76,7 +76,9 @@ public void setup() throws IOException ) ); this.compressed = CompressedVSizeIntsIndexedSupplier.fromByteBuffer( - bufferCompressed, ByteOrder.nativeOrder() + bufferCompressed, + ByteOrder.nativeOrder(), + null ).get(); final ByteBuffer bufferUncompressed = serialize( diff --git a/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java index 431eb2c2abe5..106854cd0386 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/CompressedVSizeIndexedBenchmark.java @@ -99,7 +99,9 @@ public IndexedInts apply(int[] input) ) ); this.compressed = CompressedVSizeIndexedSupplier.fromByteBuffer( - bufferCompressed, ByteOrder.nativeOrder() + bufferCompressed, + ByteOrder.nativeOrder(), + null ).get(); final ByteBuffer bufferUncompressed = serialize( diff --git a/benchmarks/src/main/java/io/druid/benchmark/FloatCompressionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FloatCompressionBenchmark.java index ee46f8384f79..b06ecb0657a2 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FloatCompressionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FloatCompressionBenchmark.java @@ -72,7 +72,7 @@ public void setup() throws Exception File compFile = new File(dir, file + "-" + strategy); rand = new Random(); ByteBuffer buffer = Files.map(compFile); - supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder()); + supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/LongCompressionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/LongCompressionBenchmark.java index cfb40ceabd5f..ac41c6874571 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/LongCompressionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/LongCompressionBenchmark.java @@ -75,7 +75,7 @@ public void setup() throws Exception File compFile = new File(dir, file + "-" + strategy + "-" + format); rand = new Random(); ByteBuffer buffer = Files.map(compFile); - supplier = CompressedLongsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder()); + supplier = CompressedLongsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null); } @Benchmark diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOPeon.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOPeon.java index e74d03d96eab..7bb19e6c3644 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOPeon.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOPeon.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -59,4 +60,10 @@ public void close() throws IOException { throw new UnsupportedOperationException(); } + + @Override + public File getFile(String filename) + { + throw new 
UnsupportedOperationException(); + } } diff --git a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java index 6738480809be..e2a47e36eb5c 100644 --- a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java @@ -20,6 +20,7 @@ package io.druid.segment; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier; import io.druid.segment.data.IndexedInts; @@ -76,18 +77,20 @@ public void writeToChannel(WritableByteChannel channel) throws IOException valueSupplier.writeToChannel(channel); } - public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper fileMapper) { byte versionFromBuffer = buffer.get(); if (versionFromBuffer == version) { CompressedVSizeIntsIndexedSupplier offsetSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer( buffer, - order + order, + fileMapper ); CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer( buffer, - order + order, + fileMapper ); return new CompressedVSizeIndexedSupplier(offsetSupplier, valueSupplier); } diff --git a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java index 893d35a61589..26e10afa5543 100644 --- a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java +++ b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java @@ -20,6 +20,7 @@ package io.druid.segment; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.data.CompressedIntsIndexedSupplier; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier; @@ -58,18 +59,24 @@ public class CompressedVSizeIndexedV3Supplier implements WritableSupplier availableDimensions = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY + indexBuffer, + GenericIndexed.STRING_STRATEGY, + smooshedFiles ); final GenericIndexed availableMetrics = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY + indexBuffer, + GenericIndexed.STRING_STRATEGY, + smooshedFiles ); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory(); CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( - smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), BYTE_ORDER + smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), + BYTE_ORDER, + smooshedFiles ); Map metrics = Maps.newLinkedHashMap(); for (String metric : availableMetrics) { final String metricFilename = makeMetricFile(inDir, metric, BYTE_ORDER).getName(); - final MetricHolder holder = MetricHolder.fromByteBuffer(smooshedFiles.mapFile(metricFilename)); + final MetricHolder holder = MetricHolder.fromByteBuffer(smooshedFiles.mapFile(metricFilename), smooshedFiles); if (!metric.equals(holder.getName())) { throw new ISE("Metric[%s] 
loaded up metric[%s] from disk. File names do matter.", metric, holder.getName()); @@ -509,7 +515,7 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) final String dimName = serializerUtils.readString(invertedBuffer); bitmapIndexes.put( dimName, - GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) + GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), v8SmooshedFiles) ); } @@ -691,7 +697,7 @@ public int size() dimension, serdeficator.numBytes() + specBytes.length ); channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); + serdeficator.write(channel, v9Smoosher); channel.close(); } else if (filename.startsWith("met_") || filename.startsWith("numeric_dim_")) { // NOTE: identifying numeric dimensions by using a different filename pattern is meant to allow the @@ -702,7 +708,7 @@ public int size() continue; } - MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); + MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename), v8SmooshedFiles); final String metric = holder.getName(); final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); @@ -753,11 +759,13 @@ public int size() metric, serdeficator.numBytes() + specBytes.length ); channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); + serdeficator.write(channel, v9Smoosher); channel.close(); } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( - v8SmooshedFiles.mapFile(filename), BYTE_ORDER + v8SmooshedFiles.mapFile(filename), + BYTE_ORDER, + v8SmooshedFiles ); final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); @@ -778,7 +786,7 @@ public int size() "__time", serdeficator.numBytes() + specBytes.length ); channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); + serdeficator.write(channel, v9Smoosher); channel.close(); } else { skippedFiles.add(filename); @@ -983,8 +991,16 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException * Index.drd should consist of the segment version, the columns and dimensions of the segment as generic * indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type. 
*/ - final GenericIndexed cols = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY); - final GenericIndexed dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY); + final GenericIndexed cols = GenericIndexed.read( + indexBuffer, + GenericIndexed.STRING_STRATEGY, + smooshedFiles + ); + final GenericIndexed dims = GenericIndexed.read( + indexBuffer, + GenericIndexed.STRING_STRATEGY, + smooshedFiles + ); final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong()); final BitmapSerdeFactory segmentBitmapSerdeFactory; @@ -1021,10 +1037,10 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException Map columns = Maps.newHashMap(); for (String columnName : cols) { - columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName))); + columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles)); } - columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"))); + columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles)); final QueryableIndex index = new SimpleQueryableIndex( dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, metadata @@ -1035,12 +1051,13 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException return index; } - private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException + private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer, SmooshedFileMapper smooshedFiles) + throws IOException { ColumnDescriptor serde = mapper.readValue( serializerUtils.readString(byteBuffer), ColumnDescriptor.class ); - return serde.read(byteBuffer, columnConfig); + return serde.read(byteBuffer, columnConfig, smooshedFiles); } } diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index dd6db74a02e8..e41ceb690d6b 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -423,7 +423,7 @@ private void makeColumn( ); try { channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); + serdeficator.write(channel, v9Smoosher); } finally { channel.close(); @@ -530,7 +530,7 @@ private ArrayList setupMetricsWriters( if (serde == null) { throw new ISE("Unknown type[%s]", typeName); } - writer = ComplexColumnSerializer.create(ioPeon, metric, serde); + writer = serde.getSerializer(ioPeon, metric); break; default: throw new ISE("Unknown type[%s]", type); diff --git a/processing/src/main/java/io/druid/segment/IntIteratorUtils.java b/processing/src/main/java/io/druid/segment/IntIteratorUtils.java index 75189b7511d3..98a56d02da1a 100644 --- a/processing/src/main/java/io/druid/segment/IntIteratorUtils.java +++ b/processing/src/main/java/io/druid/segment/IntIteratorUtils.java @@ -19,8 +19,7 @@ package io.druid.segment; -import com.metamx.common.IAE; -import com.metamx.common.guava.MergeIterator; +import io.druid.java.util.common.IAE; import it.unimi.dsi.fastutil.ints.AbstractIntIterator; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntIterator; @@ -55,7 +54,8 @@ public static int skip(IntIterator it, int n) * It isn't checked if the given source iterators are actually ascending, if they are not, the order of values in the * returned iterator is undefined. *

- * This is similar to what {@link MergeIterator} does with simple {@link java.util.Iterator}s. + * This is similar to what {@link io.druid.java.util.common.guava.MergeIterator} does with simple + * {@link java.util.Iterator}s. * * @param iterators iterators to merge, must return ascending values */ @@ -71,7 +71,8 @@ public static IntIterator mergeAscending(List iterators) } /** - * This class is designed mostly after {@link MergeIterator}. {@code MergeIterator} uses a priority queue of wrapper + * This class is designed mostly after {@link io.druid.java.util.common.guava.MergeIterator}. + * {@code MergeIterator} uses a priority queue of wrapper * "peeking" iterators. Peeking wrappers are not available in fastutil for specialized iterators like IntIterator, so * they should be implemented manually in the druid codebase. Instead, another approach is taken: a priority queue * of primitive long values is used, in long values the high 32-bits is the last value from some iterator, and the low diff --git a/processing/src/main/java/io/druid/segment/LongColumnSerializer.java b/processing/src/main/java/io/druid/segment/LongColumnSerializer.java index 1f0a81f7dac8..2c164f59d938 100644 --- a/processing/src/main/java/io/druid/segment/LongColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/LongColumnSerializer.java @@ -19,6 +19,7 @@ package io.druid.segment; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressionFactory; import io.druid.segment.data.IOPeon; @@ -95,9 +96,9 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - writer.writeToChannel(channel); + writer.writeToChannel(channel, smoosher); } } diff --git a/processing/src/main/java/io/druid/segment/MetricHolder.java b/processing/src/main/java/io/druid/segment/MetricHolder.java index 48803d9d3ca5..1782d8277361 100644 --- a/processing/src/main/java/io/druid/segment/MetricHolder.java +++ b/processing/src/main/java/io/druid/segment/MetricHolder.java @@ -26,6 +26,7 @@ import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.data.CompressedFloatsIndexedSupplier; import io.druid.segment.data.CompressedLongsIndexedSupplier; import io.druid.segment.data.FloatSupplierSerializer; @@ -75,7 +76,6 @@ public static void writeComplexMetric( out.write(version); serializerUtils.writeString(out, name); serializerUtils.writeString(out, typeName); - final InputSupplier supplier = column.combineStreams(); try (InputStream in = supplier.getInput()) { ByteStreams.copy(in, out); @@ -123,12 +123,13 @@ public static void writeToChannel(MetricHolder holder, WritableByteChannel out) } } - public static MetricHolder fromByteBuffer(ByteBuffer buf) throws IOException + public static MetricHolder fromByteBuffer(ByteBuffer buf, SmooshedFileMapper mapper) throws IOException { - return fromByteBuffer(buf, null); + return fromByteBuffer(buf, null, mapper); } - public static MetricHolder fromByteBuffer(ByteBuffer buf, ObjectStrategy strategy) throws IOException + public static MetricHolder fromByteBuffer(ByteBuffer buf, ObjectStrategy strategy, SmooshedFileMapper mapper) + throws IOException { final byte ver = buf.get(); if (version[0] != ver) { 
@@ -141,14 +142,14 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf, ObjectStrategy strateg switch (holder.type) { case LONG: - holder.longType = CompressedLongsIndexedSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder()); + holder.longType = CompressedLongsIndexedSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder(), mapper); break; case FLOAT: - holder.floatType = CompressedFloatsIndexedSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder()); + holder.floatType = CompressedFloatsIndexedSupplier.fromByteBuffer(buf, ByteOrder.nativeOrder(), mapper); break; case COMPLEX: if (strategy != null) { - holder.complexType = GenericIndexed.read(buf, strategy); + holder.complexType = GenericIndexed.read(buf, strategy, mapper); } else { final ComplexMetricSerde serdeForType = ComplexMetrics.getSerdeForType(holder.getTypeName()); @@ -165,7 +166,8 @@ public static MetricHolder fromByteBuffer(ByteBuffer buf, ObjectStrategy strateg } // This is only for guava14 compat. Eventually it should be able to be removed. - private static OutputSupplier toOutputSupplier(final ByteSink sink) { + private static OutputSupplier toOutputSupplier(final ByteSink sink) + { return new OutputSupplier() { @Override diff --git a/processing/src/main/java/io/druid/segment/column/ColumnBuilder.java b/processing/src/main/java/io/druid/segment/column/ColumnBuilder.java index ebbd8efbf09f..c5e52ae84e97 100644 --- a/processing/src/main/java/io/druid/segment/column/ColumnBuilder.java +++ b/processing/src/main/java/io/druid/segment/column/ColumnBuilder.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; /** */ @@ -35,6 +36,18 @@ public class ColumnBuilder private Supplier complexColumn = null; private Supplier bitmapIndex = null; private Supplier spatialIndex = null; + private SmooshedFileMapper fileMapper = null; + + public ColumnBuilder setFileMapper(SmooshedFileMapper fileMapper) + { + this.fileMapper = fileMapper; + return this; + } + + public SmooshedFileMapper getFileMapper() + { + return this.fileMapper; + } public ColumnBuilder setType(ValueType type) { diff --git a/processing/src/main/java/io/druid/segment/column/ColumnDescriptor.java b/processing/src/main/java/io/druid/segment/column/ColumnDescriptor.java index 92b235bd16dc..7119012abb83 100644 --- a/processing/src/main/java/io/druid/segment/column/ColumnDescriptor.java +++ b/processing/src/main/java/io/druid/segment/column/ColumnDescriptor.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.serde.ColumnPartSerde; import java.io.IOException; @@ -85,18 +87,19 @@ public long numBytes() return retVal; } - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { for (ColumnPartSerde part : parts) { - part.getSerializer().write(channel); + part.getSerializer().write(channel, smoosher); } } - public Column read(ByteBuffer buffer, ColumnConfig columnConfig) + public Column read(ByteBuffer buffer, ColumnConfig columnConfig, SmooshedFileMapper smooshedFiles) { final ColumnBuilder builder = new ColumnBuilder() .setType(valueType) - .setHasMultipleValues(hasMultipleValues); + .setHasMultipleValues(hasMultipleValues) + 
.setFileMapper(smooshedFiles); for (ColumnPartSerde part : parts) { part.getDeserializer().read(buffer, builder, columnConfig); diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java index 9ccbe00606e4..ddb112b5063e 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java @@ -26,6 +26,7 @@ import com.google.common.primitives.Ints; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.CompressedPools; import java.io.IOException; @@ -34,7 +35,6 @@ import java.nio.ByteOrder; import java.nio.FloatBuffer; import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; public class BlockLayoutFloatSupplierSerializer implements FloatSupplierSerializer @@ -131,13 +131,11 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - try (InputStream meta = ioPeon.makeInputStream(metaFile); - InputStream input = flattener.combineStreams().getInput()) { + try (InputStream meta = ioPeon.makeInputStream(metaFile)) { ByteStreams.copy(Channels.newChannel(meta), channel); - final ReadableByteChannel from = Channels.newChannel(input); - ByteStreams.copy(from, channel); + flattener.writeToChannel(channel, smoosher); } } } diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedFloatSupplier.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedFloatSupplier.java index 95b885f84137..08a0415a6423 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedFloatSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedFloatSupplier.java @@ -23,6 +23,7 @@ import com.google.common.primitives.Floats; import io.druid.collections.ResourceHolder; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -35,13 +36,23 @@ public class BlockLayoutIndexedFloatSupplier implements Supplier private final int sizePer; public BlockLayoutIndexedFloatSupplier( - int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, - CompressedObjectStrategy.CompressionStrategy strategy + int totalSize, + int sizePer, + ByteBuffer fromBuffer, + ByteOrder order, + CompressedObjectStrategy.CompressionStrategy strategy, + SmooshedFileMapper mapper ) { - baseFloatBuffers = GenericIndexed.read(fromBuffer, VSizeCompressedObjectStrategy.getBufferForOrder( - order, strategy, sizePer * Floats.BYTES - )); + baseFloatBuffers = GenericIndexed.read( + fromBuffer, + VSizeCompressedObjectStrategy.getBufferForOrder( + order, + strategy, + sizePer * Floats.BYTES + ), + mapper + ); this.totalSize = totalSize; this.sizePer = sizePer; } diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedLongSupplier.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedLongSupplier.java index 19d42927c0ca..729d75cc9a26 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedLongSupplier.java 
+++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutIndexedLongSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import io.druid.collections.ResourceHolder; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -36,14 +37,24 @@ public class BlockLayoutIndexedLongSupplier implements Supplier private final CompressionFactory.LongEncodingReader baseReader; public BlockLayoutIndexedLongSupplier( - int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, + int totalSize, + int sizePer, + ByteBuffer fromBuffer, + ByteOrder order, CompressionFactory.LongEncodingReader reader, - CompressedObjectStrategy.CompressionStrategy strategy + CompressedObjectStrategy.CompressionStrategy strategy, + SmooshedFileMapper fileMapper ) { - baseLongBuffers = GenericIndexed.read(fromBuffer, VSizeCompressedObjectStrategy.getBufferForOrder( - order, strategy, reader.getNumBytes(sizePer) - )); + baseLongBuffers = GenericIndexed.read( + fromBuffer, + VSizeCompressedObjectStrategy.getBufferForOrder( + order, + strategy, + reader.getNumBytes(sizePer) + ), + fileMapper + ); this.totalSize = totalSize; this.sizePer = sizePer; this.baseReader = reader; diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java index ae682fd738de..49da3f312a42 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java @@ -23,8 +23,10 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Ints; + import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.CompressedPools; import java.io.IOException; @@ -33,7 +35,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; public class BlockLayoutLongSupplierSerializer implements LongSupplierSerializer @@ -145,13 +146,11 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - try (InputStream meta = ioPeon.makeInputStream(metaFile); - InputStream input = flattener.combineStreams().getInput()) { + try (InputStream meta = ioPeon.makeInputStream(metaFile)) { ByteStreams.copy(Channels.newChannel(meta), channel); - final ReadableByteChannel from = Channels.newChannel(input); - ByteStreams.copy(from, channel); + flattener.writeToChannel(channel, smoosher); } } } diff --git a/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java index 2942a4d7ae54..afe13d0a49c6 100644 --- a/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java +++ b/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java @@ -26,6 +26,7 @@ import com.google.common.io.CountingOutputStream; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; +import 
io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.Closeable; import java.io.IOException; @@ -117,7 +118,7 @@ public InputStream getInput() throws IOException ); } - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) { ByteStreams.copy(from, channel); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java index 5b3c4a76deb7..839149f5423b 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.io.IOException; import java.nio.ByteBuffer; @@ -79,7 +80,11 @@ public void writeToChannel(WritableByteChannel channel) throws IOException channel.write(buffer.asReadOnlyBuffer()); } - public static CompressedFloatsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedFloatsIndexedSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper mapper + ) { byte versionFromBuffer = buffer.get(); @@ -96,7 +101,8 @@ public static CompressedFloatsIndexedSupplier fromByteBuffer(ByteBuffer buffer, sizePer, buffer.asReadOnlyBuffer(), order, - compression + compression, + mapper ); return new CompressedFloatsIndexedSupplier( totalSize, diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java index b13e835bd181..4e56126354b0 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java @@ -26,6 +26,7 @@ import io.druid.collections.StupidResourceHolder; import io.druid.java.util.common.IAE; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.CompressedPools; import it.unimi.dsi.fastutil.ints.IntIterator; @@ -129,18 +130,28 @@ GenericIndexed> getBaseIntBuffers() return baseIntBuffers; } - public static CompressedIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedIntsIndexedSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper fileMapper + ) { byte versionFromBuffer = buffer.get(); if (versionFromBuffer == VERSION) { final int totalSize = buffer.getInt(); final int sizePer = buffer.getInt(); - final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId( + buffer.get() + ); return new CompressedIntsIndexedSupplier( totalSize, sizePer, - GenericIndexed.read(buffer, CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + GenericIndexed.read( + buffer, + CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer), + fileMapper + ), 
compression ); } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java index 592f249e8ec9..4ea7e3de8ccd 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java @@ -19,18 +19,16 @@ package io.druid.segment.data; -import com.google.common.io.ByteStreams; import com.google.common.primitives.Ints; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.IndexIO; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.IntBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; /** @@ -68,14 +66,23 @@ public CompressedIntsIndexedWriter( final ByteOrder byteOrder, final CompressedObjectStrategy.CompressionStrategy compression ) + { + this(chunkFactor, compression, new GenericIndexedWriter<>( + ioPeon, filenameBase, CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor) + )); + } + + public CompressedIntsIndexedWriter( + final int chunkFactor, + final CompressedObjectStrategy.CompressionStrategy compression, + GenericIndexedWriter> flattner + ) { this.chunkFactor = chunkFactor; this.compression = compression; - this.flattener = new GenericIndexedWriter<>( - ioPeon, filenameBase, CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor) - ); this.endBuffer = IntBuffer.allocate(chunkFactor); this.numInserted = 0; + this.flattener = flattner; } @Override @@ -123,14 +130,12 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{VERSION})); channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted))); channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor))); channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); - try (final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput())) { - ByteStreams.copy(from, channel); - } + flattener.writeToChannel(channel, smoosher); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java index c7938135b643..1d701922fcbc 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java @@ -22,6 +22,7 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.io.IOException; import java.nio.ByteBuffer; @@ -90,7 +91,11 @@ public void writeToChannel(WritableByteChannel channel) throws IOException channel.write(buffer.asReadOnlyBuffer()); } - public static CompressedLongsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedLongsIndexedSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper fileMapper + ) { byte versionFromBuffer = 
buffer.get(); @@ -113,7 +118,8 @@ public static CompressedLongsIndexedSupplier fromByteBuffer(ByteBuffer buffer, B buffer.asReadOnlyBuffer(), order, encoding, - compression + compression, + fileMapper ); return new CompressedLongsIndexedSupplier( totalSize, diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java index efcc1cbe064c..6972735aa8f2 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java @@ -22,6 +22,7 @@ */ package io.druid.segment.data; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.CompressedVSizeIndexedV3Supplier; import io.druid.segment.IndexIO; @@ -118,10 +119,10 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{VERSION})); - offsetWriter.writeToChannel(channel); - valueWriter.writeToChannel(channel); + offsetWriter.writeToChannel(channel, smoosher); + valueWriter.writeToChannel(channel, smoosher); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java index e3f92e4da77c..dd678c44c581 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java @@ -27,6 +27,7 @@ import io.druid.collections.StupidResourceHolder; import io.druid.java.util.common.IAE; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.CompressedPools; import it.unimi.dsi.fastutil.ints.IntIterator; @@ -148,7 +149,11 @@ GenericIndexed> getBaseBuffers() return baseBuffers; } - public static CompressedVSizeIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + public static CompressedVSizeIntsIndexedSupplier fromByteBuffer( + ByteBuffer buffer, + ByteOrder order, + SmooshedFileMapper fileMapper + ) { byte versionFromBuffer = buffer.get(); @@ -168,7 +173,8 @@ public static CompressedVSizeIntsIndexedSupplier fromByteBuffer(ByteBuffer buffe numBytes, GenericIndexed.read( buffer, - CompressedByteBufferObjectStrategy.getBufferForOrder(order, compression, chunkBytes) + CompressedByteBufferObjectStrategy.getBufferForOrder(order, compression, chunkBytes), + fileMapper ), compression ); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java index 5a3324ba3a5f..0df6fc3e7bdc 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java @@ -19,17 +19,15 @@ package io.druid.segment.data; -import com.google.common.io.ByteStreams; import com.google.common.primitives.Ints; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.IndexIO; import java.io.IOException; import java.nio.ByteBuffer; 
import java.nio.ByteOrder; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; /** @@ -39,20 +37,6 @@ public class CompressedVSizeIntsIndexedWriter extends SingleValueIndexedIntsWrit { private static final byte VERSION = CompressedVSizeIntsIndexedSupplier.VERSION; - public static CompressedVSizeIntsIndexedWriter create( - final IOPeon ioPeon, - final String filenameBase, - final int maxValue, - final CompressedObjectStrategy.CompressionStrategy compression - ) - { - return new CompressedVSizeIntsIndexedWriter( - ioPeon, filenameBase, maxValue, - CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue), - IndexIO.BYTE_ORDER, compression - ); - } - private final int numBytes; private final int chunkFactor; private final int chunkBytes; @@ -60,6 +44,7 @@ public static CompressedVSizeIntsIndexedWriter create( private final CompressedObjectStrategy.CompressionStrategy compression; private final GenericIndexedWriter> flattener; private final ByteBuffer intBuffer; + private ByteBuffer endBuffer; private int numInserted; @@ -71,21 +56,70 @@ public CompressedVSizeIntsIndexedWriter( final ByteOrder byteOrder, final CompressedObjectStrategy.CompressionStrategy compression ) + { + this( + ioPeon, + filenameBase, + maxValue, + chunkFactor, + byteOrder, + compression, + new GenericIndexedWriter<>( + ioPeon, + filenameBase, + CompressedByteBufferObjectStrategy.getBufferForOrder( + byteOrder, + compression, + sizePer(maxValue, chunkFactor) + ) + ) + ); + } + + public CompressedVSizeIntsIndexedWriter( + final IOPeon ioPeon, + final String filenameBase, + final int maxValue, + final int chunkFactor, + final ByteOrder byteOrder, + final CompressedObjectStrategy.CompressionStrategy compression, + final GenericIndexedWriter writer + ) { this.numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue); this.chunkFactor = chunkFactor; this.chunkBytes = chunkFactor * numBytes + CompressedVSizeIntsIndexedSupplier.bufferPadding(numBytes); this.byteOrder = byteOrder; this.compression = compression; - this.flattener = new GenericIndexedWriter<>( - ioPeon, filenameBase, CompressedByteBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkBytes) - ); + this.flattener = writer; this.intBuffer = ByteBuffer.allocate(Ints.BYTES).order(byteOrder); this.endBuffer = ByteBuffer.allocate(chunkBytes).order(byteOrder); this.endBuffer.limit(numBytes * chunkFactor); this.numInserted = 0; } + public static CompressedVSizeIntsIndexedWriter create( + final IOPeon ioPeon, + final String filenameBase, + final int maxValue, + final CompressedObjectStrategy.CompressionStrategy compression + ) + { + return new CompressedVSizeIntsIndexedWriter( + ioPeon, + filenameBase, + maxValue, + CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue), + IndexIO.BYTE_ORDER, compression + ); + } + + private static int sizePer(int maxValue, int chunkFactor) + { + return chunkFactor * VSizeIndexedInts.getNumBytesForMax(maxValue) + + CompressedVSizeIntsIndexedSupplier.bufferPadding(VSizeIndexedInts.getNumBytesForMax(maxValue)); + } + @Override public void open() throws IOException { @@ -138,14 +172,12 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{VERSION, (byte) numBytes})); 
channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted))); channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor))); channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); - try (final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput())) { - ByteStreams.copy(from, channel); - } + flattener.writeToChannel(channel, smoosher); } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressionFactory.java b/processing/src/main/java/io/druid/segment/data/CompressionFactory.java index 6f603d253193..09c4da117e22 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressionFactory.java +++ b/processing/src/main/java/io/druid/segment/data/CompressionFactory.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Maps; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.io.IOException; import java.io.OutputStream; @@ -263,16 +264,26 @@ public interface LongEncodingReader } public static Supplier getLongSupplier( - int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, + int totalSize, + int sizePer, + ByteBuffer fromBuffer, + ByteOrder order, LongEncodingFormat encodingFormat, - CompressedObjectStrategy.CompressionStrategy strategy + CompressedObjectStrategy.CompressionStrategy strategy, + SmooshedFileMapper fileMapper ) { if (strategy == CompressedObjectStrategy.CompressionStrategy.NONE) { return new EntireLayoutIndexedLongSupplier(totalSize, encodingFormat.getReader(fromBuffer, order)); } else { - return new BlockLayoutIndexedLongSupplier(totalSize, sizePer, fromBuffer, order, - encodingFormat.getReader(fromBuffer, order), strategy + return new BlockLayoutIndexedLongSupplier( + totalSize, + sizePer, + fromBuffer, + order, + encodingFormat.getReader(fromBuffer, order), + strategy, + fileMapper ); } } @@ -303,14 +314,18 @@ ioPeon, filenameBase, order, new LongsLongEncodingWriter(order), compressionStra // Float currently does not support any encoding types, and stores values as 4 byte float public static Supplier getFloatSupplier( - int totalSize, int sizePer, ByteBuffer fromBuffer, ByteOrder order, - CompressedObjectStrategy.CompressionStrategy strategy + int totalSize, + int sizePer, + ByteBuffer fromBuffer, + ByteOrder order, + CompressedObjectStrategy.CompressionStrategy strategy, + SmooshedFileMapper fileMapper ) { if (strategy == CompressedObjectStrategy.CompressionStrategy.NONE) { return new EntireLayoutIndexedFloatSupplier(totalSize, fromBuffer, order); } else { - return new BlockLayoutIndexedFloatSupplier(totalSize, sizePer, fromBuffer, order, strategy); + return new BlockLayoutIndexedFloatSupplier(totalSize, sizePer, fromBuffer, order, strategy, fileMapper); } } diff --git a/processing/src/main/java/io/druid/segment/data/EntireLayoutFloatSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/EntireLayoutFloatSupplierSerializer.java index 8e0606c72257..e49c32f9f691 100644 --- a/processing/src/main/java/io/druid/segment/data/EntireLayoutFloatSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/EntireLayoutFloatSupplierSerializer.java @@ -24,6 +24,7 @@ import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.IOException; import java.io.InputStream; @@ -111,7 +112,7 @@ public long getSerializedSize() } @Override - 
public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { try (InputStream meta = ioPeon.makeInputStream(metaFile); InputStream value = ioPeon.makeInputStream(valueFile)) { diff --git a/processing/src/main/java/io/druid/segment/data/EntireLayoutLongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/EntireLayoutLongSupplierSerializer.java index cb6b590ee73c..303c27df2c12 100644 --- a/processing/src/main/java/io/druid/segment/data/EntireLayoutLongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/EntireLayoutLongSupplierSerializer.java @@ -23,6 +23,7 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Ints; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.IOException; import java.io.InputStream; @@ -108,7 +109,7 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { try (InputStream meta = ioPeon.makeInputStream(metaFile); InputStream value = ioPeon.makeInputStream(valueFile)) { diff --git a/processing/src/main/java/io/druid/segment/data/FloatSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/FloatSupplierSerializer.java index 7a6620b923a2..d6cf71157b62 100644 --- a/processing/src/main/java/io/druid/segment/data/FloatSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/FloatSupplierSerializer.java @@ -20,6 +20,7 @@ package io.druid.segment.data; import com.google.common.io.ByteSink; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.Closeable; import java.io.IOException; @@ -32,5 +33,5 @@ public interface FloatSupplierSerializer extends Closeable void add(float value) throws IOException; void closeAndConsolidate(ByteSink consolidatedOut) throws IOException; long getSerializedSize(); - void writeToChannel(WritableByteChannel channel) throws IOException; + void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException; } diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexed.java b/processing/src/main/java/io/druid/segment/data/GenericIndexed.java index bfc9cf5703a5..6e278ea79b1c 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexed.java @@ -19,37 +19,218 @@ package io.druid.segment.data; +import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; +import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import it.unimi.dsi.fastutil.bytes.ByteArrays; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; /** * A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. 
If input
 * is sorted, supports binary search index lookups. If input is not sorted, only supports array-like index lookups.
- *
+ * <p>
 * V1 Storage Format:
- *
+ * <p>
 * byte 1: version (0x1)
- * byte 2 == 0x1 => allowReverseLookup
- * bytes 3-6 => numBytesUsed
- * bytes 7-10 => numElements
+ * byte 2 == 0x1 =&gt; allowReverseLookup
+ * bytes 3-6 =&gt; numBytesUsed
+ * bytes 7-10 =&gt; numElements
 * bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values
- * bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes for value
+ * bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes
+ * for value
+ * <p>
+ * V2 Storage Format
+ * Meta, header and value files are separate, and the header file is stored in native endian byte order.
+ * Meta File:
+ * byte 1: version (0x2)
+ * byte 2 == 0x1 =&gt; allowReverseLookup
+ * bytes 3-6: numberOfElementsPerValueFile expressed as a power of 2. That means all the value files contain the same
+ * number of items, except the last value file, which may have fewer elements.
+ * bytes 7-10 =&gt; numElements
+ * bytes 11-14 =&gt; columnNameLength
+ * bytes 15-columnNameLength =&gt; columnName
+ * <p>
+ * Header file name is identified as: String.format("%s_header", columnName) + * value files are identified as: String.format("%s_value_%d", columnName, fileNumber) + * number of value files == numElements/numberOfElementsPerValueFile */ public class GenericIndexed implements Indexed { - private static final byte version = 0x1; + public static final byte VERSION_ONE = 0x1; + public static final byte VERSION_TWO = 0x2; + private static final byte REVERSE_LOOKUP_ALLOWED = 0x1; + private final static Ordering NATURAL_STRING_ORDERING = Ordering.natural().nullsFirst(); + private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); + + public static final ObjectStrategy STRING_STRATEGY = new CacheableObjectStrategy() + { + @Override + public Class getClazz() + { + return String.class; + } + + @Override + public String fromByteBuffer(final ByteBuffer buffer, final int numBytes) + { + return StringUtils.fromUtf8(buffer, numBytes); + } - private int indexOffset; + @Override + public byte[] toBytes(String val) + { + if (val == null) { + return ByteArrays.EMPTY_ARRAY; + } + return StringUtils.toUtf8(val); + } + + @Override + public int compare(String o1, String o2) + { + return NATURAL_STRING_ORDERING.compare(o1, o2); + } + }; + + private final ObjectStrategy strategy; + private final boolean allowReverseLookup; + private final int size; + private final BufferIndexed bufferIndexed; + private final List valueBuffers; + private final ByteBuffer headerBuffer; + private int logBaseTwoOfElementsPerValueFile; + + private ByteBuffer theBuffer; + + // used for single file version, v1 + GenericIndexed( + ByteBuffer buffer, + ObjectStrategy strategy, + boolean allowReverseLookup + ) + { + this.theBuffer = buffer; + this.strategy = strategy; + this.allowReverseLookup = allowReverseLookup; + size = theBuffer.getInt(); + + int indexOffset = theBuffer.position(); + int valuesOffset = theBuffer.position() + size * Ints.BYTES; + + buffer.position(valuesOffset); + valueBuffers = Lists.newArrayList(buffer.slice()); + buffer.position(indexOffset); + headerBuffer = buffer.slice(); + final ByteBuffer valueBuffer = valueBuffers.get(0); + bufferIndexed = new BufferIndexed() + { + @Override + public T get(int index) + { + checkIndex(index, size); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = 4; + endOffset = headerBuffer.getInt(0); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); + } + return _get(valueBuffer.asReadOnlyBuffer(), startOffset, endOffset); + } + }; + } + + // used for multiple file version, v2. 
+ GenericIndexed( + List valueBuffs, + ByteBuffer headerBuff, + ObjectStrategy strategy, + boolean allowReverseLookup, + int logBaseTwoOfElementsPerValueFile, + int numWritten + ) + { + this.strategy = strategy; + this.allowReverseLookup = allowReverseLookup; + this.valueBuffers = valueBuffs; + this.headerBuffer = headerBuff; + this.size = numWritten; + this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; + headerBuffer.order(ByteOrder.nativeOrder()); + bufferIndexed = new BufferIndexed() + { + @Override + public T get(int index) + { + int fileNum = index >> GenericIndexed.this.logBaseTwoOfElementsPerValueFile; + final ByteBuffer copyBuffer = valueBuffers.get(fileNum).asReadOnlyBuffer(); + + checkIndex(index, size); + + final int startOffset; + final int endOffset; + int relativePositionOfIndex = index & ((1 << GenericIndexed.this.logBaseTwoOfElementsPerValueFile) - 1); + if (relativePositionOfIndex == 0) { + int headerPosition = index * Ints.BYTES; + startOffset = 4; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + 4; + endOffset = headerBuffer.getInt(headerPosition + 4); + } + return _get(copyBuffer, startOffset, endOffset); + } + }; + } + + public static int getNumberOfFilesRequired(int bagSize, long numWritten) + { + int numberOfFilesRequired = (int) (numWritten / bagSize); + if ((numWritten % bagSize) != 0) { + numberOfFilesRequired += 1; + } + return numberOfFilesRequired; + } + + /** + * Checks if {@code index} a valid `element index` in GenericIndexed. + * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message. + *

+ * Used here to get existing behavior(same error message and exception) of V1 GenericIndexed. + * + * @param index index identifying an element of an GenericIndexed. + * @param size number of elements. + */ + private static void checkIndex(int index, int size) + { + if (index < 0) { + throw new IAE("Index[%s] < 0", index); + } + if (index >= size) { + throw new IAE(String.format("Index[%s] >= size[%s]", index, size)); + } + } public static GenericIndexed fromArray(T[] objects, ObjectStrategy strategy) { @@ -109,6 +290,93 @@ public static GenericIndexed fromIterable(Iterable objectsIterable, Ob return new GenericIndexed(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup); } + public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy) + { + byte versionFromBuffer = buffer.get(); + + if (VERSION_ONE == versionFromBuffer) { + return createVersionOneGenericIndexed(buffer, strategy); + } else if (VERSION_TWO == versionFromBuffer) { + throw new IAE( + "use read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper)" + + " to read version 2 indexed.", + versionFromBuffer + ); + } + throw new IAE("Unknown version[%s]", versionFromBuffer); + } + + private static GenericIndexed createVersionOneGenericIndexed(ByteBuffer byteBuffer, ObjectStrategy strategy) + { + boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; + int size = byteBuffer.getInt(); + ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer(); + bufferToUse.limit(bufferToUse.position() + size); + byteBuffer.position(bufferToUse.limit()); + + return new GenericIndexed( + bufferToUse, + strategy, + allowReverseLookup + ); + } + + private static GenericIndexed createVersionTwoGenericIndexed( + ByteBuffer byteBuffer, + ObjectStrategy strategy, + SmooshedFileMapper fileMapper + ) + { + if (fileMapper == null) { + throw new IAE("SmooshedFileMapper can not be null for version 2."); + } + boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; + int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt(); + int numElements = byteBuffer.getInt(); + String columnName; + + List valueBuffersToUse; + ByteBuffer headerBuffer; + try { + columnName = SERIALIZER_UTILS.readString(byteBuffer); + valueBuffersToUse = Lists.newArrayList(); + int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile; + int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements); + for (int i = 0; i < numberOfFilesRequired; i++) { + valueBuffersToUse.add( + fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i)) + .asReadOnlyBuffer() + ); + } + headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); + } + catch (IOException e) { + throw new RuntimeException("File mapping failed.", e); + } + + return new GenericIndexed( + valueBuffersToUse, + headerBuffer, + strategy, + allowReverseLookup, + logBaseTwoOfElementsPerValueFile, + numElements + ); + } + + public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper) + { + byte versionFromBuffer = buffer.get(); + + if (VERSION_ONE == versionFromBuffer) { + return createVersionOneGenericIndexed(buffer, strategy); + } else if (VERSION_TWO == versionFromBuffer) { + return createVersionTwoGenericIndexed(buffer, strategy, fileMapper); + } + + throw new IAE("Unknown version [%s]", versionFromBuffer); + } + @Override public Class getClazz() { @@ -133,6 +401,7 @@ public T get(int index) * that values-not-found will return 
some negative number. * * @param value value to search for + * * @return index of value, or negative number equal to (-(insertion point) - 1). */ @Override @@ -147,31 +416,100 @@ public Iterator iterator() return bufferIndexed.iterator(); } - private final ByteBuffer theBuffer; - private final ObjectStrategy strategy; - private final boolean allowReverseLookup; - private final int size; + public long getSerializedSize() + { + if (valueBuffers.size() != 1) { + throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed."); + } + return theBuffer.remaining() + + 2 + + Ints.BYTES + + Ints.BYTES; //2 Bytes for version and sorted flag. 4 bytes to store numbers + // of bytes and next 4 bytes to store number of elements. + } - private final int valuesOffset; - private final BufferIndexed bufferIndexed; + public void writeToChannel(WritableByteChannel channel) throws IOException + { + //version 2 will always have more than one buffer in valueBuffers. + if (valueBuffers.size() == 1) { + channel.write(ByteBuffer.wrap(new byte[]{VERSION_ONE, allowReverseLookup ? (byte) 0x1 : (byte) 0x0})); + channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + 4))); // 4 Bytes to store size. + channel.write(ByteBuffer.wrap(Ints.toByteArray(size))); + channel.write(theBuffer.asReadOnlyBuffer()); + } else { + throw new UnsupportedOperationException( + "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead."); + } + } - GenericIndexed( - ByteBuffer buffer, - ObjectStrategy strategy, - boolean allowReverseLookup - ) + /** + * Create a non-thread-safe Indexed, which may perform better than the underlying Indexed. + * + * @return a non-thread-safe Indexed + */ + public GenericIndexed.BufferIndexed singleThreaded() { - this.theBuffer = buffer; - this.strategy = strategy; - this.allowReverseLookup = allowReverseLookup; + if (valueBuffers.size() == 1) { + final ByteBuffer copyBuffer = valueBuffers.get(0).asReadOnlyBuffer(); + return new BufferIndexed() + { + @Override + public T get(final int index) + { + checkIndex(index, size); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = 4; + endOffset = headerBuffer.getInt(0); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + 4; + endOffset = headerBuffer.getInt(headerPosition + 4); + } + return _get(copyBuffer, startOffset, endOffset); + } + }; + } else { + + final List copyValueBuffers = new ArrayList<>(); + for (ByteBuffer buffer : valueBuffers) { + copyValueBuffers.add(buffer.asReadOnlyBuffer()); + } + + return new BufferIndexed() + { + @Override + public T get(final int index) + { + int fileNum = index >> logBaseTwoOfElementsPerValueFile; + final ByteBuffer copyBuffer = copyValueBuffers.get(fileNum); + + checkIndex(index, size); + final int startOffset; + final int endOffset; + + int relativePositionOfIndex = index & ((1 << logBaseTwoOfElementsPerValueFile) - 1); + if (relativePositionOfIndex == 0) { + int headerPosition = index * Ints.BYTES; + startOffset = 4; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + 4; + endOffset = headerBuffer.getInt(headerPosition + 4); + } + + return _get(copyBuffer, startOffset, endOffset); + } + }; + } - size = theBuffer.getInt(); - indexOffset = theBuffer.position(); - valuesOffset = theBuffer.position() + (size << 2); - bufferIndexed = new 
BufferIndexed(); } - class BufferIndexed implements Indexed + abstract class BufferIndexed implements Indexed { int lastReadSize; @@ -187,51 +525,25 @@ public int size() return size; } - @Override - public T get(final int index) - { - return _get(theBuffer.asReadOnlyBuffer(), index); - } - - protected T _get(final ByteBuffer copyBuffer, final int index) + protected T _get(ByteBuffer copyValueBuffer, int startOffset, int endOffset) { - if (index < 0) { - throw new IAE("Index[%s] < 0", index); - } - if (index >= size) { - throw new IAE(String.format("Index[%s] >= size[%s]", index, size)); - } - - final int startOffset; - final int endOffset; - - if (index == 0) { - startOffset = 4; - endOffset = copyBuffer.getInt(indexOffset); - } else { - copyBuffer.position(indexOffset + ((index - 1) * 4)); - startOffset = copyBuffer.getInt() + 4; - endOffset = copyBuffer.getInt(); - } - + final int size = endOffset - startOffset; if (startOffset == endOffset) { return null; } - - copyBuffer.position(valuesOffset + startOffset); - final int size = endOffset - startOffset; + copyValueBuffer.position(startOffset); lastReadSize = size; // fromByteBuffer must not modify the buffer limit - final T value = strategy.fromByteBuffer(copyBuffer, size); - - return value; + return strategy.fromByteBuffer(copyValueBuffer, size); } /** * This method makes no guarantees with respect to thread safety + * * @return the size in bytes of the last value read */ - public int getLastValueSize() { + public int getLastValueSize() + { return lastReadSize; } @@ -272,84 +584,4 @@ public Iterator iterator() } } - public long getSerializedSize() - { - return theBuffer.remaining() + 2 + 4 + 4; - } - - public void writeToChannel(WritableByteChannel channel) throws IOException - { - channel.write(ByteBuffer.wrap(new byte[]{version, allowReverseLookup ? (byte) 0x1 : (byte) 0x0})); - channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + 4))); - channel.write(ByteBuffer.wrap(Ints.toByteArray(size))); - channel.write(theBuffer.asReadOnlyBuffer()); - } - - /** - * Create a non-thread-safe Indexed, which may perform better than the underlying Indexed. 
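The V2 read path above spreads one column across several smooshed value files, so get(index) derives the value-file number and the in-file slot from the power-of-two split, then pulls the element's byte range out of the header of cumulative int offsets. The standalone sketch below restates that arithmetic under the layout the patch uses (a 4-byte length prefix before each value, one int end-offset per element, offsets relative to the element's value file); the class and method names are illustrative only, not Druid API.

import java.nio.ByteBuffer;

// Illustrative sketch, not part of the patch: offset arithmetic for a V2 GenericIndexed
// whose value files each hold (1 << logBaseTwoPerFile) elements and whose header stores
// one cumulative int end-offset per element, relative to that element's value file.
final class V2LookupSketch
{
  // returns {valueFileNumber, startOffset, endOffset} for the element at `index`
  static int[] locate(ByteBuffer header, int index, int logBaseTwoPerFile)
  {
    int fileNum = index >> logBaseTwoPerFile;                   // which smooshed value file
    int slotInFile = index & ((1 << logBaseTwoPerFile) - 1);    // position inside that file
    int end = header.getInt(index * Integer.BYTES);             // end of this element's bytes
    int start = (slotInFile == 0)
                ? Integer.BYTES                                 // first element: skip the 4-byte length prefix
                : header.getInt((index - 1) * Integer.BYTES) + Integer.BYTES;
    return new int[]{fileNum, start, end};
  }
}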
- * - * @return a non-thread-safe Indexed - */ - public GenericIndexed.BufferIndexed singleThreaded() - { - final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer(); - return new BufferIndexed() { - @Override - public T get(int index) - { - return _get(copyBuffer, index); - } - }; - } - - public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy) - { - byte versionFromBuffer = buffer.get(); - - if (version == versionFromBuffer) { - boolean allowReverseLookup = buffer.get() == 0x1; - int size = buffer.getInt(); - ByteBuffer bufferToUse = buffer.asReadOnlyBuffer(); - bufferToUse.limit(bufferToUse.position() + size); - buffer.position(bufferToUse.limit()); - - return new GenericIndexed( - bufferToUse, - strategy, - allowReverseLookup - ); - } - - throw new IAE("Unknown version[%s]", versionFromBuffer); - } - - public static final ObjectStrategy STRING_STRATEGY = new CacheableObjectStrategy() - { - @Override - public Class getClazz() - { - return String.class; - } - - @Override - public String fromByteBuffer(final ByteBuffer buffer, final int numBytes) - { - return io.druid.java.util.common.StringUtils.fromUtf8(buffer, numBytes); - } - - @Override - public byte[] toBytes(String val) - { - if (val == null) { - return new byte[]{}; - } - return io.druid.java.util.common.StringUtils.toUtf8(val); - } - - @Override - public int compare(String o1, String o2) - { - return Ordering.natural().nullsFirst().compare(o1, o2); - } - }; } diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index 19419fc2943e..8be24436085a 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -26,41 +26,114 @@ import com.google.common.io.CountingOutputStream; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.SmooshedWriter; import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Arrays; + /** * Streams arrays of objects out in the binary format described by GenericIndexed */ public class GenericIndexedWriter implements Closeable { + private static int PAGE_SIZE = 4096; private final IOPeon ioPeon; private final String filenameBase; private final ObjectStrategy strategy; - + private final int fileSizeLimit; + private final byte[] fileNameByteArray; private boolean objectsSorted = true; private T prevObject = null; - private CountingOutputStream headerOut = null; private CountingOutputStream valuesOut = null; - int numWritten = 0; + private CountingOutputStream headerOutLong = null; + private long numWritten = 0; + private boolean requireMultipleFiles = false; + private ByteBuffer buf; + public GenericIndexedWriter( IOPeon ioPeon, String filenameBase, ObjectStrategy strategy ) + { + this(ioPeon, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE); + } + + public GenericIndexedWriter( + IOPeon 
ioPeon, + String filenameBase, + ObjectStrategy strategy, + int fileSizeLimit + ) { this.ioPeon = ioPeon; this.filenameBase = filenameBase; this.strategy = strategy; + this.fileSizeLimit = fileSizeLimit; + fileNameByteArray = filenameBase.getBytes(); + buf = ByteBuffer.allocate(Ints.BYTES); + } + + public static String generateValueFileName(String fileNameBase, int fileNum) + { + return String.format("%s_value_%d", fileNameBase, fileNum); + } + + public static String generateHeaderFileName(String fileNameBase) + { + return String.format("%s_header", fileNameBase); + } + + private static void writeBytesIntoSmooshedChannel( + long numBytesToPutInFile, + final byte[] buffer, + final SmooshedWriter smooshChannel, + final InputStream is + ) + throws IOException + { + ByteBuffer holderBuffer = ByteBuffer.wrap(buffer); + while (numBytesToPutInFile > 0) { + int bytesRead = is.read(buffer, 0, Math.min(buffer.length, Ints.saturatedCast(numBytesToPutInFile))); + if (bytesRead != -1) { + smooshChannel.write((ByteBuffer) holderBuffer.clear().limit(bytesRead)); + numBytesToPutInFile -= bytesRead; + } else { + throw new ISE("Could not write [%d] bytes into smooshChannel.", numBytesToPutInFile); + } + } + } + + private static void writeLongValueToOutputStream(ByteBuffer helperBuf, long value, CountingOutputStream outLong) + throws IOException + { + helperBuf.putLong(0, value); + outLong.write(helperBuf.array()); + } + + private static void writeIntValueToOutputStream(ByteBuffer helperBuf, int value, CountingOutputStream outLong) + throws IOException + { + helperBuf.putInt(0, value); + outLong.write(helperBuf.array()); } public void open() throws IOException @@ -81,7 +154,17 @@ public void write(T objectToWrite) throws IOException valuesOut.write(Ints.toByteArray(bytesToWrite.length)); valuesOut.write(bytesToWrite); - headerOut.write(Ints.toByteArray((int) valuesOut.getCount())); + if (!requireMultipleFiles) { + writeIntValueToOutputStream(buf, Ints.checkedCast(valuesOut.getCount()), headerOut); + } else { + writeLongValueToOutputStream(buf, valuesOut.getCount(), headerOutLong); + } + + if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) { + requireMultipleFiles = true; + initializeHeaderOutLong(); + buf = ByteBuffer.allocate(Longs.BYTES).order(ByteOrder.nativeOrder()); + } prevObject = objectToWrite; } @@ -94,9 +177,17 @@ private String makeFilename(String suffix) @Override public void close() throws IOException { - headerOut.close(); valuesOut.close(); + if (requireMultipleFiles) { + closeMultiFiles(); + } else { + closeSingleFile(); + } + } + private void closeSingleFile() throws IOException + { + headerOut.close(); final long numBytesWritten = headerOut.getCount() + valuesOut.getCount(); Preconditions.checkState( @@ -107,35 +198,132 @@ public void close() throws IOException headerOut.getCount() ); Preconditions.checkState( - numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten + numBytesWritten < fileSizeLimit, "Wrote[%s] bytes, which is too many.", numBytesWritten ); - OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta")); - - try { - metaOut.write(0x1); + try (OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta"))) { + metaOut.write(GenericIndexed.VERSION_ONE); metaOut.write(objectsSorted ? 
0x1 : 0x0); - metaOut.write(Ints.toByteArray((int) numBytesWritten + 4)); - metaOut.write(Ints.toByteArray(numWritten)); + metaOut.write(Ints.toByteArray(Ints.checkedCast(numBytesWritten + 4))); + metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten))); + } + } + + private void closeMultiFiles() throws IOException + { + headerOutLong.close(); + Preconditions.checkState( + headerOutLong.getCount() == (numWritten * Longs.BYTES), + "numWritten[%s] number of rows should have [%s] bytes written to headerOutLong, had[%s]", + numWritten, + numWritten * Longs.BYTES, + headerOutLong.getCount() + ); + Preconditions.checkState( + headerOutLong.getCount() < (Integer.MAX_VALUE & ~PAGE_SIZE), + "Wrote[%s] bytes in header file, which is too many.", + headerOutLong.getCount() + ); + } + + /** + * Tries to get best value split(number of elements in each value file) which can be expressed as power of 2. + * + * @return Returns the size of value file splits as power of 2. + * + * @throws IOException + */ + private int bagSizePower() throws IOException + { + long avgObjectSize = (valuesOut.getCount() + numWritten - 1) / numWritten; + + File f = ioPeon.getFile(makeFilename("headerLong")); + Preconditions.checkNotNull(f, "header file missing."); + + try (RandomAccessFile headerFile = new RandomAccessFile(f, "r")) { + for (int i = 31; i >= 0; --i) { + if ((1L << i) * avgObjectSize <= fileSizeLimit) { + if (actuallyFits(i, headerFile)) { + return i; + } + } + } } - finally { - metaOut.close(); + throw new ISE( + "no value split found with fileSizeLimit [%d], avgObjectSize [%d] while serializing [%s]", + fileSizeLimit, + avgObjectSize, + filenameBase + ); + } + + /** + * Checks if candidate value splits can divide value file in such a way no object/element crosses the value splits. + * + * @param powerTwo candidate value split expressed as power of 2. + * @param headerFile header file. + * + * @return true if candidate value split can hold all splits. + * + * @throws IOException + */ + private boolean actuallyFits(int powerTwo, RandomAccessFile headerFile) throws IOException + { + long lastValueOffset = 0; + long currentValueOffset = 0; + long valueBytesWritten = valuesOut.getCount(); + long headerIndex = 0; + long bagSize = 1L << powerTwo; + + while (lastValueOffset < valueBytesWritten) { + + if (headerIndex >= numWritten) { + return true; + } else if (headerIndex + bagSize <= numWritten) { + headerFile.seek((headerIndex + bagSize - 1) * Longs.BYTES); + currentValueOffset = Long.reverseBytes(headerFile.readLong()); + } else if (numWritten < headerIndex + bagSize) { + headerFile.seek((numWritten - 1) * Longs.BYTES); + currentValueOffset = Long.reverseBytes(headerFile.readLong()); + } + + if (currentValueOffset - lastValueOffset <= fileSizeLimit) { + lastValueOffset = currentValueOffset; + headerIndex = headerIndex + bagSize; + } else { + return false; + } } + return true; } public long getSerializedSize() { - return 2 + // version and sorted flag - Ints.BYTES + // numBytesWritten - Ints.BYTES + // numElements - headerOut.getCount() + // header length - valuesOut.getCount(); // value length + // for version 2 getSerializedSize() returns number of bytes in meta file. + if (!requireMultipleFiles) { + return 2 + // version and sorted flag + Ints.BYTES + // numBytesWritten + Ints.BYTES + // numElements + headerOut.getCount() + // header length + valuesOut.getCount(); // value length + } else { + return 2 + // version and sorted flag + Ints.BYTES + // numElements as log base 2. 
+ Ints.BYTES + // number of files + Ints.BYTES + // column name Size + fileNameByteArray.length; + } } + @Deprecated public InputSupplier combineStreams() { // ByteSource.concat is only available in guava 15 and higher // This is guava 14 compatible + if (requireMultipleFiles) { + throw new ISE("Can not combine streams for version 2."); //fallback to old behaviour. + } + return ByteStreams.join( Iterables.transform( Arrays.asList("meta", "header", "values"), @@ -158,10 +346,120 @@ public InputStream getInput() throws IOException ); } - public void writeToChannel(WritableByteChannel channel) throws IOException + private void writeToChannelVersionOne(WritableByteChannel channel) throws IOException { - try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) { + try (ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) { ByteStreams.copy(from, channel); } + } + + private void writeToChannelVersionTwo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + if (smoosher == null) { + throw new IAE("version 2 GenericIndexedWriter requires FileSmoosher."); + } + + int bagSizePower = bagSizePower(); + OutputStream metaOut = Channels.newOutputStream(channel); + metaOut.write(GenericIndexed.VERSION_TWO); + metaOut.write(objectsSorted ? 0x1 : 0x0); + metaOut.write(Ints.toByteArray(bagSizePower)); + metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten))); + metaOut.write(Ints.toByteArray(fileNameByteArray.length)); + metaOut.write(fileNameByteArray); + + try (RandomAccessFile headerFile = new RandomAccessFile(ioPeon.getFile(makeFilename("headerLong")), "r")) { + Preconditions.checkNotNull(headerFile, "header file missing."); + long previousValuePosition = 0; + int bagSize = 1 << bagSizePower; + + int numberOfFilesRequired = GenericIndexed.getNumberOfFilesRequired(bagSize, numWritten); + byte[] buffer = new byte[1 << 16]; + + try (InputStream is = new FileInputStream(ioPeon.getFile(makeFilename("values")))) { + int counter = -1; + + for (int i = 0; i < numberOfFilesRequired; i++) { + if (i != numberOfFilesRequired - 1) { + headerFile.seek((bagSize + counter) * Longs.BYTES); // 8 for long bytes. + counter = counter + bagSize; + } else { + headerFile.seek((numWritten - 1) * Longs.BYTES); // for remaining items. 
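To pick that split, bagSizePower() above estimates an average object size from the bytes written so far and walks down from 2^31 to the largest power-of-two element count whose projected value file still fits fileSizeLimit, then re-checks the candidate against the real header offsets with actuallyFits(). A simplified sketch of the search and of the resulting file count follows; it is illustrative only and omits the offset re-check.

// Illustrative sketch of the V2 split search; the real writer additionally verifies the
// candidate against the recorded header offsets (actuallyFits) before accepting it.
final class SplitSizeSketch
{
  static int pickBagSizePower(long valueBytes, long numWritten, long fileSizeLimit)
  {
    long avgObjectSize = (valueBytes + numWritten - 1) / numWritten;  // ceiling average bytes per element
    for (int p = 31; p >= 0; --p) {
      if ((1L << p) * avgObjectSize <= fileSizeLimit) {
        return p;                                                     // elements per value file = 1 << p
      }
    }
    throw new IllegalStateException("no power-of-two split fits the file size limit");
  }

  static int filesRequired(int bagSize, long numWritten)
  {
    return (int) ((numWritten + bagSize - 1) / bagSize);              // ceil(numWritten / bagSize)
  }
}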
+ } + + long valuePosition = Long.reverseBytes(headerFile.readLong()); + long numBytesToPutInFile = valuePosition - previousValuePosition; + + try (SmooshedWriter smooshChannel = smoosher + .addWithSmooshedWriter(generateValueFileName(filenameBase, i), numBytesToPutInFile)) { + writeBytesIntoSmooshedChannel(numBytesToPutInFile, buffer, smooshChannel, is); + previousValuePosition = valuePosition; + } + } + } + writeHeaderLong(smoosher, headerFile, bagSizePower, buffer); + } + + } + + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + if (!requireMultipleFiles) { + writeToChannelVersionOne(channel); + } else { + writeToChannelVersionTwo(channel, smoosher); + } + } + + private void writeHeaderLong(FileSmoosher smoosher, RandomAccessFile headerFile, int bagSizePower, byte[] buffer) throws IOException + { + ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES).order(ByteOrder.nativeOrder()); + + try (CountingOutputStream finalHeaderOut = new CountingOutputStream( + ioPeon.makeOutputStream(makeFilename("header_final")))) { + int numberOfElementsPerValueFile = 1 << bagSizePower; + long currentNumBytes = 0; + long relativeRefBytes = 0; + long relativeNumBytes; + headerFile.seek(0); + + // following block converts long header indexes into int header indexes. + for (int pos = 0; pos < numWritten; pos++) { + //conversion of header offset from long to int completed for one value file done, change relativeRefBytes + // to current offset. + if ((pos & (numberOfElementsPerValueFile - 1)) == 0) { + relativeRefBytes = currentNumBytes; + } + currentNumBytes = Long.reverseBytes(headerFile.readLong()); + relativeNumBytes = currentNumBytes - relativeRefBytes; + writeIntValueToOutputStream(helperBuffer, Ints.checkedCast(relativeNumBytes), finalHeaderOut); + } + + long numBytesToPutInFile = finalHeaderOut.getCount(); + finalHeaderOut.close(); + try (InputStream is = new FileInputStream(ioPeon.getFile(makeFilename("header_final")))) { + try (SmooshedWriter smooshChannel = smoosher + .addWithSmooshedWriter(generateHeaderFileName(filenameBase), numBytesToPutInFile)) { + writeBytesIntoSmooshedChannel(numBytesToPutInFile, buffer, smooshChannel, is); + } + } + + } + } + + private void initializeHeaderOutLong() throws IOException + { + headerOut.close(); + headerOutLong = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("headerLong"))); + + try (RandomAccessFile headerFile = new RandomAccessFile(ioPeon.getFile(makeFilename("header")), "r")) { + ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES).order(ByteOrder.nativeOrder()); + for (int i = 0; i < numWritten; i++) { + int count = headerFile.readInt(); + writeLongValueToOutputStream(buf, count, headerOutLong); + } + } + } + } diff --git a/processing/src/main/java/io/druid/segment/data/IOPeon.java b/processing/src/main/java/io/druid/segment/data/IOPeon.java index 7c669d735bfe..38c2eff0b658 100644 --- a/processing/src/main/java/io/druid/segment/data/IOPeon.java +++ b/processing/src/main/java/io/druid/segment/data/IOPeon.java @@ -20,6 +20,7 @@ package io.druid.segment.data; import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -30,4 +31,5 @@ public interface IOPeon extends Closeable { public OutputStream makeOutputStream(String filename) throws IOException; public InputStream makeInputStream(String filename) throws IOException; + public File getFile(String filename); } diff --git 
a/processing/src/main/java/io/druid/segment/data/IndexedIntsWriter.java b/processing/src/main/java/io/druid/segment/data/IndexedIntsWriter.java index 4898351af2c0..b203b0ac7aca 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedIntsWriter.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedIntsWriter.java @@ -19,6 +19,8 @@ package io.druid.segment.data; +import io.druid.java.util.common.io.smoosh.FileSmoosher; + import java.io.Closeable; import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -31,5 +33,5 @@ public interface IndexedIntsWriter extends Closeable public long getSerializedSize(); - public void writeToChannel(WritableByteChannel channel) throws IOException; + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException; } diff --git a/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java index fbeda714b706..478d1f6777b2 100644 --- a/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java @@ -25,6 +25,7 @@ import com.google.common.io.CountingOutputStream; import com.google.common.math.LongMath; import com.google.common.primitives.Longs; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.BufferedInputStream; import java.io.DataInputStream; @@ -144,8 +145,8 @@ public long getSerializedSize() return delegate.getSerializedSize(); } - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - delegate.writeToChannel(channel); + delegate.writeToChannel(channel, smoosher); } } diff --git a/processing/src/main/java/io/druid/segment/data/LongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/LongSupplierSerializer.java index 0356baba6e5a..b40554fddfe4 100644 --- a/processing/src/main/java/io/druid/segment/data/LongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/LongSupplierSerializer.java @@ -20,6 +20,7 @@ package io.druid.segment.data; import com.google.common.io.ByteSink; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.Closeable; import java.io.IOException; @@ -34,5 +35,5 @@ public interface LongSupplierSerializer extends Closeable void add(long value) throws IOException; void closeAndConsolidate(ByteSink consolidatedOut) throws IOException; long getSerializedSize(); - void writeToChannel(WritableByteChannel channel) throws IOException; + void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException; } diff --git a/processing/src/main/java/io/druid/segment/data/TmpFileIOPeon.java b/processing/src/main/java/io/druid/segment/data/TmpFileIOPeon.java index 634118c6fa5a..3f5dbe556dc3 100644 --- a/processing/src/main/java/io/druid/segment/data/TmpFileIOPeon.java +++ b/processing/src/main/java/io/druid/segment/data/TmpFileIOPeon.java @@ -31,7 +31,7 @@ import java.util.Map; /** -*/ + */ public class TmpFileIOPeon implements IOPeon { private final boolean allowOverwrite; @@ -83,4 +83,11 @@ public boolean isOverwriteAllowed() { return allowOverwrite; } + + @Override + public File getFile(String filename) + { + return createdFiles.get(filename); + } + } diff --git 
a/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java index 8206dd5256f2..c6a56e61351d 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java @@ -22,6 +22,7 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import com.google.common.primitives.Ints; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.IOException; import java.nio.ByteBuffer; @@ -83,7 +84,7 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { long numBytesWritten = valuesOut.getCount(); channel.write(ByteBuffer.wrap(new byte[]{VERSION, (byte) numBytes})); diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java index 7f46ff5ba0c3..2746f75afe44 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java @@ -27,6 +27,7 @@ import com.google.common.io.CountingOutputStream; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.Closeable; import java.io.IOException; @@ -163,7 +164,7 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { try (final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput())) { ByteStreams.copy(from, channel); diff --git a/processing/src/main/java/io/druid/segment/serde/ColumnPartSerde.java b/processing/src/main/java/io/druid/segment/serde/ColumnPartSerde.java index 09b8ffc54afd..2900c36ad023 100644 --- a/processing/src/main/java/io/druid/segment/serde/ColumnPartSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/ColumnPartSerde.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.column.ColumnBuilder; import io.druid.segment.column.ColumnConfig; @@ -47,7 +48,7 @@ public interface Serializer { public long numBytes(); - public void write(WritableByteChannel channel) throws IOException; + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException; } public interface Deserializer diff --git a/processing/src/main/java/io/druid/segment/serde/ComplexColumnPartSerde.java b/processing/src/main/java/io/druid/segment/serde/ComplexColumnPartSerde.java index f81d91b4e196..74f9f131b87e 100644 --- a/processing/src/main/java/io/druid/segment/serde/ComplexColumnPartSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/ComplexColumnPartSerde.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.column.ColumnBuilder; import io.druid.segment.column.ColumnConfig; import io.druid.segment.data.GenericIndexed; @@ -33,6 +34,16 @@ */ 
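With the ColumnPartSerde.Serializer change above, every column-part writer now receives the FileSmoosher so that a GenericIndexedWriter can spill an oversized column into extra smoosh files. Below is a minimal sketch of an implementation under the new two-argument signature; the wrapper class and its wiring are illustrative, only numBytes() and write(channel, smoosher) come from the interface.

import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.serde.ColumnPartSerde;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch, not part of the patch: a Serializer that forwards the smoosher
// so the delegate writer can emit the multi-file V2 layout when it has to.
final class SmooshAwareSerializerSketch implements ColumnPartSerde.Serializer
{
  private final GenericIndexedWriter<?> writer;

  SmooshAwareSerializerSketch(GenericIndexedWriter<?> writer)
  {
    this.writer = writer;
  }

  @Override
  public long numBytes()
  {
    return writer.getSerializedSize();        // for V2 this is only the size of the meta block
  }

  @Override
  public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
  {
    writer.writeToChannel(channel, smoosher); // passing null is only valid for writers that stayed in the single-file V1 layout
  }
}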
public class ComplexColumnPartSerde implements ColumnPartSerde { + private final String typeName; + private final ComplexMetricSerde serde; + private final Serializer serializer; + private ComplexColumnPartSerde(String typeName, Serializer serializer) + { + this.typeName = typeName; + this.serde = ComplexMetrics.getSerdeForType(typeName); + this.serializer = serializer; + } + @JsonCreator public static ComplexColumnPartSerde createDeserializer( @JsonProperty("typeName") String complexType @@ -41,15 +52,14 @@ public static ComplexColumnPartSerde createDeserializer( return new ComplexColumnPartSerde(complexType, null); } - private final String typeName; - private final ComplexMetricSerde serde; - private final Serializer serializer; + public static SerializerBuilder serializerBuilder() + { + return new SerializerBuilder(); + } - private ComplexColumnPartSerde(String typeName, Serializer serializer) + public static LegacySerializerBuilder legacySerializerBuilder() { - this.typeName = typeName; - this.serde = ComplexMetrics.getSerdeForType(typeName); - this.serializer = serializer; + return new LegacySerializerBuilder(); } @JsonProperty @@ -58,9 +68,25 @@ public String getTypeName() return typeName; } - public static SerializerBuilder serializerBuilder() + @Override + public Serializer getSerializer() { - return new SerializerBuilder(); + return serializer; + } + + @Override + public Deserializer getDeserializer() + { + return new Deserializer() + { + @Override + public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig) + { + if (serde != null) { + serde.deserializeColumn(buffer, builder); + } + } + }; } public static class SerializerBuilder @@ -92,20 +118,15 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - delegate.writeToChannel(channel); + delegate.writeToChannel(channel, smoosher); } } ); } } - public static LegacySerializerBuilder legacySerializerBuilder() - { - return new LegacySerializerBuilder(); - } - public static class LegacySerializerBuilder { private String typeName = null; @@ -135,7 +156,7 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { delegate.writeToChannel(channel); } @@ -143,25 +164,4 @@ public void write(WritableByteChannel channel) throws IOException ); } } - - @Override - public Serializer getSerializer() - { - return serializer; - } - - @Override - public Deserializer getDeserializer() - { - return new Deserializer() - { - @Override - public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig) - { - if (serde != null) { - serde.deserializeColumn(buffer, builder); - } - } - }; - } } diff --git a/processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java b/processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java index 3498a979f247..b31416d1b3f5 100644 --- a/processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/serde/ComplexColumnSerializer.java @@ -19,38 +19,39 @@ package io.druid.segment.serde; +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.GenericColumnSerializer; import io.druid.segment.data.GenericIndexedWriter; import io.druid.segment.data.IOPeon; +import 
io.druid.segment.data.ObjectStrategy; import java.io.IOException; import java.nio.channels.WritableByteChannel; public class ComplexColumnSerializer implements GenericColumnSerializer { - public static ComplexColumnSerializer create( - IOPeon ioPeon, - String filenameBase, - ComplexMetricSerde serde - ) - { - return new ComplexColumnSerializer(ioPeon, filenameBase, serde); - } - private final IOPeon ioPeon; private final String filenameBase; - private final ComplexMetricSerde serde; + private final ObjectStrategy strategy; private GenericIndexedWriter writer; - public ComplexColumnSerializer( IOPeon ioPeon, String filenameBase, - ComplexMetricSerde serde + ObjectStrategy strategy ) { this.ioPeon = ioPeon; this.filenameBase = filenameBase; - this.serde = serde; + this.strategy = strategy; + } + + public static ComplexColumnSerializer create( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy + ) + { + return new ComplexColumnSerializer(ioPeon, filenameBase, strategy); } @SuppressWarnings(value = "unchecked") @@ -58,7 +59,7 @@ public ComplexColumnSerializer( public void open() throws IOException { writer = new GenericIndexedWriter( - ioPeon, String.format("%s.complex_column", filenameBase), serde.getObjectStrategy() + ioPeon, String.format("%s.complex_column", filenameBase), strategy ); writer.open(); } @@ -83,8 +84,16 @@ public long getSerializedSize() } @Override - public void writeToChannel(WritableByteChannel channel) throws IOException + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + writeToChannelVersionOne(channel); + } + + private void writeToChannelVersionOne(WritableByteChannel channel) throws IOException { - writer.writeToChannel(channel); + writer.writeToChannel( + channel, + null + ); //null for the FileSmoosher means that we default to "version 1" of GenericIndexed. } } diff --git a/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java b/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java index a55b7d78a4ed..5a0d3418ac40 100644 --- a/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/ComplexMetricSerde.java @@ -20,7 +20,9 @@ package io.druid.segment.serde; import com.google.common.base.Function; +import io.druid.segment.GenericColumnSerializer; import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.IOPeon; import io.druid.segment.data.ObjectStrategy; import java.nio.ByteBuffer; @@ -30,20 +32,21 @@ public abstract class ComplexMetricSerde { public abstract String getTypeName(); + public abstract ComplexMetricExtractor getExtractor(); /** * Deserializes a ByteBuffer and adds it to the ColumnBuilder. This method allows for the ComplexMetricSerde * to implement it's own versioning scheme to allow for changes of binary format in a forward-compatible manner. * - * @param buffer the buffer to deserialize + * @param buffer the buffer to deserialize * @param builder ColumnBuilder to add the column to */ public abstract void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder); /** * This is deprecated because its usage is going to be removed from the code. - * + *
* It was introduced before deserializeColumn() existed. This method creates the assumption that Druid knows * how to interpret the actual column representation of the data, but I would much prefer that the ComplexMetricSerde * objects be in charge of creating and interpreting the whole column, which is what deserializeColumn lets @@ -57,7 +60,7 @@ public abstract class ComplexMetricSerde /** * Returns a function that can convert the Object provided by the ComplexColumn created through deserializeColumn * into a number of expected input bytes to produce that object. - * + *
* This is used to approximate the size of the input data via the SegmentMetadataQuery and does not need to be * overridden if you do not care about the query. * @@ -72,6 +75,7 @@ public Function inputSizeFn() * Converts intermediate representation of aggregate to byte[]. * * @param val intermediate representation of aggregate + * * @return serialized intermediate representation of aggregate in byte[] */ public byte[] toBytes(Object val) @@ -82,17 +86,32 @@ public byte[] toBytes(Object val) /** * Converts byte[] to intermediate representation of the aggregate. * - * @param byte array - * @param start offset in the byte array where to start reading + * @param data array + * @param start offset in the byte array where to start reading * @param numBytes number of bytes to read in given array + * * @return intermediate representation of the aggregate */ public Object fromBytes(byte[] data, int start, int numBytes) { ByteBuffer bb = ByteBuffer.wrap(data); - if(start > 0) { + if (start > 0) { bb.position(start); } return getObjectStrategy().fromByteBuffer(bb, numBytes); } + + /** + * This method provides the ability for a ComplexMetricSerde to control its own serialization. + * For large column (i.e columns greater than Integer.MAX) use + * (@link LargeColumnSupportedComplexColumnSerializer) + * + * @param peon IOPeon + * @param column name of the column + * @return an instance of GenericColumnSerializer used for serialization. + */ + public GenericColumnSerializer getSerializer(IOPeon peon, String column) + { + return ComplexColumnSerializer.create(peon, column, this.getObjectStrategy()); + } } diff --git a/processing/src/main/java/io/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/io/druid/segment/serde/DictionaryEncodedColumnPartSerde.java index b6398fc6de29..67375f9539a4 100644 --- a/processing/src/main/java/io/druid/segment/serde/DictionaryEncodedColumnPartSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/DictionaryEncodedColumnPartSerde.java @@ -26,6 +26,8 @@ import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.spatial.ImmutableRTree; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import io.druid.segment.CompressedVSizeIndexedSupplier; import io.druid.segment.CompressedVSizeIndexedV3Supplier; import io.druid.segment.column.ColumnBuilder; @@ -225,23 +227,23 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{version.asByte()})); if (version.compareTo(VERSION.COMPRESSED) >= 0) { channel.write(ByteBuffer.wrap(Ints.toByteArray(flags))); } if (dictionaryWriter != null) { - dictionaryWriter.writeToChannel(channel); + dictionaryWriter.writeToChannel(channel, smoosher); } if (valueWriter != null) { - valueWriter.writeToChannel(channel); + valueWriter.writeToChannel(channel, smoosher); } if (bitmapIndexWriter != null) { - bitmapIndexWriter.writeToChannel(channel); + bitmapIndexWriter.writeToChannel(channel, smoosher); } if (spatialIndexWriter != null) { - spatialIndexWriter.writeToChannel(channel); + spatialIndexWriter.writeToChannel(channel, smoosher); } } } @@ -369,7 +371,7 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel 
channel, FileSmoosher smoosher) throws IOException { channel.write(ByteBuffer.wrap(new byte[]{version.asByte()})); if (version.compareTo(VERSION.COMPRESSED) >= 0) { @@ -434,17 +436,21 @@ public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnCo final boolean hasMultipleValues = Feature.MULTI_VALUE.isSet(rFlags) || Feature.MULTI_VALUE_V3.isSet(rFlags); - final GenericIndexed rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY); + final GenericIndexed rDictionary = GenericIndexed.read( + buffer, + GenericIndexed.STRING_STRATEGY, + builder.getFileMapper() + ); builder.setType(ValueType.STRING); final WritableSupplier rSingleValuedColumn; final WritableSupplier> rMultiValuedColumn; if (hasMultipleValues) { - rMultiValuedColumn = readMultiValuedColum(rVersion, buffer, rFlags); + rMultiValuedColumn = readMultiValuedColum(rVersion, buffer, rFlags, builder.getFileMapper()); rSingleValuedColumn = null; } else { - rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer); + rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer, builder.getFileMapper()); rMultiValuedColumn = null; } @@ -459,7 +465,7 @@ public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnCo ); GenericIndexed rBitmaps = GenericIndexed.read( - buffer, bitmapSerdeFactory.getObjectStrategy() + buffer, bitmapSerdeFactory.getObjectStrategy(), builder.getFileMapper() ); builder.setBitmapIndex( new BitmapIndexColumnPartSupplier( @@ -479,19 +485,23 @@ public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnCo } - private WritableSupplier readSingleValuedColumn(VERSION version, ByteBuffer buffer) + private WritableSupplier readSingleValuedColumn( + VERSION version, + ByteBuffer buffer, + SmooshedFileMapper fileMapper + ) { switch (version) { case UNCOMPRESSED_SINGLE_VALUE: return VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier(); case COMPRESSED: - return CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder); + return CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder, fileMapper); } throw new IAE("Unsupported single-value version[%s]", version); } private WritableSupplier> readMultiValuedColum( - VERSION version, ByteBuffer buffer, int flags + VERSION version, ByteBuffer buffer, int flags, SmooshedFileMapper fileMapper ) { switch (version) { @@ -499,9 +509,9 @@ private WritableSupplier> readMultiValuedColum( return VSizeIndexed.readFromByteBuffer(buffer).asWritableSupplier(); case COMPRESSED: if (Feature.MULTI_VALUE.isSet(flags)) { - return CompressedVSizeIndexedSupplier.fromByteBuffer(buffer, byteOrder); + return CompressedVSizeIndexedSupplier.fromByteBuffer(buffer, byteOrder, fileMapper); } else if (Feature.MULTI_VALUE_V3.isSet(flags)) { - return CompressedVSizeIndexedV3Supplier.fromByteBuffer(buffer, byteOrder); + return CompressedVSizeIndexedV3Supplier.fromByteBuffer(buffer, byteOrder, fileMapper); } else { throw new IAE("Unrecognized multi-value flag[%d]", flags); } diff --git a/processing/src/main/java/io/druid/segment/serde/FloatGenericColumnPartSerde.java b/processing/src/main/java/io/druid/segment/serde/FloatGenericColumnPartSerde.java index 503b460027e8..4e693dc671e8 100644 --- a/processing/src/main/java/io/druid/segment/serde/FloatGenericColumnPartSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/FloatGenericColumnPartSerde.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; + +import 
io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.FloatColumnSerializer; import io.druid.segment.column.ColumnBuilder; import io.druid.segment.column.ColumnConfig; @@ -93,9 +95,9 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher fileSmoosher) throws IOException { - delegate.writeToChannel(channel); + delegate.writeToChannel(channel, fileSmoosher); } } ); @@ -136,7 +138,7 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { delegate.writeToChannel(channel); } @@ -161,7 +163,8 @@ public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnCo { final CompressedFloatsIndexedSupplier column = CompressedFloatsIndexedSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); builder.setType(ValueType.FLOAT) .setHasMultipleValues(false) diff --git a/processing/src/main/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java b/processing/src/main/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java new file mode 100644 index 000000000000..7c45b62c22be --- /dev/null +++ b/processing/src/main/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.serde; + +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.segment.GenericColumnSerializer; +import io.druid.segment.data.GenericIndexedWriter; +import io.druid.segment.data.IOPeon; +import io.druid.segment.data.ObjectStrategy; + +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +public class LargeColumnSupportedComplexColumnSerializer implements GenericColumnSerializer +{ + + private final IOPeon ioPeon; + private final String filenameBase; + private final ObjectStrategy strategy; + private final int columnSize; + private GenericIndexedWriter writer; + public LargeColumnSupportedComplexColumnSerializer( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy + ) + { + this(ioPeon, filenameBase, strategy, Integer.MAX_VALUE); + } + public LargeColumnSupportedComplexColumnSerializer( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy, + int columnSize + ) + { + this.ioPeon = ioPeon; + this.filenameBase = filenameBase; + this.strategy = strategy; + this.columnSize = columnSize; + } + + public static LargeColumnSupportedComplexColumnSerializer create( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy + ) + { + return new LargeColumnSupportedComplexColumnSerializer(ioPeon, filenameBase, strategy); + } + + public static LargeColumnSupportedComplexColumnSerializer createWithColumnSize( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy, + int columnSize + ) + { + return new LargeColumnSupportedComplexColumnSerializer(ioPeon, filenameBase, strategy, columnSize); + } + + @SuppressWarnings(value = "unchecked") + @Override + public void open() throws IOException + { + writer = new GenericIndexedWriter( + ioPeon, String.format("%s.complex_column", filenameBase), strategy, columnSize); + writer.open(); + } + + @SuppressWarnings(value = "unchecked") + @Override + public void serialize(Object obj) throws IOException + { + writer.write(obj); + } + + @Override + public void close() throws IOException + { + writer.close(); + } + + @Override + public long getSerializedSize() + { + return writer.getSerializedSize(); + } + + @Override + public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + writer.writeToChannel(channel, smoosher); + } + +} diff --git a/processing/src/main/java/io/druid/segment/serde/LongGenericColumnPartSerde.java b/processing/src/main/java/io/druid/segment/serde/LongGenericColumnPartSerde.java index 707050855a16..d8eeff9ae738 100644 --- a/processing/src/main/java/io/druid/segment/serde/LongGenericColumnPartSerde.java +++ b/processing/src/main/java/io/druid/segment/serde/LongGenericColumnPartSerde.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; + +import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.segment.LongColumnSerializer; import io.druid.segment.column.ColumnBuilder; import io.druid.segment.column.ColumnConfig; @@ -93,9 +95,9 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException { - delegate.writeToChannel(channel); + delegate.writeToChannel(channel, smoosher); } } ); @@ -136,7 +138,7 @@ public long numBytes() } @Override - public void write(WritableByteChannel channel) throws IOException + public void write(WritableByteChannel channel, FileSmoosher 
smoosher) throws IOException { delegate.writeToChannel(channel); } @@ -161,7 +163,8 @@ public void read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnCo { final CompressedLongsIndexedSupplier column = CompressedLongsIndexedSupplier.fromByteBuffer( buffer, - byteOrder + byteOrder, + builder.getFileMapper() ); builder.setType(ValueType.LONG) .setHasMultipleValues(false) diff --git a/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java b/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java index 47e68270094a..a09dd68e0884 100644 --- a/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java +++ b/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java @@ -71,7 +71,9 @@ public void teardown(){ protected WritableSupplier> fromByteBuffer(ByteBuffer buffer, ByteOrder order) { return CompressedVSizeIndexedV3Supplier.fromByteBuffer( - buffer, ByteOrder.nativeOrder() + buffer, + ByteOrder.nativeOrder(), + null ); } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSerdeTest.java index 27de948ba813..d47b060b1f0a 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsSerdeTest.java @@ -129,7 +129,7 @@ public OutputStream openStream() throws IOException ); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); CompressedFloatsIndexedSupplier supplier = CompressedFloatsIndexedSupplier - .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); IndexedFloats floats = supplier.get(); assertIndexMatchesVals(floats, values); @@ -183,7 +183,7 @@ private void testSupplierSerde(CompressedFloatsIndexedSupplier supplier, float[] final byte[] bytes = baos.toByteArray(); Assert.assertEquals(supplier.getSerializedSize(), bytes.length); CompressedFloatsIndexedSupplier anotherSupplier = CompressedFloatsIndexedSupplier.fromByteBuffer( - ByteBuffer.wrap(bytes), order + ByteBuffer.wrap(bytes), order, null ); IndexedFloats indexed = anotherSupplier.get(); assertIndexMatchesVals(indexed, vals); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java index 7e19c2f8bec9..66b6dfa94353 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java @@ -103,7 +103,7 @@ private void makeWithSerde(final int chunkSize) throws IOException final byte[] bytes = baos.toByteArray(); Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); - supplier = CompressedIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder()); + supplier = CompressedIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder(), null); indexed = supplier.get(); } @@ -111,7 +111,7 @@ private void setupLargeChunks(final int chunkSize, final int totalSize) throws I { vals = new int[totalSize]; Random rand = new Random(0); - for(int i = 0; i < vals.length; ++i) { + for (int i = 0; i < vals.length; ++i) { vals[i] = rand.nextInt(); } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java 
b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java index 671beb7ce54a..60ce1fa027c8 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java @@ -23,7 +23,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.Smoosh; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.java.util.common.io.smoosh.SmooshedWriter; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; @@ -31,11 +37,13 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; import java.util.List; import java.util.Random; import java.util.Set; @@ -45,6 +53,22 @@ @RunWith(Parameterized.class) public class CompressedIntsIndexedWriterTest { + private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; + private static final int[] CHUNK_FACTORS = new int[]{1, 2, 100, CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER}; + private final IOPeon ioPeon = new TmpFileIOPeon(); + private final CompressedObjectStrategy.CompressionStrategy compressionStrategy; + private final ByteOrder byteOrder; + private final Random rand = new Random(0); + private int[] vals; + public CompressedIntsIndexedWriterTest( + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + ByteOrder byteOrder + ) + { + this.compressionStrategy = compressionStrategy; + this.byteOrder = byteOrder; + } + @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}") public static Iterable compressionStrategiesAndByteOrders() { @@ -65,24 +89,6 @@ public Object[] apply(List input) ); } - private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; - private static final int[] CHUNK_FACTORS = new int[]{1, 2, 100, CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER}; - - private final IOPeon ioPeon = new TmpFileIOPeon(); - private final CompressedObjectStrategy.CompressionStrategy compressionStrategy; - private final ByteOrder byteOrder; - private final Random rand = new Random(0); - private int[] vals; - - public CompressedIntsIndexedWriterTest( - CompressedObjectStrategy.CompressionStrategy compressionStrategy, - ByteOrder byteOrder - ) - { - this.compressionStrategy = compressionStrategy; - this.byteOrder = byteOrder; - } - @Before public void setUp() throws Exception { @@ -105,6 +111,8 @@ private void generateVals(final int totalSize, final int maxValue) throws IOExce private void checkSerializedSizeAndData(int chunkFactor) throws Exception { + FileSmoosher smoosher = new FileSmoosher(FileUtils.getTempDirectory()); + CompressedIntsIndexedWriter writer = new CompressedIntsIndexedWriter( ioPeon, "test", chunkFactor, byteOrder, compressionStrategy ); @@ -118,14 +126,17 @@ private void checkSerializedSizeAndData(int chunkFactor) throws Exception writer.close(); long writtenLength = writer.getSerializedSize(); final WritableByteChannel 
outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output")); - writer.writeToChannel(outputChannel); + writer.writeToChannel(outputChannel, smoosher); outputChannel.close(); + smoosher.close(); assertEquals(writtenLength, supplierFromList.getSerializedSize()); // read from ByteBuffer and check values CompressedIntsIndexedSupplier supplierFromByteBuffer = CompressedIntsIndexedSupplier.fromByteBuffer( - ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder + ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), + byteOrder, + null ); IndexedInts indexedInts = supplierFromByteBuffer.get(); assertEquals(vals.length, indexedInts.size()); @@ -165,4 +176,72 @@ public void testWriteEmpty() throws Exception vals = new int[0]; checkSerializedSizeAndData(2); } + + + private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception + { + File tmpDirectory = Files.createTempDirectory(String.format( + "CompressedIntsIndexedWriterTest_%d", + chunkFactor + )).toFile(); + + FileSmoosher smoosher = new FileSmoosher(tmpDirectory); + final IOPeon ioPeon = new TmpFileIOPeon(); + try { + + CompressedIntsIndexedWriter writer = new CompressedIntsIndexedWriter( + chunkFactor, + compressionStrategy, + new GenericIndexedWriter<>( + ioPeon, "test", + CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compressionStrategy, + chunkFactor + ), Longs.BYTES * 10000 + ) + ); + + writer.open(); + for (int val : vals) { + writer.add(val); + } + writer.close(); + final SmooshedWriter channel = smoosher.addWithSmooshedWriter( + "test", writer.getSerializedSize() + ); + writer.writeToChannel(channel, smoosher); + channel.close(); + smoosher.close(); + + SmooshedFileMapper mapper = Smoosh.map(tmpDirectory); + + // read from ByteBuffer and check values + CompressedIntsIndexedSupplier supplierFromByteBuffer = CompressedIntsIndexedSupplier.fromByteBuffer( + mapper.mapFile("test"), + byteOrder, + mapper + ); + IndexedInts indexedInts = supplierFromByteBuffer.get(); + assertEquals(vals.length, indexedInts.size()); + for (int i = 0; i < vals.length; ++i) { + assertEquals(vals[i], indexedInts.get(i)); + } + CloseQuietly.close(indexedInts); + mapper.close(); + } + finally { + ioPeon.close(); + } + } + + @Test + public void testMultiValueFileLargeData() throws Exception + { + // more than one chunk + for (int maxValue : MAX_VALUES) { + for (int chunkFactor : CHUNK_FACTORS) { + generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue); + checkV2SerializedSizeAndData(chunkFactor); + } + } + } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsSerdeTest.java index 9e67f0587974..ffae76ff6535 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsSerdeTest.java @@ -151,7 +151,7 @@ public OutputStream openStream() throws IOException ); Assert.assertEquals(baos.size(), serializer.getSerializedSize()); CompressedLongsIndexedSupplier supplier = CompressedLongsIndexedSupplier - .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order, null); IndexedLongs longs = supplier.get(); assertIndexMatchesVals(longs, values); @@ -205,7 +205,9 @@ private void testSupplierSerde(CompressedLongsIndexedSupplier supplier, long[] v final byte[] bytes = baos.toByteArray(); 
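The checkV2SerializedSizeAndData() test above exercises the whole multi-file path through CompressedIntsIndexedWriter. For orientation, here is a compact sketch of the same round trip applied directly to a string GenericIndexed: write with a deliberately small fileSizeLimit to force version 2, smoosh the result, then read it back through the SmooshedFileMapper. The directory, column name, element count, and size limit are illustrative values, not taken from the patch.

import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.Smoosh;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.java.util.common.io.smoosh.SmooshedWriter;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.TmpFileIOPeon;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Illustrative sketch, not part of the patch: end-to-end V2 round trip for a string column.
final class GenericIndexedV2RoundTripSketch
{
  public static void main(String[] args) throws IOException
  {
    File dir = Files.createTempDirectory("generic-indexed-v2-sketch").toFile();
    try (TmpFileIOPeon ioPeon = new TmpFileIOPeon()) {
      GenericIndexedWriter<String> writer = new GenericIndexedWriter<>(
          ioPeon, "sketch", GenericIndexed.STRING_STRATEGY, 64 * 1024  // small limit forces version 2
      );
      writer.open();
      for (int i = 0; i < 100_000; i++) {
        writer.write("value-" + i);
      }
      writer.close();

      FileSmoosher smoosher = new FileSmoosher(dir);
      try (SmooshedWriter channel = smoosher.addWithSmooshedWriter("sketch", writer.getSerializedSize())) {
        writer.writeToChannel(channel, smoosher);  // meta goes to the channel, value/header files to the smoosher
      }
      smoosher.close();

      SmooshedFileMapper mapper = Smoosh.map(dir);
      GenericIndexed<String> indexed = GenericIndexed.read(
          mapper.mapFile("sketch"), GenericIndexed.STRING_STRATEGY, mapper
      );
      System.out.println(indexed.get(12_345));     // prints "value-12345"
      mapper.close();
    }
  }
}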
Assert.assertEquals(supplier.getSerializedSize(), bytes.length); CompressedLongsIndexedSupplier anotherSupplier = CompressedLongsIndexedSupplier.fromByteBuffer( - ByteBuffer.wrap(bytes), order + ByteBuffer.wrap(bytes), + order, + null ); IndexedLongs indexed = anotherSupplier.get(); assertIndexMatchesVals(indexed, vals); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java index 1b52dd757071..dd9554da4857 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java @@ -142,7 +142,9 @@ private void assertSame(List someInts, IndexedMultivalue ind protected WritableSupplier> fromByteBuffer(ByteBuffer buffer, ByteOrder order) { return CompressedVSizeIndexedSupplier.fromByteBuffer( - buffer, ByteOrder.nativeOrder() + buffer, + ByteOrder.nativeOrder(), + null ); } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java index b2a59d3989c5..c14942367aef 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java @@ -24,8 +24,14 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.Smoosh; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.java.util.common.io.smoosh.SmooshedWriter; import io.druid.segment.CompressedVSizeIndexedV3Supplier; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; @@ -34,11 +40,13 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -49,26 +57,6 @@ @RunWith(Parameterized.class) public class CompressedVSizeIndexedV3WriterTest { - @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}") - public static Iterable compressionStrategiesAndByteOrders() - { - Set> combinations = Sets.cartesianProduct( - Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.noNoneValues()), - Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN) - ); - - return Iterables.transform( - combinations, new Function() - { - @Override - public Object[] apply(List input) - { - return new Object[]{input.get(0), input.get(1)}; - } - } - ); - } - private static final int[] OFFSET_CHUNK_FACTORS = new int[]{ 1, 2, @@ -76,7 +64,6 @@ public Object[] apply(List input) CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER }; private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; - private final IOPeon ioPeon = new TmpFileIOPeon(); private final CompressedObjectStrategy.CompressionStrategy compressionStrategy; private final ByteOrder byteOrder; @@ 
-92,6 +79,26 @@ public CompressedVSizeIndexedV3WriterTest( this.byteOrder = byteOrder; } + @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}") + public static Iterable compressionStrategiesAndByteOrders() + { + Set> combinations = Sets.cartesianProduct( + Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.noNoneValues()), + Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN) + ); + + return Iterables.transform( + combinations, new Function() + { + @Override + public Object[] apply(List input) + { + return new Object[]{input.get(0), input.get(1)}; + } + } + ); + } + private void generateVals(final int totalSize, final int maxValue) throws IOException { vals = new ArrayList<>(totalSize); @@ -107,53 +114,65 @@ private void generateVals(final int totalSize, final int maxValue) throws IOExce private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception { - int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0; - CompressedIntsIndexedWriter offsetWriter = new CompressedIntsIndexedWriter( - ioPeon, "offset", offsetChunkFactor, byteOrder, compressionStrategy - ); - CompressedVSizeIntsIndexedWriter valueWriter = new CompressedVSizeIntsIndexedWriter( - ioPeon, "value", maxValue, valueChunkFactor, byteOrder, compressionStrategy - ); - CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(offsetWriter, valueWriter); - CompressedVSizeIndexedV3Supplier supplierFromIterable = CompressedVSizeIndexedV3Supplier.fromIterable( - Iterables.transform( - vals, new Function() - { - @Nullable - @Override - public IndexedInts apply(@Nullable final int[] input) + FileSmoosher smoosher = new FileSmoosher(FileUtils.getTempDirectory()); + final IOPeon ioPeon = new TmpFileIOPeon(); + final IndexedMultivalue indexedMultivalue; + + try { + int maxValue = vals.size() > 0 ? 
getMaxValue(vals) : 0; + CompressedIntsIndexedWriter offsetWriter = new CompressedIntsIndexedWriter( + ioPeon, "offset", offsetChunkFactor, byteOrder, compressionStrategy + ); + CompressedVSizeIntsIndexedWriter valueWriter = new CompressedVSizeIntsIndexedWriter( + ioPeon, "value", maxValue, valueChunkFactor, byteOrder, compressionStrategy + ); + CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(offsetWriter, valueWriter); + CompressedVSizeIndexedV3Supplier supplierFromIterable = CompressedVSizeIndexedV3Supplier.fromIterable( + Iterables.transform( + vals, new Function() { - return ArrayBasedIndexedInts.of(input); + @Nullable + @Override + public IndexedInts apply(@Nullable final int[] input) + { + return ArrayBasedIndexedInts.of(input); + } } - } - ), offsetChunkFactor, maxValue, byteOrder, compressionStrategy - ); - writer.open(); - for (int[] val : vals) { - writer.add(val); - } - writer.close(); - long writtenLength = writer.getSerializedSize(); - final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output")); - writer.writeToChannel(outputChannel); - outputChannel.close(); + ), offsetChunkFactor, maxValue, byteOrder, compressionStrategy + ); + writer.open(); + for (int[] val : vals) { + writer.add(val); + } + writer.close(); + long writtenLength = writer.getSerializedSize(); + final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output")); + writer.writeToChannel(outputChannel, smoosher); + outputChannel.close(); + smoosher.close(); - assertEquals(writtenLength, supplierFromIterable.getSerializedSize()); + assertEquals(writtenLength, supplierFromIterable.getSerializedSize()); - // read from ByteBuffer and check values - CompressedVSizeIndexedV3Supplier supplierFromByteBuffer = CompressedVSizeIndexedV3Supplier.fromByteBuffer( - ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder - ); - IndexedMultivalue indexedMultivalue = supplierFromByteBuffer.get(); - assertEquals(indexedMultivalue.size(), vals.size()); - for (int i = 0; i < vals.size(); ++i) { - IndexedInts subVals = indexedMultivalue.get(i); - assertEquals(subVals.size(), vals.get(i).length); - for (int j = 0; j < subVals.size(); ++j) { - assertEquals(subVals.get(j), vals.get(i)[j]); + // read from ByteBuffer and check values + CompressedVSizeIndexedV3Supplier supplierFromByteBuffer = CompressedVSizeIndexedV3Supplier.fromByteBuffer( + ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), + byteOrder, + null + ); + indexedMultivalue = supplierFromByteBuffer.get(); + assertEquals(indexedMultivalue.size(), vals.size()); + for (int i = 0; i < vals.size(); ++i) { + IndexedInts subVals = indexedMultivalue.get(i); + assertEquals(subVals.size(), vals.get(i).length); + for (int j = 0; j < subVals.size(); ++j) { + assertEquals(subVals.get(j), vals.get(i)[j]); + } } + CloseQuietly.close(indexedMultivalue); + } + finally { + ioPeon.close(); } - CloseQuietly.close(indexedMultivalue); } int getMaxValue(final List vals) @@ -217,4 +236,101 @@ public void testEmpty() throws Exception vals = new ArrayList<>(); checkSerializedSizeAndData(1, 2); } + + private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception + { + File tmpDirectory = Files.createTempDirectory(String.format( + "CompressedVSizeIndexedV3WriterTest_%d_%d", + offsetChunkFactor, + offsetChunkFactor + )).toFile(); + FileSmoosher smoosher = new FileSmoosher(tmpDirectory); + final IOPeon ioPeon = new 
TmpFileIOPeon(); + int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0; + + try { + CompressedIntsIndexedWriter offsetWriter = new CompressedIntsIndexedWriter( + offsetChunkFactor, + compressionStrategy, + new GenericIndexedWriter<>( + ioPeon, "offset", + CompressedIntBufferObjectStrategy.getBufferForOrder( + byteOrder, + compressionStrategy, + offsetChunkFactor + ), + Longs.BYTES * 250000 + ) + ); + + GenericIndexedWriter genericIndexed = new GenericIndexedWriter<>( + ioPeon, + "value", + CompressedByteBufferObjectStrategy.getBufferForOrder( + byteOrder, + compressionStrategy, + valueChunkFactor * VSizeIndexedInts.getNumBytesForMax(maxValue) + + CompressedVSizeIntsIndexedSupplier.bufferPadding(VSizeIndexedInts.getNumBytesForMax(maxValue)) + ), + Longs.BYTES * 250000 + ); + CompressedVSizeIntsIndexedWriter valueWriter = new CompressedVSizeIntsIndexedWriter( + ioPeon, + "value", + maxValue, + valueChunkFactor, + byteOrder, + compressionStrategy, + genericIndexed + ); + CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(offsetWriter, valueWriter); + writer.open(); + for (int[] val : vals) { + writer.add(val); + } + writer.close(); + + final SmooshedWriter channel = smoosher.addWithSmooshedWriter( + "test", + writer.getSerializedSize() + ); + writer.writeToChannel(channel, smoosher); + channel.close(); + smoosher.close(); + SmooshedFileMapper mapper = Smoosh.map(tmpDirectory); + + CompressedVSizeIndexedV3Supplier supplierFromByteBuffer = CompressedVSizeIndexedV3Supplier.fromByteBuffer( + mapper.mapFile("test"), + byteOrder, + mapper + ); + IndexedMultivalue indexedMultivalue = supplierFromByteBuffer.get(); + assertEquals(indexedMultivalue.size(), vals.size()); + for (int i = 0; i < vals.size(); ++i) { + IndexedInts subVals = indexedMultivalue.get(i); + assertEquals(subVals.size(), vals.get(i).length); + for (int j = 0; j < subVals.size(); ++j) { + assertEquals(subVals.get(j), vals.get(i)[j]); + } + } + CloseQuietly.close(indexedMultivalue); + mapper.close(); + } + finally { + ioPeon.close(); + } + } + + @Test + public void testMultiValueFileLargeData() throws Exception + { + // more than one chunk + for (int offsetChunk : OFFSET_CHUNK_FACTORS) { + for (int maxValue : MAX_VALUES) { + final int valueChunk = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue); + generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue); + checkV2SerializedSizeAndData(offsetChunk, valueChunk); + } + } + } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java index 690afbdfd4f0..bb29c04513ca 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java @@ -148,7 +148,7 @@ private void makeWithSerde(final int chunkSize) throws IOException final byte[] bytes = baos.toByteArray(); Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); - supplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder); + supplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder, null); indexed = supplier.get(); } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java 
index 2a2268f13dc0..14afd1b1648f 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java @@ -23,7 +23,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.Smoosh; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.java.util.common.io.smoosh.SmooshedWriter; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; @@ -31,6 +37,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -45,6 +52,21 @@ @RunWith(Parameterized.class) public class CompressedVSizeIntsIndexedWriterTest { + private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; + private final IOPeon ioPeon = new TmpFileIOPeon(); + private final CompressedObjectStrategy.CompressionStrategy compressionStrategy; + private final ByteOrder byteOrder; + private final Random rand = new Random(0); + private int[] vals; + public CompressedVSizeIntsIndexedWriterTest( + CompressedObjectStrategy.CompressionStrategy compressionStrategy, + ByteOrder byteOrder + ) + { + this.compressionStrategy = compressionStrategy; + this.byteOrder = byteOrder; + } + @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}") public static Iterable compressionStrategiesAndByteOrders() { @@ -65,23 +87,6 @@ public Object[] apply(List input) ); } - private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF}; - - private final IOPeon ioPeon = new TmpFileIOPeon(); - private final CompressedObjectStrategy.CompressionStrategy compressionStrategy; - private final ByteOrder byteOrder; - private final Random rand = new Random(0); - private int[] vals; - - public CompressedVSizeIntsIndexedWriterTest( - CompressedObjectStrategy.CompressionStrategy compressionStrategy, - ByteOrder byteOrder - ) - { - this.compressionStrategy = compressionStrategy; - this.byteOrder = byteOrder; - } - @Before public void setUp() throws Exception { @@ -104,6 +109,8 @@ private void generateVals(final int totalSize, final int maxValue) throws IOExce private void checkSerializedSizeAndData(int chunkSize) throws Exception { + FileSmoosher smoosher = new FileSmoosher(FileUtils.getTempDirectory()); + CompressedVSizeIntsIndexedWriter writer = new CompressedVSizeIntsIndexedWriter( ioPeon, "test", vals.length > 0 ? 
Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy ); @@ -117,14 +124,17 @@ private void checkSerializedSizeAndData(int chunkSize) throws Exception writer.close(); long writtenLength = writer.getSerializedSize(); final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output")); - writer.writeToChannel(outputChannel); + writer.writeToChannel(outputChannel, smoosher); outputChannel.close(); + smoosher.close(); assertEquals(writtenLength, supplierFromList.getSerializedSize()); // read from ByteBuffer and check values CompressedVSizeIntsIndexedSupplier supplierFromByteBuffer = CompressedVSizeIntsIndexedSupplier.fromByteBuffer( - ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder + ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), + byteOrder, + null ); IndexedInts indexedInts = supplierFromByteBuffer.get(); for (int i = 0; i < vals.length; ++i) { @@ -161,4 +171,66 @@ public void testEmpty() throws Exception vals = new int[0]; checkSerializedSizeAndData(2); } + + private void checkV2SerializedSizeAndData(int chunkSize) throws Exception + { + File tmpDirectory = FileUtils.getTempDirectory(); + FileSmoosher smoosher = new FileSmoosher(tmpDirectory); + + int maxValue = vals.length > 0 ? Ints.max(vals) : 0; + GenericIndexedWriter genericIndexed = new GenericIndexedWriter<>( + ioPeon, + "test", + CompressedByteBufferObjectStrategy.getBufferForOrder( + byteOrder, + compressionStrategy, + chunkSize * VSizeIndexedInts.getNumBytesForMax(maxValue) + + CompressedVSizeIntsIndexedSupplier.bufferPadding(VSizeIndexedInts.getNumBytesForMax(maxValue)) + ), + Longs.BYTES * 10000 + ); + CompressedVSizeIntsIndexedWriter writer = new CompressedVSizeIntsIndexedWriter( + ioPeon, "test", vals.length > 0 ? 
Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy, + genericIndexed + ); + writer.open(); + for (int val : vals) { + writer.add(val); + } + + writer.close(); + final SmooshedWriter channel = smoosher.addWithSmooshedWriter( + "test", + writer.getSerializedSize() + ); + writer.writeToChannel(channel, smoosher); + channel.close(); + smoosher.close(); + + SmooshedFileMapper mapper = Smoosh.map(tmpDirectory); + + CompressedVSizeIntsIndexedSupplier supplierFromByteBuffer = CompressedVSizeIntsIndexedSupplier.fromByteBuffer( + mapper.mapFile("test"), + byteOrder, + mapper + ); + + IndexedInts indexedInts = supplierFromByteBuffer.get(); + for (int i = 0; i < vals.length; ++i) { + assertEquals(vals[i], indexedInts.get(i)); + } + CloseQuietly.close(indexedInts); + mapper.close(); + } + + @Test + public void testMultiValueFileLargeData() throws Exception + { + for (int maxValue : MAX_VALUES) { + final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue); + generateVals((rand.nextInt(5) + 5) * maxChunkSize + rand.nextInt(maxChunkSize), maxValue); + checkV2SerializedSizeAndData(maxChunkSize); + } + } + } diff --git a/processing/src/test/java/io/druid/segment/data/IOPeonForTesting.java b/processing/src/test/java/io/druid/segment/data/IOPeonForTesting.java index effc42ad9b8a..c45df43815da 100644 --- a/processing/src/test/java/io/druid/segment/data/IOPeonForTesting.java +++ b/processing/src/test/java/io/druid/segment/data/IOPeonForTesting.java @@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -30,7 +31,7 @@ import java.util.Map; /** -*/ + */ class IOPeonForTesting implements IOPeon { Map outStreams = Maps.newHashMap(); @@ -65,4 +66,10 @@ public void close() throws IOException { outStreams.clear(); } + + @Override + public File getFile(String filename) + { + return null; + } } diff --git a/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsWriterTest.java b/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsWriterTest.java index a27509da7bd2..4a0a4a2926ad 100644 --- a/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsWriterTest.java +++ b/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsWriterTest.java @@ -78,7 +78,7 @@ private void checkSerializedSizeAndData() throws Exception writer.close(); long writtenLength = writer.getSerializedSize(); final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output")); - writer.writeToChannel(outputChannel); + writer.writeToChannel(outputChannel, null); outputChannel.close(); assertEquals(writtenLength, intsFromList.getSerializedSize()); diff --git a/processing/src/test/java/io/druid/segment/serde/HyperUniquesSerdeForTest.java b/processing/src/test/java/io/druid/segment/serde/HyperUniquesSerdeForTest.java new file mode 100644 index 000000000000..3f2df176c16b --- /dev/null +++ b/processing/src/test/java/io/druid/segment/serde/HyperUniquesSerdeForTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.serde; + +import com.google.common.collect.Ordering; +import com.google.common.hash.HashFunction; +import com.metamx.common.StringUtils; +import io.druid.data.input.InputRow; +import io.druid.hll.HyperLogLogCollector; +import io.druid.segment.GenericColumnSerializer; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.IOPeon; +import io.druid.segment.data.ObjectStrategy; + +import java.nio.ByteBuffer; +import java.util.List; + +public class HyperUniquesSerdeForTest extends ComplexMetricSerde +{ + private static Ordering comparator = new Ordering() + { + @Override + public int compare( + HyperLogLogCollector arg1, HyperLogLogCollector arg2 + ) + { + return arg1.toByteBuffer().compareTo(arg2.toByteBuffer()); + } + }.nullsFirst(); + + private final HashFunction hashFn; + + public HyperUniquesSerdeForTest( + HashFunction hashFn + ) + { + this.hashFn = hashFn; + } + + @Override + public String getTypeName() + { + return "hyperUnique"; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return HyperLogLogCollector.class; + } + + @Override + public HyperLogLogCollector extractValue(InputRow inputRow, String metricName) + { + Object rawValue = inputRow.getRaw(metricName); + + if (rawValue instanceof HyperLogLogCollector) { + return (HyperLogLogCollector) rawValue; + } else { + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + + List dimValues = inputRow.getDimension(metricName); + if (dimValues == null) { + return collector; + } + + for (String dimensionValue : dimValues) { + collector.add( + hashFn.hashBytes(StringUtils.toUtf8(dimensionValue)).asBytes()); + } + return collector; + } + } + }; + } + + @Override + public void deserializeColumn( + ByteBuffer byteBuffer, ColumnBuilder columnBuilder + ) + { + final GenericIndexed column; + if (columnBuilder.getFileMapper() == null) { + column = GenericIndexed.read(byteBuffer, getObjectStrategy()); + } else { + column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper()); + } + + columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return new ObjectStrategy() + { + @Override + public Class getClazz() + { + return HyperLogLogCollector.class; + } + + @Override + public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); + return HyperLogLogCollector.makeCollector(readOnlyBuffer); + } + + @Override + public byte[] toBytes(HyperLogLogCollector collector) + { + if (collector == null) { + return new byte[]{}; + } + ByteBuffer val = collector.toByteBuffer(); + byte[] retVal = new byte[val.remaining()]; + val.asReadOnlyBuffer().get(retVal); + return retVal; + } + + @Override + public int compare(HyperLogLogCollector o1, 
HyperLogLogCollector o2) + { + return comparator.compare(o1, o2); + } + }; + } + + @Override + public GenericColumnSerializer getSerializer(IOPeon peon, String metric) + { + return LargeColumnSupportedComplexColumnSerializer.createWithColumnSize( + peon, + metric, + this.getObjectStrategy(), + Integer.MAX_VALUE + ); + } +} diff --git a/processing/src/test/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializerTest.java b/processing/src/test/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializerTest.java new file mode 100644 index 000000000000..bba412f58bc8 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/serde/LargeColumnSupportedComplexColumnSerializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.serde; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import com.google.common.primitives.Longs; +import io.druid.hll.HyperLogLogCollector; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.Smoosh; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.java.util.common.io.smoosh.SmooshedWriter; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.column.ComplexColumn; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IOPeon; +import io.druid.segment.data.TmpFileIOPeon; +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +public class LargeColumnSupportedComplexColumnSerializerTest +{ + + private final HashFunction fn = Hashing.murmur3_128(); + + @Test + public void testSanity() throws IOException + { + + HyperUniquesSerdeForTest serde = new HyperUniquesSerdeForTest(Hashing.murmur3_128()); + int[] cases = {1000, 5000, 10000, 20000}; + int[] columnSizes = { + Integer.MAX_VALUE, Integer.MAX_VALUE / 2, Integer.MAX_VALUE / 4, 5000 * Longs.BYTES, + 2500 * Longs.BYTES + }; + + for (int k = 0; k < columnSizes.length; k++) { + for (int j = 0; j < cases.length; j++) { + IOPeon peon = new TmpFileIOPeon(); + File tmpFile = FileUtils.getTempDirectory(); + final FileSmoosher v9Smoosher = new FileSmoosher(tmpFile); + + LargeColumnSupportedComplexColumnSerializer serializer = LargeColumnSupportedComplexColumnSerializer + .createWithColumnSize(peon, "test", serde.getObjectStrategy(), columnSizes[k]); + HyperLogLogCollector baseCollector = HyperLogLogCollector.makeLatestCollector(); + + serializer.open(); + for (int i = 0; i < cases[j]; i++) { + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + byte[] hashBytes = 
fn.hashLong(i).asBytes(); + collector.add(hashBytes); + baseCollector.fold(collector); + serializer.serialize(collector); + } + serializer.close(); + + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + "test", + serializer.getSerializedSize() + ); + serializer.writeToChannel(channel, v9Smoosher); + + channel.close(); + peon.close(); + v9Smoosher.close(); + + SmooshedFileMapper mapper = Smoosh.map(tmpFile); + final ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.COMPLEX) + .setHasMultipleValues(false) + .setFileMapper(mapper); + serde.deserializeColumn(mapper.mapFile("test"), builder); + + Column column = builder.build(); + ComplexColumn complexColumn = column.getComplexColumn(); + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + + for (int i = 0; i < cases[j]; i++) { + collector.fold((HyperLogLogCollector) complexColumn.getRowValue(i)); + } + Assert.assertEquals(baseCollector.estimateCardinality(), collector.estimateCardinality(), 0.0); + } + } + } + +}
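
Editor's note (not part of the patch): the new V2 tests above all exercise the same round-trip — serialize a column through a FileSmoosher, remap the directory with Smoosh.map, and hand the resulting SmooshedFileMapper to the supplier's fromByteBuffer. Below is a minimal sketch of that pattern under stated assumptions: the helper class name and the "test" entry name are illustrative only, and it presumes a CompressedIntsIndexedWriter that has already been opened, filled, and closed, plus an existing scratch directory. It only uses calls that appear in the diff; it is a summary sketch, not the authoritative API.

import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.Smoosh;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.java.util.common.io.smoosh.SmooshedWriter;
import io.druid.segment.data.CompressedIntsIndexedSupplier;
import io.druid.segment.data.CompressedIntsIndexedWriter;
import io.druid.segment.data.IndexedInts;

import java.io.File;
import java.nio.ByteOrder;

// Illustrative helper, not part of the patch.
final class SmooshRoundTripSketch
{
  static IndexedInts roundTrip(CompressedIntsIndexedWriter writer, File tmpDirectory) throws Exception
  {
    // Write side: reserve the serialized size in the smoosh and pass the smoosher
    // to writeToChannel, matching the new two-argument writer signature.
    final FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
    final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
    writer.writeToChannel(channel, smoosher);
    channel.close();
    smoosher.close();

    // Read side: map the smooshed directory and pass the mapper to fromByteBuffer,
    // matching the new three-argument supplier signature; tests reading a plain
    // in-memory buffer simply pass null instead of a mapper.
    final SmooshedFileMapper mapper = Smoosh.map(tmpDirectory);
    final CompressedIntsIndexedSupplier supplier = CompressedIntsIndexedSupplier.fromByteBuffer(
        mapper.mapFile("test"),
        ByteOrder.nativeOrder(),
        mapper
    );
    return supplier.get();
  }
}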