Skip to content

Commit

Permalink
Optimize ByteOutputStream used for IN_MAP and PRESENT streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sdruzkin authored and ARUNACHALAM THIRUPATHI committed Jun 20, 2022
1 parent 6b62e5e commit 2f88aa7
Show file tree
Hide file tree
Showing 5 changed files with 574 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;

public class BooleanOutputStream
implements ValueOutputStream<BooleanStreamCheckpoint>
Expand Down Expand Up @@ -72,32 +71,29 @@ public void writeBoolean(boolean value)

public void writeBooleans(int count, boolean value)
{
checkArgument(count >= 0, "count is negative");
if (count == 0) {
return;
}

checkArgument(count >= 0, "count is negative");

if (bitsInData != 0) {
int bitsToWrite = Math.min(count, 8 - bitsInData);
if (value) {
data |= getLowBitMask(bitsToWrite) << (8 - bitsInData - bitsToWrite);
}

bitsInData += bitsToWrite;
count -= bitsToWrite;
if (bitsInData == 8) {
flushData();
}
else {
if (bitsInData != 8) {
// there were not enough bits to fill the current data
verify(count == 0);
return;
}

count -= bitsToWrite;
flushData();
}

// at this point there should be no pending data
verify(bitsInData == 0);

// write 8 bits at a time
while (count >= 8) {
if (value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc.stream;

import com.facebook.presto.orc.ColumnWriterOptions;
import com.facebook.presto.orc.DwrfDataEncryptor;
import com.facebook.presto.orc.OrcOutputBuffer;
import com.facebook.presto.orc.checkpoint.BooleanStreamCheckpoint;
import com.facebook.presto.orc.checkpoint.ByteStreamCheckpoint;
import com.facebook.presto.orc.metadata.Stream.StreamKind;
import com.google.common.collect.ImmutableList;
import org.openjdk.jol.info.ClassLayout;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;

public class BooleanOutputStreamOld
implements ValueOutputStream<BooleanStreamCheckpoint>
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(BooleanOutputStreamOld.class).instanceSize();
private final ByteOutputStreamOld byteOutputStream;
private final List<Integer> checkpointBitOffsets = new ArrayList<>();

private int bitsInData;
private int data;
private boolean closed;

public BooleanOutputStreamOld(ColumnWriterOptions columnWriterOptions, Optional<DwrfDataEncryptor> dwrfEncryptor)
{
this(new ByteOutputStreamOld(columnWriterOptions, dwrfEncryptor));
}

public BooleanOutputStreamOld(OrcOutputBuffer buffer)
{
this(new ByteOutputStreamOld(buffer));
}

public BooleanOutputStreamOld(ByteOutputStreamOld byteOutputStream)
{
this.byteOutputStream = byteOutputStream;
}

public void writeBoolean(boolean value)
{
checkState(!closed);

if (value) {
data |= 0x1 << (7 - bitsInData);
}
bitsInData++;

if (bitsInData == 8) {
flushData();
}
}

public void writeBooleans(int count, boolean value)
{
checkArgument(count >= 0, "count is negative");
if (count == 0) {
return;
}

if (bitsInData != 0) {
int bitsToWrite = Math.min(count, 8 - bitsInData);
if (value) {
data |= getLowBitMask(bitsToWrite) << (8 - bitsInData - bitsToWrite);
}

bitsInData += bitsToWrite;
count -= bitsToWrite;
if (bitsInData == 8) {
flushData();
}
else {
// there were not enough bits to fill the current data
verify(count == 0);
return;
}
}

// at this point there should be no pending data
verify(bitsInData == 0);

// write 8 bits at a time
while (count >= 8) {
if (value) {
byteOutputStream.writeByte((byte) 0b1111_1111);
}
else {
byteOutputStream.writeByte((byte) 0b0000_0000);
}
count -= 8;
}

// buffer remaining bits
if (count > 0) {
if (value) {
data = getLowBitMask(count) << (8 - count);
}
bitsInData = count;
}
}

private void flushData()
{
byteOutputStream.writeByte((byte) data);
data = 0;
bitsInData = 0;
}

@Override
public void recordCheckpoint()
{
checkState(!closed);
byteOutputStream.recordCheckpoint();
checkpointBitOffsets.add(bitsInData);
}

@Override
public void close()
{
closed = true;
if (bitsInData > 0) {
flushData();
}
byteOutputStream.close();
}

@Override
public List<BooleanStreamCheckpoint> getCheckpoints()
{
checkState(closed);
ImmutableList.Builder<BooleanStreamCheckpoint> booleanStreamCheckpoint = ImmutableList.builder();
List<ByteStreamCheckpoint> byteStreamCheckpoints = byteOutputStream.getCheckpoints();
for (int groupId = 0; groupId < checkpointBitOffsets.size(); groupId++) {
int checkpointBitOffset = checkpointBitOffsets.get(groupId);
ByteStreamCheckpoint byteStreamCheckpoint = byteStreamCheckpoints.get(groupId);
booleanStreamCheckpoint.add(new BooleanStreamCheckpoint(checkpointBitOffset, byteStreamCheckpoint));
}
return booleanStreamCheckpoint.build();
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int sequence)
{
checkState(closed);
return byteOutputStream.getStreamDataOutput(column, sequence);
}

public StreamDataOutput getStreamDataOutput(int column, int sequence, StreamKind streamKind)
{
checkState(closed);
return byteOutputStream.getStreamDataOutput(column, sequence, streamKind);
}

@Override
public long getBufferedBytes()
{
return byteOutputStream.getBufferedBytes();
}

@Override
public long getRetainedBytes()
{
// NOTE: we do not include checkpoints because they should be small and it would be annoying to calculate the size
return INSTANCE_SIZE + byteOutputStream.getRetainedBytes();
}

@Override
public void reset()
{
data = 0;
bitsInData = 0;

closed = false;
byteOutputStream.reset();
checkpointBitOffsets.clear();
}

private static int getLowBitMask(int bits)
{
return (0x1 << bits) - 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ public class ByteOutputStream
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ByteOutputStream.class).instanceSize();

private static final int MIN_REPEAT_SIZE = 3;
// A value out side of the range of a signed byte
// A value outside of the range of a signed byte
private static final int UNMATCHABLE_VALUE = Integer.MAX_VALUE;
private static final int SEQUENCE_BUFFER_SIZE = 128;

private final OrcOutputBuffer buffer;
private final List<ByteStreamCheckpoint> checkpoints = new ArrayList<>();

private final byte[] sequenceBuffer = new byte[128];
private final byte[] sequenceBuffer = new byte[SEQUENCE_BUFFER_SIZE];
private int size;

private int runCount;
Expand All @@ -66,7 +67,7 @@ public void writeByte(byte value)
checkState(!closed);

// flush if buffer is full
if (size == sequenceBuffer.length) {
if (size == SEQUENCE_BUFFER_SIZE) {
flushSequence();
}

Expand Down Expand Up @@ -98,7 +99,7 @@ public void writeByte(byte value)
runCount = MIN_REPEAT_SIZE;
size = MIN_REPEAT_SIZE;

// note there is no reason to add the run values to the buffer since is is not used
// note there is no reason to add the run values to the buffer since it is not used
// when in a run length sequence
}

Expand All @@ -107,18 +108,17 @@ public void writeByte(byte value)

private void flushSequence()
{
if (size == 0) {
return;
}

if (runCount >= MIN_REPEAT_SIZE) {
buffer.writeByte(runCount - MIN_REPEAT_SIZE);
buffer.writeByte(lastValue);
}
else {
buffer.writeByte(-size);
for (int i = 0; i < size; i++) {
buffer.writeByte(sequenceBuffer[i]);
if (size == 1) {
buffer.writeByte(sequenceBuffer[0]);
}
else {
buffer.writeBytes(sequenceBuffer, 0, size);
}
}

Expand All @@ -138,7 +138,9 @@ public void recordCheckpoint()
public void close()
{
closed = true;
flushSequence();
if (size != 0) {
flushSequence();
}
buffer.close();
}

Expand Down
Loading

0 comments on commit 2f88aa7

Please sign in to comment.