Skip to content

Commit

Permalink
PARQUET-2438: Fixes minMaxSize for BinaryColumnIndexBuilder (apache#1279)
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu authored Feb 27, 2024
1 parent 274dc51 commit 87ee1b3
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,11 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
  // A binary value contributes exactly its byte length to the index size.
  Binary binary = (Binary) value;
  return binary.length();
}

@Override
public long getMinMaxSize() {
  // Total size is the byte length of every stored (possibly truncated) min
  // value plus that of every stored max value.
  long total = 0L;
  for (Binary min : minValues) {
    total += min.length();
  }
  for (Binary max : maxValues) {
    total += max.length();
  }
  return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// Each boolean min/max value is accounted for as a single byte.
return 1;
}

@Override
public long getMinMaxSize() {
  // Boolean min/max values occupy one byte each (see sizeOf above), so the
  // total size is simply the count of stored min values plus stored max
  // values. Widen to long before adding, consistent with the other
  // type-specific builders, instead of summing in int arithmetic.
  return (long) minValues.size() + maxValues.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,16 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// No-op builder: values are never stored, so they contribute no size.
return 0;
}

@Override
public long getMinMaxSize() {
// This builder never stores min/max values, so their total size is always 0.
return 0;
}
};

private PrimitiveType type;
private final BooleanList nullPages = new BooleanArrayList();
private final LongList nullCounts = new LongArrayList();
private long minMaxSize;
private final IntList pageIndexes = new IntArrayList();
private int nextPageIndex;

Expand Down Expand Up @@ -537,8 +541,6 @@ public void add(Statistics<?> stats) {
Object max = stats.genericGetMax();
addMinMax(min, max);
pageIndexes.add(nextPageIndex);
minMaxSize += sizeOf(min);
minMaxSize += sizeOf(max);
} else {
nullPages.add(true);
}
Expand Down Expand Up @@ -576,8 +578,6 @@ private void fill(
ByteBuffer max = maxValues.get(i);
addMinMaxFromBytes(min, max);
pageIndexes.add(i);
minMaxSize += min.remaining();
minMaxSize += max.remaining();
}
}
}
Expand Down Expand Up @@ -651,7 +651,6 @@ private void clear() {
nullPages.clear();
nullCounts.clear();
clearMinMax();
minMaxSize = 0;
nextPageIndex = 0;
pageIndexes.clear();
}
Expand All @@ -673,6 +672,6 @@ public int getPageCount() {
* @return the sum of size in bytes of the min/max values added so far to this builder
*/
public long getMinMaxSize() {
  // The size of min/max values depends on the physical type, so each concrete
  // builder overrides this method (see the type-specific implementations in
  // this change). The base implementation is intentionally unsupported and
  // must never be reached.
  // NOTE: the stale "return minMaxSize;" line from the old implementation is
  // removed — the minMaxSize field no longer exists, and a return before the
  // throw would make the throw unreachable (a compile error in Java).
  throw new UnsupportedOperationException("Not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// Doubles are fixed-width: Double.BYTES (8) bytes per value.
return Double.BYTES;
}

@Override
public long getMinMaxSize() {
  // Every stored value, min or max, is a fixed-width double.
  long valueCount = (long) minValues.size() + maxValues.size();
  return valueCount * Double.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// Floats are fixed-width: Float.BYTES (4) bytes per value.
return Float.BYTES;
}

@Override
public long getMinMaxSize() {
// Each stored min/max value is a fixed-width float; multiply in long
// arithmetic to avoid int overflow for very large page counts.
return (long) minValues.size() * Float.BYTES + (long) maxValues.size() * Float.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,9 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// 32-bit integers are fixed-width: Integer.BYTES (4) bytes per value.
return Integer.BYTES;
}

@Override
public long getMinMaxSize() {
  // Every stored value, min or max, is a fixed-width 32-bit integer.
  long valueCount = (long) minValues.size() + maxValues.size();
  return valueCount * Integer.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,9 @@ int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int ind
int sizeOf(Object value) {
// 64-bit integers are fixed-width: Long.BYTES (8) bytes per value.
return Long.BYTES;
}

@Override
public long getMinMaxSize() {
// Each stored min/max value is a fixed-width long; multiply in long
// arithmetic to avoid int overflow for very large page counts.
return (long) minValues.size() * Long.BYTES + (long) maxValues.size() * Long.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,53 @@ public void testBuildBinaryUtf8() {
columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 6, 7);
}

@Test
public void testBinaryWithTruncate() {
// Build a binary column index with a 5-byte truncation length: stored
// min/max values are truncated before being accounted for by
// getMinMaxSize(), which is what PARQUET-2438 fixes.
PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
int truncateLen = 5;
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, truncateLen);
assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
// Nothing added yet -> no index can be built.
assertNull(builder.build());

StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, stringBinary("Jeltz"), stringBinary("Slartibartfast"), null, null));
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, stringBinary("Beeblebrox"), stringBinary("Prefect")));
builder.add(sb.stats(type, stringBinary("Dent"), stringBinary("Trilian"), null));
builder.add(sb.stats(type, stringBinary("Beeblebrox")));
builder.add(sb.stats(type, null, null));
assertEquals(8, builder.getPageCount());
// 39 = truncated min lengths (5 + 5 + 4 + 5 = 19) plus truncated max
// lengths (5 + 5 + 5 + 5 = 20); "Dent" is shorter than truncateLen and is
// kept whole. Before the fix this reported untruncated sizes.
assertEquals(39, builder.getMinMaxSize());
ColumnIndex columnIndex = builder.build();
assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
assertCorrectNullCounts(columnIndex, 2, 2, 5, 2, 0, 1, 0, 2);
assertCorrectNullPages(columnIndex, true, false, true, true, false, false, false, true);

