Skip to content

Commit

Permalink
Add DWRF stripe cache support to OrcWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
sdruzkin authored and shixuan-fan committed Jun 24, 2021
1 parent b66da38 commit 44b7a7b
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public OrcFileWriter(
dwrfWriterEncryption,
dwrfEncryptionProvider,
options,
Optional.empty(),
metadata,
hiveStorageTimeZone,
validationInputFactory.isPresent(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ private static OrcWriter createOrcFileWriter(DataSink sink, List<Type> types)
.withStripeMinSize(new DataSize(64, MEGABYTE))
.withDictionaryMaxMemory(new DataSize(1, MEGABYTE))
.build(),
Optional.empty(),
ImmutableMap.of(),
UTC,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ public PrestoOrcFormatWriter(File targetFile, List<String> columnNames, List<Typ
Optional.empty(),
NO_ENCRYPTION,
OrcWriterOptions.builder().build(),
Optional.empty(),
ImmutableMap.of(),
hiveStorageTimeZone,
false,
Expand Down Expand Up @@ -634,6 +635,7 @@ public PrestoDwrfFormatWriter(File targetFile, List<String> columnNames, List<Ty
Optional.empty(),
NO_ENCRYPTION,
OrcWriterOptions.builder().build(),
Optional.empty(),
ImmutableMap.of(),
hiveStorageTimeZone,
false,
Expand Down
36 changes: 34 additions & 2 deletions presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.facebook.presto.orc.metadata.CompressedMetadataWriter;
import com.facebook.presto.orc.metadata.CompressionKind;
import com.facebook.presto.orc.metadata.DwrfEncryption;
import com.facebook.presto.orc.metadata.DwrfStripeCacheData;
import com.facebook.presto.orc.metadata.DwrfStripeCacheWriter;
import com.facebook.presto.orc.metadata.EncryptionGroup;
import com.facebook.presto.orc.metadata.Footer;
import com.facebook.presto.orc.metadata.Metadata;
Expand Down Expand Up @@ -70,6 +72,7 @@
import static com.facebook.presto.common.io.DataOutput.createDataOutput;
import static com.facebook.presto.orc.DwrfEncryptionInfo.UNENCRYPTED;
import static com.facebook.presto.orc.DwrfEncryptionInfo.createNodeToGroupMap;
import static com.facebook.presto.orc.OrcEncoding.DWRF;
import static com.facebook.presto.orc.OrcReader.validateFile;
import static com.facebook.presto.orc.WriterStats.FlushReason.CLOSED;
import static com.facebook.presto.orc.WriterStats.FlushReason.DICTIONARY_FULL;
Expand Down Expand Up @@ -130,6 +133,7 @@ public class OrcWriter
private final List<OrcType> orcTypes;

private final List<ColumnWriter> columnWriters;
private final Optional<DwrfStripeCacheWriter> dwrfStripeCacheWriter;
private final int dictionaryMaxMemoryBytes;
private final DictionaryCompressionOptimizer dictionaryCompressionOptimizer;
private int stripeRowCount;
Expand All @@ -152,6 +156,7 @@ public OrcWriter(
Optional<DwrfWriterEncryption> encryption,
DwrfEncryptionProvider dwrfEncryptionProvider,
OrcWriterOptions options,
Optional<DwrfWriterOptions> dwrfOptions,
Map<String, String> userMetadata,
DateTimeZone hiveStorageTimeZone,
boolean validate,
Expand Down Expand Up @@ -231,6 +236,18 @@ public OrcWriter(
this.dwrfEncryptionInfo = UNENCRYPTED;
}

// set DwrfStripeCacheWriter for DWRF files if it's enabled through the options
if (orcEncoding == DWRF) {
this.dwrfStripeCacheWriter = dwrfOptions
.filter(DwrfWriterOptions::isStripeCacheEnabled)
.map(dwrfWriterOptions -> new DwrfStripeCacheWriter(
dwrfWriterOptions.getStripeCacheMode(),
dwrfWriterOptions.getStripeCacheMaxSize()));
}
else {
this.dwrfStripeCacheWriter = Optional.empty();
}

// create column writers
OrcType rootType = orcTypes.get(0);
checkArgument(rootType.getFieldCount() == types.size());
Expand Down Expand Up @@ -499,6 +516,10 @@ private List<DataOutput> bufferStripeData(long stripeStartOffset, FlushReason fl
}
}

if (dwrfStripeCacheWriter.isPresent()) {
dwrfStripeCacheWriter.get().addIndexStreams(ImmutableList.copyOf(outputData), indexLength);
}

// data streams (sorted by size)
long dataLength = 0;
List<StreamDataOutput> dataStreams = new ArrayList<>(columnWriters.size() * 2);
Expand Down Expand Up @@ -552,6 +573,7 @@ private List<DataOutput> bufferStripeData(long stripeStartOffset, FlushReason fl
StripeFooter stripeFooter = new StripeFooter(unencryptedStreams, unencryptedColumnEncodings, encryptedGroups);
Slice footer = metadataWriter.writeStripeFooter(stripeFooter);
outputData.add(createDataOutput(footer));
dwrfStripeCacheWriter.ifPresent(stripeCacheWriter -> stripeCacheWriter.addStripeFooter(createDataOutput(footer)));

// create final stripe statistics
StripeStatistics statistics = new StripeStatistics(toDenseList(columnStatistics, orcTypes.size()));
Expand Down Expand Up @@ -665,6 +687,11 @@ private List<DataOutput> bufferFileFooter()
dwrfEncryption = Optional.empty();
}

Optional<DwrfStripeCacheData> dwrfStripeCacheData = dwrfStripeCacheWriter.map(DwrfStripeCacheWriter::getDwrfStripeCacheData);
Slice dwrfStripeCacheSlice = metadataWriter.writeDwrfStripeCache(dwrfStripeCacheData);
outputData.add(createDataOutput(dwrfStripeCacheSlice));

Optional<List<Integer>> dwrfStripeCacheOffsets = dwrfStripeCacheWriter.map(DwrfStripeCacheWriter::getOffsets);
Footer footer = new Footer(
numberOfRows,
rowGroupMaxRowCount,
Expand All @@ -675,7 +702,7 @@ private List<DataOutput> bufferFileFooter()
ImmutableList.copyOf(unencryptedStats),
userMetadata,
dwrfEncryption,
Optional.empty());
dwrfStripeCacheOffsets);

closedStripes.clear();
closedStripesRetainedBytes = 0;
Expand All @@ -684,7 +711,12 @@ private List<DataOutput> bufferFileFooter()
outputData.add(createDataOutput(footerSlice));

recordValidation(validation -> validation.setVersion(metadataWriter.getOrcMetadataVersion()));
Slice postscriptSlice = metadataWriter.writePostscript(footerSlice.length(), metadataSlice.length(), columnWriterOptions.getCompressionKind(), columnWriterOptions.getCompressionMaxBufferSize());
Slice postscriptSlice = metadataWriter.writePostscript(
footerSlice.length(),
metadataSlice.length(),
columnWriterOptions.getCompressionKind(),
columnWriterOptions.getCompressionMaxBufferSize(),
dwrfStripeCacheData);
outputData.add(createDataOutput(postscriptSlice));
outputData.add(createDataOutput(Slices.wrappedBuffer((byte) postscriptSlice.length())));
return outputData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,22 @@ public List<Integer> getOrcMetadataVersion()
return metadataWriter.getOrcMetadataVersion();
}

public Slice writePostscript(int footerLength, int metadataLength, CompressionKind compression, int compressionBlockSize)
public Slice writePostscript(int footerLength, int metadataLength, CompressionKind compression, int compressionBlockSize, Optional<DwrfStripeCacheData> dwrfStripeCacheData)
throws IOException
{
// postscript is not compressed
DynamicSliceOutput output = new DynamicSliceOutput(64);
metadataWriter.writePostscript(output, footerLength, metadataLength, compression, compressionBlockSize, Optional.empty());
metadataWriter.writePostscript(output, footerLength, metadataLength, compression, compressionBlockSize, dwrfStripeCacheData);
return output.slice();
}

/**
 * Serializes the optional DWRF stripe cache into a slice.
 *
 * @param dwrfStripeCacheData stripe cache data to write, may be empty
 * @return slice holding whatever the underlying metadata writer produced
 * @throws IOException declared for failures of the underlying metadata writer
 */
public Slice writeDwrfStripeCache(Optional<DwrfStripeCacheData> dwrfStripeCacheData)
        throws IOException
{
    // DWRF stripe cache is already compressed, so it is written straight into a plain buffer.
    // The buffer is pre-sized to the cache size, or to zero when there is no cache.
    int expectedBytes = 0;
    if (dwrfStripeCacheData.isPresent()) {
        expectedBytes = dwrfStripeCacheData.get().getDwrfStripeCacheSize();
    }

    DynamicSliceOutput buffer = new DynamicSliceOutput(expectedBytes);
    metadataWriter.writeDwrfStripeCache(buffer, dwrfStripeCacheData);
    return buffer.slice();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc.metadata;

import com.facebook.presto.common.io.DataOutput;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.units.DataSize;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
 * Accumulates data for DWRF stripe cache using given mode and max size.
 * <p>
 * Index streams and stripe footers are appended to a single in-memory buffer
 * until an entry would push the accumulated size past the max size. From that
 * point on the writer is marked full and silently ignores every further entry.
 * Each accepted entry records its end offset; a leading {@code 0} offset is
 * added right before the first accepted entry.
 */
public class DwrfStripeCacheWriter
{
    private final DwrfStripeCacheMode mode;
    private final DynamicSliceOutput cache;
    // validated with toIntExact in the constructor, so it is kept as an int
    private final int maxSize;
    private final List<Integer> offsets = new ArrayList<>();

    // number of bytes accounted for so far
    private int size;
    // once set, all further index streams and stripe footers are dropped
    private boolean full;

    /**
     * @param mode selects which kinds of entries (index streams, stripe footers) are cached
     * @param maxSize upper bound for the accumulated cache size; must fit into an int
     * @throws NullPointerException if {@code maxSize} or {@code mode} is null
     * @throws ArithmeticException if {@code maxSize} in bytes does not fit into an int
     */
    public DwrfStripeCacheWriter(DwrfStripeCacheMode mode, DataSize maxSize)
    {
        // make sure max size is in the int range
        this.maxSize = toIntExact(requireNonNull(maxSize, "maxSize is null").toBytes());
        this.mode = requireNonNull(mode, "mode is null");
        this.cache = new DynamicSliceOutput(64);
        if (mode == DwrfStripeCacheMode.NONE) {
            // nothing is cached in this mode, so start out as full
            full = true;
        }
    }

    /**
     * Appends the index streams of one stripe as a single cache entry, unless the
     * writer is full or the mode does not include index streams.
     *
     * @param indexStreams streams to copy into the cache
     * @param indexSize size accounted for this entry; it is taken from the caller as-is
     * and is not recomputed from {@code indexStreams}
     */
    public void addIndexStreams(List<DataOutput> indexStreams, long indexSize)
    {
        if (full || !mode.hasIndex()) {
            return;
        }

        // long arithmetic: indexSize is a long, so the sum cannot wrap around here
        if (size + indexSize > maxSize) {
            full = true;
            return;
        }

        indexStreams.forEach(indexStream -> indexStream.writeData(cache));
        incrementSize(indexSize);
    }

    /**
     * Appends one stripe footer as a cache entry, unless the writer is full or
     * the mode does not include stripe footers.
     *
     * @param stripeFooter footer to copy into the cache
     */
    public void addStripeFooter(DataOutput stripeFooter)
    {
        if (full || !mode.hasFooter()) {
            return;
        }

        long stripeFooterSize = stripeFooter.size();
        if (size + stripeFooterSize > maxSize) {
            full = true;
            return;
        }

        stripeFooter.writeData(cache);
        incrementSize(stripeFooterSize);
    }

    /**
     * Advances the accumulated size and records the new end offset.
     */
    private void incrementSize(long sizeIncrement)
    {
        if (offsets.isEmpty()) {
            offsets.add(0);
        }
        // Checked conversion instead of "size += sizeIncrement", which would hide
        // an implicit narrowing cast from long to int and could wrap silently.
        this.size = toIntExact(this.size + sizeIncrement);
        offsets.add(this.size);
    }

    /**
     * Returns the cache data built from the current buffer contents, the accumulated size and the mode.
     */
    public DwrfStripeCacheData getDwrfStripeCacheData()
    {
        return new DwrfStripeCacheData(cache.slice(), size, mode);
    }

    /**
     * Returns an immutable copy of the offsets recorded so far; empty if no entry was accepted.
     */
    public List<Integer> getOffsets()
    {
        return ImmutableList.copyOf(offsets);
    }
}
Loading

0 comments on commit 44b7a7b

Please sign in to comment.