Skip to content

Commit

Permalink
Add Offset.reset() and remove unused Offset implementations (apache#4706
Browse files Browse the repository at this point in the history
)

* Add Offset.reset() and remove unused Offset implementations

* Fix BitmapOffset

* Address comments
  • Loading branch information
leventov authored and drcrallen committed Aug 23, 2017
1 parent 92bcfdc commit 326a85a
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 584 deletions.
80 changes: 39 additions & 41 deletions processing/src/main/java/io/druid/segment/BitmapOffset.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.WrappedImmutableRoaringBitmap;
import io.druid.collections.bitmap.WrappedRoaringBitmap;
import io.druid.extendedset.intset.EmptyIntIterator;
import io.druid.java.util.common.RE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -123,12 +122,13 @@ private static String factorizeFullness(long bitmapCardinality, long numRows)
}
}

final IntIterator itr;
final String fullness;
private final String fullness;
private IntIterator iterator;
private final IntIterator iteratorForReset;
private final int valueForReset;
private int value;

int val;

public static IntIterator getReverseBitmapOffsetIterator(ImmutableBitmap bitmapIndex)
static IntIterator getReverseBitmapOffsetIterator(ImmutableBitmap bitmapIndex)
{
ImmutableBitmap roaringBitmap = bitmapIndex;
if (!(bitmapIndex instanceof WrappedImmutableRoaringBitmap)) {
Expand All @@ -144,20 +144,18 @@ public static IntIterator getReverseBitmapOffsetIterator(ImmutableBitmap bitmapI

public static BitmapOffset of(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
if (bitmapIndex instanceof WrappedImmutableRoaringBitmap ||
bitmapIndex instanceof WrappedRoaringBitmap ||
descending) {
return new RoaringBitmapOffset(bitmapIndex, descending, numRows);
} else {
return new BitmapOffset(bitmapIndex, descending, numRows);
}
return new BitmapOffset(bitmapIndex, descending, numRows);
}

private BitmapOffset(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
this.itr = newIterator(bitmapIndex, descending);
this.fullness = factorizeFullness(bitmapIndex.size(), numRows);
this.iterator = newIterator(bitmapIndex, descending);
increment();
// It's important to set iteratorForReset and valueForReset after calling increment(), because only after that the
// iterator and the value are in proper initial state.
this.iteratorForReset = safeClone(iterator);
this.valueForReset = value;
}

private IntIterator newIterator(ImmutableBitmap bitmapIndex, boolean descending)
Expand All @@ -169,65 +167,65 @@ private IntIterator newIterator(ImmutableBitmap bitmapIndex, boolean descending)
}
}

private BitmapOffset(String fullness, IntIterator itr, int val)
/**
* Constructor for {@link #clone()}.
*/
private BitmapOffset(String fullness, IntIterator iterator, int value)
{
this.fullness = fullness;
this.itr = itr;
this.val = val;
this.iterator = iterator;
this.iteratorForReset = safeClone(iterator);
this.valueForReset = value;
this.value = value;
}

@Override
public void increment()
{
if (itr.hasNext()) {
val = itr.next();
if (iterator.hasNext()) {
value = iterator.next();
} else {
val = INVALID_VALUE;
value = INVALID_VALUE;
}
}

@Override
public boolean withinBounds()
{
return val > INVALID_VALUE;
return value > INVALID_VALUE;
}

@Override
public void reset()
{
iterator = safeClone(iteratorForReset);
value = valueForReset;
}

@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public Offset clone()
{
return new BitmapOffset(fullness, itr.clone(), val);
return new BitmapOffset(fullness, safeClone(iterator), value);
}

@Override
public int getOffset()
{
return val;
return value;
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("itr", itr);
inspector.visit("iterator", iterator);
inspector.visit("fullness", fullness);
}

public static class RoaringBitmapOffset extends BitmapOffset
private static IntIterator safeClone(IntIterator iterator)
{

public RoaringBitmapOffset(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
super(bitmapIndex, descending, numRows);
}

RoaringBitmapOffset(String fullness, IntIterator itr, int val)
{
super(fullness, itr, val);
}

@Override
public Offset clone()
{
return new RoaringBitmapOffset(fullness, itr.hasNext() ? itr.clone() : EmptyIntIterator.instance(), val);
}
// Calling clone() on empty iterators from RoaringBitmap library sometimes fails with NPE,
// see https://github.com/druid-io/druid/issues/4709, https://github.com/RoaringBitmap/RoaringBitmap/issues/177
return iterator.hasNext() ? iterator.clone() : EmptyIntIterator.instance();
}
}
6 changes: 6 additions & 0 deletions processing/src/main/java/io/druid/segment/FilteredOffset.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public boolean withinBounds()
return baseOffset.withinBounds();
}

