Removing Integer.MAX column size limit. (apache#3743)
* Removing Integer.MAX column size limit.

* On-demand creation of headerLong; use v2 instead of v3

* Avoid reusing the same object from a previous test.

* Avoid reusing the same object from a previous test, part 2

* Code formatting.

* GenericIndexed/Writer code review changes.

* GenericIndexed/Writer code review requested changes.

* Make checkIndex() static

* Native endianness for GenericIndexedV2; code review requested changes.

* Formatting

* HLL fix.

* Use native endianness during bag size calculation.

* Code review requested changes.

* IOPeon close() changes.

* Use a different tmp directory path for testing.

* Code review requested changes.
akashdw authored and leventov committed Feb 17, 2017
1 parent e5fb0e1 commit 797488a
Showing 62 changed files with 1,914 additions and 493 deletions.
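Taken together, these commits drop the old 2 GB ceiling that came from addressing a column through a single ByteBuffer with int offsets: columns are now written through a FileSmoosher and read back through a SmooshedFileMapper, so a logical column can span several mapped buffers, as the diff below shows. The sketch that follows is illustrative only, not the actual GenericIndexed v2 code; the class and field names are hypothetical, and it simply shows why long-based addressing over buffer chunks removes the Integer.MAX_VALUE limit.

// Illustrative sketch only (hypothetical names). A long position is mapped to
// (chunk index, offset within chunk), so total addressable size is no longer
// bounded by Integer.MAX_VALUE, which caps any single ByteBuffer.
import java.nio.ByteBuffer;

class MultiBufferReader
{
  private final ByteBuffer[] chunks;  // each chunk holds up to `chunkSize` bytes
  private final long chunkSize;

  MultiBufferReader(ByteBuffer[] chunks, long chunkSize)
  {
    this.chunks = chunks;
    this.chunkSize = chunkSize;
  }

  byte get(long position)
  {
    // Safe as long as the chunk count itself fits in an int.
    int chunk = (int) (position / chunkSize);
    int offset = (int) (position % chunkSize);
    return chunks[chunk].get(offset);
  }
}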
@@ -76,7 +76,9 @@ public void setup() throws IOException
         )
     );
     this.compressed = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
-        bufferCompressed, ByteOrder.nativeOrder()
+        bufferCompressed,
+        ByteOrder.nativeOrder(),
+        null
     ).get();

     final ByteBuffer bufferUncompressed = serialize(
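The hunk above shows the widened read signature. A hedged call-site sketch: the new third argument is the segment's SmooshedFileMapper, and null means the whole structure lives in this one buffer (as in benchmarks and tests).

// `bufferCompressed` is assumed to hold a serialized supplier, as in the setup above.
IndexedInts ints = CompressedVSizeIntsIndexedSupplier
    .fromByteBuffer(bufferCompressed, ByteOrder.nativeOrder(), null)  // null: nothing smooshed out-of-line
    .get();
int first = ints.get(0);  // reads behave exactly as before; only construction gained a parameter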
@@ -99,7 +99,9 @@ public IndexedInts apply(int[] input)
         )
     );
     this.compressed = CompressedVSizeIndexedSupplier.fromByteBuffer(
-        bufferCompressed, ByteOrder.nativeOrder()
+        bufferCompressed,
+        ByteOrder.nativeOrder(),
+        null
     ).get();

     final ByteBuffer bufferUncompressed = serialize(
@@ -72,7 +72,7 @@ public void setup() throws Exception
     File compFile = new File(dir, file + "-" + strategy);
     rand = new Random();
     ByteBuffer buffer = Files.map(compFile);
-    supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
+    supplier = CompressedFloatsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
   }

   @Benchmark
@@ -75,7 +75,7 @@ public void setup() throws Exception
     File compFile = new File(dir, file + "-" + strategy + "-" + format);
     rand = new Random();
     ByteBuffer buffer = Files.map(compFile);
-    supplier = CompressedLongsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder());
+    supplier = CompressedLongsIndexedSupplier.fromByteBuffer(buffer, ByteOrder.nativeOrder(), null);
   }

   @Benchmark
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;

+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -59,4 +60,10 @@ public void close() throws IOException
   {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public File getFile(String filename)
+  {
+    throw new UnsupportedOperationException();
+  }
 }
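This Hadoop peon is stream-only, so the new getFile() hook is left unsupported here. The hook exists because the large-column writer needs a real File handle on its temporary output (for example, to copy multi-gigabyte values into the smoosher). A sketch of how a filesystem-backed peon might implement it; tmpDir is a hypothetical scratch directory, and the null-on-missing behavior is an assumption, not something this diff confirms:

@Override
public File getFile(String filename)
{
  // Hypothetical: resolve against this peon's scratch directory.
  File f = new File(tmpDir, filename);
  return f.exists() ? f : null;
}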
processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java
@@ -20,6 +20,7 @@
 package io.druid.segment;

 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
 import io.druid.segment.data.IndexedInts;
@@ -76,18 +77,20 @@ public void writeToChannel(WritableByteChannel channel) throws IOException
     valueSupplier.writeToChannel(channel);
   }

-  public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+  public static CompressedVSizeIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper fileMapper)
   {
     byte versionFromBuffer = buffer.get();

     if (versionFromBuffer == version) {
       CompressedVSizeIntsIndexedSupplier offsetSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
           buffer,
-          order
+          order,
+          fileMapper
       );
       CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
           buffer,
-          order
+          order,
+          fileMapper
       );
       return new CompressedVSizeIndexedSupplier(offsetSupplier, valueSupplier);
     }
processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java
@@ -20,6 +20,7 @@
 package io.druid.segment;

 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import io.druid.segment.data.CompressedIntsIndexedSupplier;
 import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
@@ -58,18 +59,24 @@ public class CompressedVSizeIndexedV3Supplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
     this.valueSupplier = valueSupplier;
   }

-  public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+  public static CompressedVSizeIndexedV3Supplier fromByteBuffer(
+      ByteBuffer buffer,
+      ByteOrder order,
+      SmooshedFileMapper fileMapper
+  )
   {
     byte versionFromBuffer = buffer.get();

     if (versionFromBuffer == VERSION) {
       CompressedIntsIndexedSupplier offsetSupplier = CompressedIntsIndexedSupplier.fromByteBuffer(
           buffer,
-          order
+          order,
+          fileMapper
       );
       CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
           buffer,
-          order
+          order,
+          fileMapper
       );
       return new CompressedVSizeIndexedV3Supplier(offsetSupplier, valueSupplier);
     }
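Both multi-value suppliers simply thread the mapper into their nested offset and value suppliers, so whichever layer encounters out-of-line data can map it. A hedged read-path sketch; the column name and mapper acquisition are hypothetical, and the result type follows the WritableSupplier<IndexedMultivalue<IndexedInts>> declaration above:

SmooshedFileMapper mapper = SmooshedFileMapper.load(segmentDir);  // segmentDir: hypothetical segment directory
ByteBuffer column = mapper.mapFile("multi_value_column");         // hypothetical smoosh entry name
IndexedMultivalue<IndexedInts> rows = CompressedVSizeIndexedV3Supplier
    .fromByteBuffer(column, ByteOrder.nativeOrder(), mapper)
    .get();
IndexedInts row = rows.get(0);  // per-row int values; the read API itself is unchanged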
@@ -19,6 +19,7 @@

 package io.druid.segment;

+import io.druid.java.util.common.io.smoosh.FileSmoosher;
 import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.CompressionFactory;
 import io.druid.segment.data.FloatSupplierSerializer;
@@ -90,9 +91,9 @@ public long getSerializedSize()
   }

   @Override
-  public void writeToChannel(WritableByteChannel channel) throws IOException
+  public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
   {
-    writer.writeToChannel(channel);
+    writer.writeToChannel(channel, smoosher);
   }

 }
processing/src/main/java/io/druid/segment/GenericColumnSerializer.java
@@ -19,6 +19,8 @@

 package io.druid.segment;

+import io.druid.java.util.common.io.smoosh.FileSmoosher;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -31,5 +33,5 @@ public interface GenericColumnSerializer extends Closeable

   public long getSerializedSize();

-  public void writeToChannel(WritableByteChannel channel) throws IOException;
+  public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException;
 }
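With the interface widened, every column serializer receives the FileSmoosher at write time, even if it ignores it. A minimal implementing sketch; it assumes the interface members not visible in this hunk (open() and serialize(Object), as used by IndexMergerV9) and uses a toy in-memory payload:

import io.druid.java.util.common.io.smoosh.FileSmoosher;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class InMemoryColumnSerializer implements GenericColumnSerializer
{
  private final ByteBuffer payload = ByteBuffer.allocate(1 << 12);  // toy buffer

  @Override
  public void open() throws IOException {}

  @Override
  public void serialize(Object obj) throws IOException
  {
    payload.putInt(((Number) obj).intValue());  // toy encoding
  }

  @Override
  public long getSerializedSize()
  {
    return payload.position();
  }

  @Override
  public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
  {
    // Small column: the smoosher goes unused. A column past the 2 GB limit would
    // instead hand its chunks to `smoosher` rather than this single channel.
    ByteBuffer readable = payload.duplicate();
    readable.flip();
    channel.write(readable);
  }

  @Override
  public void close() throws IOException {}
}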
51 changes: 34 additions & 17 deletions processing/src/main/java/io/druid/segment/IndexIO.java
@@ -36,12 +36,12 @@
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
+import com.metamx.emitter.EmittingLogger;
 import io.druid.collections.bitmap.BitmapFactory;
 import io.druid.collections.bitmap.ConciseBitmapFactory;
 import io.druid.collections.bitmap.ImmutableBitmap;
 import io.druid.collections.bitmap.MutableBitmap;
 import io.druid.collections.spatial.ImmutableRTree;
-import com.metamx.emitter.EmittingLogger;
 import io.druid.common.utils.SerializerUtils;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
@@ -396,22 +396,28 @@ public MMappedIndex mapDir(File inDir) throws IOException

       indexBuffer.get(); // Skip the version byte
       final GenericIndexed<String> availableDimensions = GenericIndexed.read(
-          indexBuffer, GenericIndexed.STRING_STRATEGY
+          indexBuffer,
+          GenericIndexed.STRING_STRATEGY,
+          smooshedFiles
       );
       final GenericIndexed<String> availableMetrics = GenericIndexed.read(
-          indexBuffer, GenericIndexed.STRING_STRATEGY
+          indexBuffer,
+          GenericIndexed.STRING_STRATEGY,
+          smooshedFiles
       );
       final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
       final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();

       CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
-          smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), BYTE_ORDER
+          smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()),
+          BYTE_ORDER,
+          smooshedFiles
       );

       Map<String, MetricHolder> metrics = Maps.newLinkedHashMap();
       for (String metric : availableMetrics) {
         final String metricFilename = makeMetricFile(inDir, metric, BYTE_ORDER).getName();
-        final MetricHolder holder = MetricHolder.fromByteBuffer(smooshedFiles.mapFile(metricFilename));
+        final MetricHolder holder = MetricHolder.fromByteBuffer(smooshedFiles.mapFile(metricFilename), smooshedFiles);

         if (!metric.equals(holder.getName())) {
           throw new ISE("Metric[%s] loaded up metric[%s] from disk. File names do matter.", metric, holder.getName());
@@ -509,7 +515,7 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
         final String dimName = serializerUtils.readString(invertedBuffer);
         bitmapIndexes.put(
             dimName,
-            GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy())
+            GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy(), v8SmooshedFiles)
         );
       }

@@ -691,7 +697,7 @@ public int size()
              dimension, serdeficator.numBytes() + specBytes.length
          );
          channel.write(ByteBuffer.wrap(specBytes));
-         serdeficator.write(channel);
+         serdeficator.write(channel, v9Smoosher);
          channel.close();
        } else if (filename.startsWith("met_") || filename.startsWith("numeric_dim_")) {
          // NOTE: identifying numeric dimensions by using a different filename pattern is meant to allow the
@@ -702,7 +708,7 @@ public int size()
            continue;
          }

-         MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename));
+         MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename), v8SmooshedFiles);
          final String metric = holder.getName();

          final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
@@ -753,11 +759,13 @@ public int size()
              metric, serdeficator.numBytes() + specBytes.length
          );
          channel.write(ByteBuffer.wrap(specBytes));
-         serdeficator.write(channel);
+         serdeficator.write(channel, v9Smoosher);
          channel.close();
        } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) {
          CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
-             v8SmooshedFiles.mapFile(filename), BYTE_ORDER
+             v8SmooshedFiles.mapFile(filename),
+             BYTE_ORDER,
+             v8SmooshedFiles
          );

          final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
@@ -778,7 +786,7 @@ public int size()
              "__time", serdeficator.numBytes() + specBytes.length
          );
          channel.write(ByteBuffer.wrap(specBytes));
-         serdeficator.write(channel);
+         serdeficator.write(channel, v9Smoosher);
          channel.close();
        } else {
          skippedFiles.add(filename);
@@ -983,8 +991,16 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
        * Index.drd should consist of the segment version, the columns and dimensions of the segment as generic
        * indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type.
        */
-      final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
-      final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
+      final GenericIndexed<String> cols = GenericIndexed.read(
+          indexBuffer,
+          GenericIndexed.STRING_STRATEGY,
+          smooshedFiles
+      );
+      final GenericIndexed<String> dims = GenericIndexed.read(
+          indexBuffer,
+          GenericIndexed.STRING_STRATEGY,
+          smooshedFiles
+      );
       final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
       final BitmapSerdeFactory segmentBitmapSerdeFactory;

@@ -1021,10 +1037,10 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
       Map<String, Column> columns = Maps.newHashMap();

       for (String columnName : cols) {
-        columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName)));
+        columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles));
       }

-      columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));
+      columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles));

       final QueryableIndex index = new SimpleQueryableIndex(
           dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, metadata
@@ -1035,12 +1051,13 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
       return index;
     }

-    private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer) throws IOException
+    private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer, SmooshedFileMapper smooshedFiles)
+        throws IOException
     {
       ColumnDescriptor serde = mapper.readValue(
           serializerUtils.readString(byteBuffer), ColumnDescriptor.class
       );
-      return serde.read(byteBuffer, columnConfig);
+      return serde.read(byteBuffer, columnConfig, smooshedFiles);
     }
   }

4 changes: 2 additions & 2 deletions processing/src/main/java/io/druid/segment/IndexMergerV9.java
@@ -423,7 +423,7 @@ private void makeColumn(
     );
     try {
       channel.write(ByteBuffer.wrap(specBytes));
-      serdeficator.write(channel);
+      serdeficator.write(channel, v9Smoosher);
     }
     finally {
       channel.close();
@@ -530,7 +530,7 @@ private ArrayList<GenericColumnSerializer> setupMetricsWriters(
         if (serde == null) {
           throw new ISE("Unknown type[%s]", typeName);
         }
-        writer = ComplexColumnSerializer.create(ioPeon, metric, serde);
+        writer = serde.getSerializer(ioPeon, metric);
         break;
       default:
        throw new ISE("Unknown type[%s]", type);
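The second hunk stops hardwiring ComplexColumnSerializer and instead asks the metric's ComplexMetricSerde for a serializer, which is what lets complex columns opt into the new large-column write path. A hedged sketch of the dispatch; ComplexMetrics.getSerdeForType is assumed from the surrounding Druid code, and the null check mirrors the hunk:

import io.druid.java.util.common.ISE;
import io.druid.segment.data.IOPeon;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;

final class ComplexWriters
{
  private ComplexWriters() {}

  // Mirrors the switch arm above: look up the serde for a complex type and let it
  // build the writer, instead of calling ComplexColumnSerializer.create(...) directly.
  static GenericColumnSerializer makeComplexWriter(IOPeon ioPeon, String metric, String typeName)
  {
    ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
    if (serde == null) {
      throw new ISE("Unknown type[%s]", typeName);
    }
    return serde.getSerializer(ioPeon, metric);
  }
}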
@@ -19,8 +19,7 @@

 package io.druid.segment;

-import com.metamx.common.IAE;
-import com.metamx.common.guava.MergeIterator;
+import io.druid.java.util.common.IAE;
 import it.unimi.dsi.fastutil.ints.AbstractIntIterator;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntIterator;
@@ -55,7 +54,8 @@ public static int skip(IntIterator it, int n)
  * It isn't checked if the given source iterators are actually ascending, if they are not, the order of values in the
  * returned iterator is undefined.
  * <p>
- * This is similar to what {@link MergeIterator} does with simple {@link java.util.Iterator}s.
+ * This is similar to what {@link io.druid.java.util.common.guava.MergeIterator} does with simple
+ * {@link java.util.Iterator}s.
  *
  * @param iterators iterators to merge, must return ascending values
  */
@@ -71,7 +71,8 @@ public static IntIterator mergeAscending(List<IntIterator> iterators)
   }

   /**
-   * This class is designed mostly after {@link MergeIterator}. {@code MergeIterator} uses a priority queue of wrapper
+   * This class is designed mostly after {@link io.druid.java.util.common.guava.MergeIterator}.
+   * {@code MergeIterator} uses a priority queue of wrapper
    * "peeking" iterators. Peeking wrappers are not available in fastutil for specialized iterators like IntIterator, so
    * they should be implemented manually in the druid codebase. Instead, another approach is taken: a priority queue
    * of primitive long values is used, in long values the high 32-bits is the last value from some iterator, and the low
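The javadoc rewrite above only repoints {@link} targets after the package move, but mergeAscending itself is worth a usage sketch: it k-way merges ascending primitive iterators without boxing. The expected output assumes duplicates are preserved, which the priority-queue design described above implies.

import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntIterator;

import java.util.Arrays;
import java.util.List;

public class MergeAscendingDemo
{
  public static void main(String[] args)
  {
    List<IntIterator> sources = Arrays.asList(
        new IntArrayList(new int[]{1, 4, 9}).iterator(),
        new IntArrayList(new int[]{2, 4, 8}).iterator(),
        new IntArrayList(new int[]{0, 16}).iterator()
    );
    IntIterator merged = IntIteratorUtils.mergeAscending(sources);
    while (merged.hasNext()) {
      System.out.print(merged.nextInt() + " ");  // expected: 0 1 2 4 4 8 9 16
    }
  }
}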
@@ -19,6 +19,7 @@

 package io.druid.segment;

+import io.druid.java.util.common.io.smoosh.FileSmoosher;
 import io.druid.segment.data.CompressedObjectStrategy;
 import io.druid.segment.data.CompressionFactory;
 import io.druid.segment.data.IOPeon;
@@ -95,9 +96,9 @@ public long getSerializedSize()
   }

   @Override
-  public void writeToChannel(WritableByteChannel channel) throws IOException
+  public void writeToChannel(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
   {
-    writer.writeToChannel(channel);
+    writer.writeToChannel(channel, smoosher);
   }

 }