Skip to content

Commit

Permalink
Merge pull request apache#840 from metamx/powers-of-2-buffers
Browse files Browse the repository at this point in the history
make buffer size a power of 2 and optimize buffer lookup
  • Loading branch information
drcrallen committed Nov 12, 2014
2 parents 60e98c3 + 007e57f commit 581e683
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CompressedPools
{
private static final Logger log = new Logger(CompressedPools.class);

public static final int BUFFER_SIZE = 0x10000;
private static final StupidPool<ChunkEncoder> chunkEncoderPool = new StupidPool<ChunkEncoder>(
new Supplier<ChunkEncoder>()
{
Expand All @@ -44,7 +45,7 @@ public class CompressedPools
public ChunkEncoder get()
{
log.info("Allocating new chunkEncoder[%,d]", counter.incrementAndGet());
return new ChunkEncoder(0xFFFF);
return new ChunkEncoder(BUFFER_SIZE);
}
}
);
Expand All @@ -63,7 +64,7 @@ public static ResourceHolder<ChunkEncoder> getChunkEncoder()
public byte[] get()
{
log.info("Allocating new outputBytesPool[%,d]", counter.incrementAndGet());
return new byte[0xFFFF];
return new byte[BUFFER_SIZE];
}
}
);
Expand All @@ -82,7 +83,7 @@ public static ResourceHolder<byte[]> getOutputBytes()
public ByteBuffer get()
{
log.info("Allocating new bigEndByteBuf[%,d]", counter.incrementAndGet());
return ByteBuffer.allocateDirect(0xFFFF).order(ByteOrder.BIG_ENDIAN);
return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.BIG_ENDIAN);
}
}
);
Expand All @@ -96,7 +97,7 @@ public ByteBuffer get()
public ByteBuffer get()
{
log.info("Allocating new littleEndByteBuf[%,d]", counter.incrementAndGet());
return ByteBuffer.allocateDirect(0xFFFF).order(ByteOrder.LITTLE_ENDIAN);
return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import io.druid.segment.CompressedPools;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -42,7 +43,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
{
public static final byte LZF_VERSION = 0x1;
public static final byte version = 0x2;
public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2);
public static final int MAX_FLOATS_IN_BUFFER = CompressedPools.BUFFER_SIZE / Floats.BYTES;

private final int totalSize;
private final int sizePer;
Expand Down Expand Up @@ -70,87 +71,28 @@ public int size()
@Override
public IndexedFloats get()
{
return new IndexedFloats()
{
int currIndex = -1;
ResourceHolder<FloatBuffer> holder;
FloatBuffer buffer;

@Override
public int size()
{
return totalSize;
}

@Override
public float get(int index)
{
int bufferNum = index / sizePer;
int bufferIndex = index % sizePer;

if (bufferNum != currIndex) {
loadBuffer(bufferNum);
}
final int div = Integer.numberOfTrailingZeros(sizePer);
final int rem = sizePer - 1;
final boolean powerOf2 = sizePer == (1 << div);
if(powerOf2) {
return new CompressedIndexedFloats() {
@Override
public float get(int index)
{
// optimize division and remainder for powers of 2
final int bufferNum = index >> div;

return buffer.get(buffer.position() + bufferIndex);
}

@Override
public void fill(int index, float[] toFill)
{
if (totalSize - index < toFill.length) {
throw new IndexOutOfBoundsException(
String.format(
"Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize
)
);
}

int bufferNum = index / sizePer;
int bufferIndex = index % sizePer;

int leftToFill = toFill.length;
while (leftToFill > 0) {
if (bufferNum != currIndex) {
loadBuffer(bufferNum);
}

buffer.mark();
buffer.position(buffer.position() + bufferIndex);
final int numToGet = Math.min(buffer.remaining(), leftToFill);
buffer.get(toFill, toFill.length - leftToFill, numToGet);
buffer.reset();
leftToFill -= numToGet;
++bufferNum;
bufferIndex = 0;
final int bufferIndex = index & rem;
return buffer.get(buffer.position() + bufferIndex);
}
}

private void loadBuffer(int bufferNum)
{
CloseQuietly.close(holder);
holder = baseFloatBuffers.get(bufferNum);
buffer = holder.get();
currIndex = bufferNum;
}

@Override
public String toString()
{
return "CompressedFloatsIndexedSupplier_Anonymous{" +
"currIndex=" + currIndex +
", sizePer=" + sizePer +
", numChunks=" + baseFloatBuffers.size() +
", totalSize=" + totalSize +
'}';
}

@Override
public void close() throws IOException
{
Closeables.close(holder, false);
}
};
};
} else {
return new CompressedIndexedFloats();
}
}

public long getSerializedSize()
Expand Down Expand Up @@ -185,11 +127,6 @@ GenericIndexed<ResourceHolder<FloatBuffer>> getBaseFloatBuffers()
return baseFloatBuffers;
}

/**
 * Rounds {@code MAX_FLOATS_IN_BUFFER} down to a multiple of the given chunk size.
 *
 * @param numFloatsInChunk number of floats per chunk; a value of zero makes the
 *                         remainder operation throw {@link ArithmeticException}
 * @return {@code MAX_FLOATS_IN_BUFFER} minus its remainder modulo {@code numFloatsInChunk}
 */
public static int numFloatsInBuffer(int numFloatsInChunk)
{
  // Floats that would not fit into a whole number of chunks.
  final int leftover = MAX_FLOATS_IN_BUFFER % numFloatsInChunk;
  return MAX_FLOATS_IN_BUFFER - leftover;
}

public static CompressedFloatsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
byte versionFromBuffer = buffer.get();
Expand Down Expand Up @@ -245,7 +182,7 @@ public static CompressedFloatsIndexedSupplier fromFloatBuffer(
)
{
Preconditions.checkArgument(
chunkFactor * Floats.BYTES <= 0xffff, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor
chunkFactor <= MAX_FLOATS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor
);

return new CompressedFloatsIndexedSupplier(
Expand Down Expand Up @@ -294,4 +231,85 @@ public void remove()
);
}

/**
 * {@link IndexedFloats} view over the enclosing supplier's chunked float buffers.
 *
 * <p>A logical index is split into a chunk number ({@code index / sizePer}) and an offset
 * within that chunk ({@code index % sizePer}). Only one chunk is held at a time: it is
 * fetched from {@code baseFloatBuffers} on demand and replaced when a different chunk is
 * requested.
 */
private class CompressedIndexedFloats implements IndexedFloats
{
  // Number of the chunk currently held in `buffer`; -1 until the first load.
  int currIndex = -1;
  // Holder of the currently loaded chunk; closed before another chunk is loaded and in close().
  ResourceHolder<FloatBuffer> holder;
  // Float view obtained from `holder`.
  FloatBuffer buffer;

  /**
   * @return the enclosing supplier's {@code totalSize}
   */
  @Override
  public int size()
  {
    return totalSize;
  }

  /**
   * Reads a single float, loading the chunk that contains it if it is not the current one.
   *
   * @param index logical index of the float
   * @return the float stored at {@code index}
   */
  @Override
  public float get(final int index)
  {
    // Quotient and remainder by the same divisor are computed side by side
    // (the original author notes the compiler optimizes the pair).
    final int chunk = index / sizePer;
    final int offset = index % sizePer;

    if (currIndex != chunk) {
      loadBuffer(chunk);
    }

    // Absolute read relative to the buffer's current position; the position is not moved.
    final int base = buffer.position();
    return buffer.get(base + offset);
  }

  /**
   * Copies {@code toFill.length} consecutive floats starting at {@code index} into
   * {@code toFill}, moving on to following chunks as each one is exhausted.
   *
   * @param index  logical index of the first float to copy
   * @param toFill destination array, filled completely
   * @throws IndexOutOfBoundsException if fewer than {@code toFill.length} floats exist
   *                                   between {@code index} and {@code totalSize}
   */
  @Override
  public void fill(int index, float[] toFill)
  {
    final int wanted = toFill.length;
    if (wanted > totalSize - index) {
      throw new IndexOutOfBoundsException(
          String.format(
              "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize
          )
      );
    }

    int chunk = index / sizePer;
    int offset = index % sizePer;
    int copied = 0;

    while (copied < wanted) {
      if (currIndex != chunk) {
        loadBuffer(chunk);
      }

      // mark()/reset() restore the buffer's position after the relative bulk read.
      buffer.mark();
      buffer.position(buffer.position() + offset);
      final int count = Math.min(buffer.remaining(), wanted - copied);
      buffer.get(toFill, copied, count);
      buffer.reset();

      copied += count;
      // Every chunk after the first is read from its beginning.
      chunk++;
      offset = 0;
    }
  }

  /**
   * Closes the currently held chunk and makes chunk {@code bufferNum} the current one.
   *
   * @param bufferNum number of the chunk to fetch from {@code baseFloatBuffers}
   */
  protected void loadBuffer(int bufferNum)
  {
    CloseQuietly.close(holder);
    final ResourceHolder<FloatBuffer> next = baseFloatBuffers.get(bufferNum);
    holder = next;
    buffer = next.get();
    currIndex = bufferNum;
  }

  /**
   * @return a description listing the current chunk number, chunk size, chunk count and total size
   */
  @Override
  public String toString()
  {
    final StringBuilder text = new StringBuilder("CompressedFloatsIndexedSupplier_Anonymous{");
    text.append("currIndex=").append(currIndex);
    text.append(", sizePer=").append(sizePer);
    text.append(", numChunks=").append(baseFloatBuffers.size());
    text.append(", totalSize=").append(totalSize);
    text.append('}');
    return text.toString();
  }

  /**
   * Closes the holder of the currently loaded chunk.
   *
   * @throws IOException if closing the holder fails
   */
  @Override
  public void close() throws IOException
  {
    // `false`: do not swallow an IOException raised while closing.
    Closeables.close(holder, false);
  }
}
}
Loading

0 comments on commit 581e683

Please sign in to comment.