@Override
public void reset()
{
baseOffset.reset();
}

@Override
public Offset clone()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,65 @@
* under the License.
*/

package io.druid.segment.data;
package io.druid.segment;

import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;

/**
*/
public class ArrayBasedOffset extends Offset
public class NoFilterOffset extends Offset
{
private final int[] ints;
private int currIndex;
private final int rowCount;
private final boolean descending;
private final int initialOffset;
private int currentOffset;

public ArrayBasedOffset(
int[] ints
)
NoFilterOffset(int initialOffset, int rowCount, boolean descending)
{
this(ints, 0);
this.initialOffset = initialOffset;
this.currentOffset = initialOffset;
this.rowCount = rowCount;
this.descending = descending;
}

public ArrayBasedOffset(
int[] ints,
int startIndex
)
@Override
public void increment()
{
this.ints = ints;
this.currIndex = startIndex;
currentOffset++;
}

@Override
public int getOffset()
public boolean withinBounds()
{
return ints[currIndex];
return currentOffset < rowCount;
}

@Override
public void increment()
public void reset()
{
++currIndex;
currentOffset = initialOffset;
}

@Override
public boolean withinBounds()
public Offset clone()
{
return currIndex < ints.length;
return new NoFilterOffset(currentOffset, rowCount, descending);
}

@Override
public Offset clone()
public int getOffset()
{
return descending ? rowCount - currentOffset - 1 : currentOffset;
}

@Override
public String toString()
{
final ArrayBasedOffset retVal = new ArrayBasedOffset(ints);
retVal.currIndex = currIndex;
return retVal;
return currentOffset + "/" + rowCount + (descending ? "(DSC)" : "");
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
inspector.visit("descending", descending);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,12 @@ public boolean withinBounds()
return timeInRange(timestamps.getLongSingleValueRow(baseOffset.getOffset()));
}

@Override
public void reset()
{
baseOffset.reset();
}

protected abstract boolean timeInRange(long current);

@Override
Expand Down Expand Up @@ -1054,56 +1060,6 @@ public Offset clone()
}
}

public static class NoFilterOffset extends Offset
{
private final int rowCount;
private final boolean descending;
private int currentOffset;

NoFilterOffset(int currentOffset, int rowCount, boolean descending)
{
this.currentOffset = currentOffset;
this.rowCount = rowCount;
this.descending = descending;
}

@Override
public void increment()
{
currentOffset++;
}

@Override
public boolean withinBounds()
{
return currentOffset < rowCount;
}

@Override
public Offset clone()
{
return new NoFilterOffset(currentOffset, rowCount, descending);
}

@Override
public int getOffset()
{
return descending ? rowCount - currentOffset - 1 : currentOffset;
}

@Override
public String toString()
{
return currentOffset + "/" + rowCount + (descending ? "(DSC)" : "");
}

@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("descending", descending);
}
}

@Override
public Metadata getMetadata()
{
Expand Down
Loading

0 comments on commit 326a85a

Please sign in to comment.