Skip to content

Commit

Permalink
Expose ORC writer compression level as session property
Browse files Browse the repository at this point in the history
  • Loading branch information
sdruzkin authored and ARUNACHALAM THIRUPATHI committed Mar 10, 2022
1 parent 028ef8e commit 1e64daf
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ThreadLocalRandom;

import static com.facebook.presto.common.type.DoubleType.DOUBLE;
Expand All @@ -35,6 +36,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.OrcFileWriterConfig.DEFAULT_COMPRESSION_LEVEL;
import static com.facebook.presto.hive.metastore.MetastoreUtil.METASTORE_HEADERS;
import static com.facebook.presto.hive.metastore.MetastoreUtil.USER_DEFINED_TYPE_ENCODING_ENABLED;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
Expand Down Expand Up @@ -69,6 +71,7 @@ public final class HiveSessionProperties
private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_SIZE = "orc_optimized_writer_max_stripe_size";
private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_ROWS = "orc_optimized_writer_max_stripe_rows";
private static final String ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY = "orc_optimized_writer_max_dictionary_memory";
private static final String ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL = "orc_optimized_writer_compression_level";
private static final String PAGEFILE_WRITER_MAX_STRIPE_SIZE = "pagefile_writer_max_stripe_size";
public static final String HIVE_STORAGE_FORMAT = "hive_storage_format";
private static final String COMPRESSION_CODEC = "compression_codec";
Expand Down Expand Up @@ -288,6 +291,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Experimental: ORC: Max dictionary memory",
orcFileWriterConfig.getDictionaryMaxMemory(),
false),
integerProperty(
ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL,
"Experimental: ORC: Compression level, works only for ZSTD and ZLIB compression kinds",
orcFileWriterConfig.getCompressionLevel(),
false),
dataSizeSessionProperty(
PAGEFILE_WRITER_MAX_STRIPE_SIZE,
"PAGEFILE: Max stripe size",
Expand Down Expand Up @@ -811,6 +819,15 @@ public static DataSize getOrcOptimizedWriterMaxDictionaryMemory(ConnectorSession
return session.getProperty(ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class);
}

public static OptionalInt getCompressionLevel(ConnectorSession session)
{
int value = session.getProperty(ORC_OPTIMIZED_WRITER_COMPRESSION_LEVEL, Integer.class);
if (value != DEFAULT_COMPRESSION_LEVEL) {
return OptionalInt.of(value);
}
return OptionalInt.empty();
}

public static DataSize getPageFileStripeMaxSize(ConnectorSession session)
{
return session.getProperty(PAGEFILE_WRITER_MAX_STRIPE_SIZE, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import javax.validation.constraints.NotNull;

import java.util.OptionalInt;

import static com.facebook.presto.hive.OrcFileWriterConfig.StreamLayoutType.BY_COLUMN_SIZE;

@SuppressWarnings("unused")
Expand All @@ -33,6 +35,8 @@ public enum StreamLayoutType
BY_COLUMN_SIZE,
}

public static final int DEFAULT_COMPRESSION_LEVEL = Integer.MIN_VALUE;

private DataSize stripeMinSize = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MIN_SIZE;
private DataSize stripeMaxSize = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MAX_SIZE;
private int stripeMaxRowCount = DefaultOrcWriterFlushPolicy.DEFAULT_STRIPE_MAX_ROW_COUNT;
Expand All @@ -44,6 +48,7 @@ public enum StreamLayoutType
private boolean isDwrfStripeCacheEnabled;
private DataSize dwrfStripeCacheMaxSize = OrcWriterOptions.DEFAULT_DWRF_STRIPE_CACHE_MAX_SIZE;
private DwrfStripeCacheMode dwrfStripeCacheMode = OrcWriterOptions.DEFAULT_DWRF_STRIPE_CACHE_MODE;
private int compressionLevel = DEFAULT_COMPRESSION_LEVEL;

public OrcWriterOptions.Builder toOrcWriterOptionsBuilder()
{
Expand All @@ -53,6 +58,11 @@ public OrcWriterOptions.Builder toOrcWriterOptionsBuilder()
.withStripeMaxRowCount(stripeMaxRowCount)
.build();

OptionalInt resolvedCompressionLevel = OptionalInt.empty();
if (compressionLevel != DEFAULT_COMPRESSION_LEVEL) {
resolvedCompressionLevel = OptionalInt.of(compressionLevel);
}

// Give separate copy to callers for isolation.
return OrcWriterOptions.builder()
.withFlushPolicy(flushPolicy)
Expand All @@ -63,7 +73,8 @@ public OrcWriterOptions.Builder toOrcWriterOptionsBuilder()
.withStreamLayoutFactory(getStreamLayoutFactory(streamLayoutType))
.withDwrfStripeCacheEnabled(isDwrfStripeCacheEnabled)
.withDwrfStripeCacheMaxSize(dwrfStripeCacheMaxSize)
.withDwrfStripeCacheMode(dwrfStripeCacheMode);
.withDwrfStripeCacheMode(dwrfStripeCacheMode)
.withCompressionLevel(resolvedCompressionLevel);
}

@NotNull
Expand Down Expand Up @@ -129,6 +140,18 @@ public OrcFileWriterConfig setDictionaryMaxMemory(DataSize dictionaryMaxMemory)
return this;
}

public int getCompressionLevel()
{
return compressionLevel;
}

@Config("hive.orc.writer.compression-level")
public OrcFileWriterConfig setCompressionLevel(int compressionLevel)
{
this.compressionLevel = compressionLevel;
return this;
}

@NotNull
public DataSize getStringStatisticsLimit()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED;
import static com.facebook.presto.hive.HiveSessionProperties.getCompressionLevel;
import static com.facebook.presto.hive.HiveSessionProperties.getDwrfWriterStripeCacheMaxSize;
import static com.facebook.presto.hive.HiveSessionProperties.getOrcMaxBufferSize;
import static com.facebook.presto.hive.HiveSessionProperties.getOrcMaxMergeDistance;
Expand Down Expand Up @@ -235,6 +236,7 @@ else if (com.facebook.hive.orc.OrcOutputFormat.class.getName().equals(storageFor
.withIgnoreDictionaryRowGroupSizes(isExecutionBasedMemoryAccountingEnabled(session))
.withDwrfStripeCacheEnabled(isDwrfWriterStripeCacheEnabled(session))
.withDwrfStripeCacheMaxSize(getDwrfWriterStripeCacheMaxSize(session))
.withCompressionLevel(getCompressionLevel(session))
.build(),
fileInputColumnIndexes,
ImmutableMap.<String, String>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
Expand Down Expand Up @@ -58,7 +59,8 @@ public void testDefaults()
.setStreamLayoutType(BY_COLUMN_SIZE)
.setDwrfStripeCacheEnabled(false)
.setDwrfStripeCacheMaxSize(new DataSize(8, MEGABYTE))
.setDwrfStripeCacheMode(INDEX_AND_FOOTER));
.setDwrfStripeCacheMode(INDEX_AND_FOOTER)
.setCompressionLevel(Integer.MIN_VALUE));
}