// The values surfaced by the built index must match what the truncator
// itself would produce for each page's min/max.
BinaryTruncator truncator = BinaryTruncator.getTruncator(type);
assertCorrectValues(
columnIndex.getMaxValues(),
null,
truncator.truncateMax(stringBinary("Slartibartfast"), truncateLen),
null,
null,
truncator.truncateMax(stringBinary("Prefect"), truncateLen),
truncator.truncateMax(stringBinary("Trilian"), truncateLen),
truncator.truncateMax(stringBinary("Beeblebrox"), truncateLen),
null);
assertCorrectValues(
columnIndex.getMinValues(),
null,
truncator.truncateMin(stringBinary("Jeltz"), truncateLen),
null,
null,
truncator.truncateMin(stringBinary("Beeblebrox"), truncateLen),
truncator.truncateMin(stringBinary("Dent"), truncateLen),
truncator.truncateMin(stringBinary("Beeblebrox"), truncateLen),
null);
}

@Test
public void testStaticBuildBinary() {
ColumnIndex columnIndex = ColumnIndexBuilder.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ private ParquetFileWriter(
* @param file the file to write to
* @param rowAndBlockSize the row group size
* @param maxPaddingSize the maximum padding
* @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
* @param allocator allocator to potentially allocate {@link java.nio.ByteBuffer} objects
* @throws IOException if the file can not be created
*/
Expand All @@ -533,15 +534,15 @@ private ParquetFileWriter(
Path file,
long rowAndBlockSize,
int maxPaddingSize,
int columnIndexTruncateLength,
ByteBufferAllocator allocator)
throws IOException {
FileSystem fs = file.getFileSystem(configuration);
this.schema = schema;
this.alignment = PaddingAlignment.get(rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
this.out = HadoopStreams.wrap(fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
this.encodingStatsBuilder = new EncodingStats.Builder();
// no truncation is needed for testing
this.columnIndexTruncateLength = Integer.MAX_VALUE;
this.columnIndexTruncateLength = columnIndexTruncateLength;
this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
this.crcAllocator = pageWriteChecksumEnabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private Path writeSimpleParquetFile(
path,
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
Integer.MAX_VALUE,
allocator);

writer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -97,6 +98,7 @@
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
Expand Down Expand Up @@ -180,7 +182,7 @@ private ParquetFileWriter createWriter(

private ParquetFileWriter createWriter(
Configuration conf, MessageType schema, Path path, long blockSize, int maxPaddingSize) throws IOException {
// Delegates to the constructor that now takes an explicit column-index
// truncate length, passing Integer.MAX_VALUE (no truncation) so existing
// tests using this overload keep their previous behavior.
return new ParquetFileWriter(conf, schema, path, blockSize, maxPaddingSize, Integer.MAX_VALUE, allocator);
}

@Test
Expand Down Expand Up @@ -1206,13 +1208,27 @@ public void testWriteMetadataFileWithRelativeOutputPath() throws IOException {

@Test
public void testColumnIndexWriteRead() throws Exception {
// Exercise the write/read round trip both without truncation...
testColumnIndexWriteRead(Integer.MAX_VALUE);
// ...and with the default column-index truncate length, which exercises
// the truncation path added for PARQUET-2438.
testColumnIndexWriteRead(ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
}

private void testColumnIndexWriteRead(int columnIndexTruncateLen) throws Exception {
File testFile = temp.newFile();
testFile.delete();

Path path = new Path(testFile.toURI());
Configuration configuration = new Configuration();

ParquetFileWriter w = createWriter(configuration, SCHEMA, path);
ParquetFileWriter w = new ParquetFileWriter(
configuration,
SCHEMA,
path,
DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT,
columnIndexTruncateLen,
allocator);
w.start();
w.startBlock(4);
w.startColumn(C1, 7, CODEC);
Expand Down Expand Up @@ -1336,8 +1352,36 @@ public void testColumnIndexWriteRead() throws Exception {
assertEquals(1, offsetIndex.getFirstRowIndex(1));
assertEquals(3, offsetIndex.getFirstRowIndex(2));

assertNull(reader.readColumnIndex(
footer.getBlocks().get(2).getColumns().get(0)));
if (columnIndexTruncateLen == Integer.MAX_VALUE) {
assertNull(reader.readColumnIndex(
footer.getBlocks().get(2).getColumns().get(0)));
} else {
blockMeta = footer.getBlocks().get(2);
assertNotNull(reader.readColumnIndex(blockMeta.getColumns().get(0)));
columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(0));
assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
assertTrue(Arrays.asList(0l).equals(columnIndex.getNullCounts()));
assertTrue(Arrays.asList(false).equals(columnIndex.getNullPages()));
minValues = columnIndex.getMinValues();
assertEquals(1, minValues.size());
maxValues = columnIndex.getMaxValues();
assertEquals(1, maxValues.size());

BinaryTruncator truncator =
BinaryTruncator.getTruncator(SCHEMA.getType(PATH1).asPrimitiveType());
assertEquals(
new String(new byte[1], StandardCharsets.UTF_8),
new String(minValues.get(0).array(), StandardCharsets.UTF_8));
byte[] truncatedMaxValue = truncator
.truncateMax(
Binary.fromConstantByteArray(new byte[(int) MAX_STATS_SIZE]), columnIndexTruncateLen)
.getBytes();
assertEquals(
new String(truncatedMaxValue, StandardCharsets.UTF_8),
new String(maxValues.get(0).array(), StandardCharsets.UTF_8));

assertNull(reader.readColumnIndex(blockMeta.getColumns().get(1)));
}
}
}

Expand Down

0 comments on commit 87ee1b3

Please sign in to comment.