@Test
Expand All @@ -76,6 +78,7 @@ public void testExplicitPropertyMappings()
.put("hive.orc.writer.dwrf-stripe-cache-enabled", "true")
.put("hive.orc.writer.dwrf-stripe-cache-max-size", "10MB")
.put("hive.orc.writer.dwrf-stripe-cache-mode", "FOOTER")
.put("hive.orc.writer.compression-level", "5")
.build();

OrcFileWriterConfig expected = new OrcFileWriterConfig()
Expand All @@ -89,7 +92,8 @@ public void testExplicitPropertyMappings()
.setStreamLayoutType(BY_STREAM_SIZE)
.setDwrfStripeCacheEnabled(true)
.setDwrfStripeCacheMaxSize(new DataSize(10, MEGABYTE))
.setDwrfStripeCacheMode(FOOTER);
.setDwrfStripeCacheMode(FOOTER)
.setCompressionLevel(5);

assertFullMapping(properties, expected);
}
Expand All @@ -115,6 +119,7 @@ public void testOrcWriterOptionsBuilder()
StreamLayoutType streamLayoutType = BY_STREAM_SIZE;
DataSize dwrfStripeCacheMaxSize = new DataSize(4, MEGABYTE);
DwrfStripeCacheMode dwrfStripeCacheMode = INDEX;
int compressionLevel = 5;

OrcFileWriterConfig config = new OrcFileWriterConfig()
.setStripeMinSize(stripeMinSize)
Expand All @@ -127,7 +132,8 @@ public void testOrcWriterOptionsBuilder()
.setStreamLayoutType(streamLayoutType)
.setDwrfStripeCacheEnabled(false)
.setDwrfStripeCacheMaxSize(dwrfStripeCacheMaxSize)
.setDwrfStripeCacheMode(dwrfStripeCacheMode);
.setDwrfStripeCacheMode(dwrfStripeCacheMode)
.setCompressionLevel(5);

assertEquals(stripeMinSize, config.getStripeMinSize());
assertEquals(stripeMaxSize, config.getStripeMaxSize());
Expand All @@ -140,6 +146,7 @@ public void testOrcWriterOptionsBuilder()
assertFalse(config.isDwrfStripeCacheEnabled());
assertEquals(dwrfStripeCacheMaxSize, config.getDwrfStripeCacheMaxSize());
assertEquals(dwrfStripeCacheMode, config.getDwrfStripeCacheMode());
assertEquals(compressionLevel, config.getCompressionLevel());

assertNotSame(config.toOrcWriterOptionsBuilder(), config.toOrcWriterOptionsBuilder());
OrcWriterOptions options = config.toOrcWriterOptionsBuilder().build();
Expand All @@ -153,6 +160,7 @@ public void testOrcWriterOptionsBuilder()
assertEquals(maxCompressionBufferSize, options.getMaxCompressionBufferSize());
assertTrue(options.getStreamLayoutFactory() instanceof StreamSizeLayoutFactory);
assertEquals(Optional.empty(), options.getDwrfStripeCacheOptions());
assertEquals(OptionalInt.of(compressionLevel), options.getCompressionLevel());
}

@Test
Expand All @@ -168,4 +176,13 @@ public void testStreamLayoutOption()
options = config.toOrcWriterOptionsBuilder().build();
assertTrue(options.getStreamLayoutFactory() instanceof ColumnSizeLayoutFactory);
}

@Test
public void testDefaultCompressionLevel()
{
OrcFileWriterConfig config = new OrcFileWriterConfig();
OrcWriterOptions options = config.toOrcWriterOptionsBuilder().build();

assertEquals(OptionalInt.empty(), options.getCompressionLevel());
}
}

0 comments on commit 1e64daf

Please sign in to comment.