HIVE-13361: Orc concatenation should enforce the compression buffer size (Prasanth Jayachandran reviewed by Gopal V)
prasanthj committed Mar 30, 2016
1 parent b431c27 commit 8c1f055
Showing 7 changed files with 1,295 additions and 12 deletions.
1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -175,6 +175,7 @@ minitez.query.files.shared=acid_globallimit.q,\
orc_merge9.q,\
orc_merge10.q,\
orc_merge11.q,\
orc_merge12.q,\
orc_merge_incompat1.q,\
orc_merge_incompat2.q,\
orc_merge_incompat3.q,\
21 changes: 20 additions & 1 deletion orc/src/java/org/apache/orc/OrcFile.java
@@ -232,6 +232,7 @@ public static class WriterOptions {
private long blockSizeValue;
private int rowIndexStrideValue;
private int bufferSizeValue;
private boolean enforceBufferSize = false;
private boolean blockPaddingValue;
private CompressionKind compressValue;
private MemoryManager memoryManagerValue;
@@ -317,13 +318,27 @@ public WriterOptions rowIndexStride(int value) {

/**
* The size of the memory buffers used for compressing and storing the
* stripe in memory.
* stripe in memory. NOTE: ORC writer may choose to use smaller buffer
* size based on stripe size and number of columns for efficient stripe
* writing and memory utilization. To enforce writer to use the requested
* buffer size use enforceBufferSize().
*/
public WriterOptions bufferSize(int value) {
bufferSizeValue = value;
return this;
}

/**
* Enforce writer to use requested buffer size instead of estimating
* buffer size based on stripe size and number of columns.
* See bufferSize() method for more info.
* Default: false
*/
public WriterOptions enforceBufferSize() {
enforceBufferSize = true;
return this;
}

/**
* Sets whether the HDFS blocks are padded to prevent stripes from
* straddling blocks. Padding improves locality and thus the speed of
@@ -460,6 +475,10 @@ public int getBufferSize() {
return bufferSizeValue;
}

public boolean isEnforceBufferSize() {
return enforceBufferSize;
}

public int getRowIndexStride() {
return rowIndexStrideValue;
}
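As a usage illustration (not part of this commit), here is a minimal sketch of creating a writer with an enforced buffer size. The schema, output path, and sizes are made up for the example, and the WriterOptions calls other than bufferSize()/enforceBufferSize() should be checked against the ORC version in use.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class EnforcedBufferSizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical schema and output path, purely for illustration.
    TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
    Path out = new Path("/tmp/enforced_buffer_size.orc");

    OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
        .setSchema(schema)
        .compress(CompressionKind.ZLIB)
        .bufferSize(256 * 1024)     // request a 256 KB compression buffer
        .enforceBufferSize();       // keep it; skip the per-column estimation

    // Without enforceBufferSize(), the writer may shrink the buffer based on
    // stripe size and column count (see getEstimatedBufferSize below).
    Writer writer = OrcFile.createWriter(out, options);
    writer.close();
  }
}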
18 changes: 8 additions & 10 deletions orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -180,8 +180,12 @@ public Writer getWriter() {
buildIndex = rowIndexStride > 0;
codec = createCodec(compress);
int numColumns = schema.getMaximumId() + 1;
this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
numColumns, opts.getBufferSize());
if (opts.isEnforceBufferSize()) {
this.bufferSize = opts.getBufferSize();
} else {
this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
numColumns, opts.getBufferSize());
}
if (version == OrcFile.Version.V_0_11) {
/* do not write bloom filters for ORC v11 */
this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
@@ -199,7 +203,7 @@ public Writer getWriter() {
// ensure that we are able to handle callbacks before we register ourselves
memoryManager.addWriter(path, opts.getStripeSize(), this);
LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
" compression: {} estimatedBufferSize: {}", path, defaultStripeSize, blockSize,
" compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
compress, bufferSize);
}

@@ -212,13 +216,7 @@ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
// sizes.
int estBufferSize = (int) (stripeSize / (20 * numColumns));
estBufferSize = getClosestBufferSize(estBufferSize);
if (estBufferSize > bs) {
estBufferSize = bs;
} else {
LOG.info("WIDE TABLE - Number of columns: " + numColumns +
" Chosen compression buffer size: " + estBufferSize);
}
return estBufferSize;
return estBufferSize > bs ? bs : estBufferSize;
}

private static int getClosestBufferSize(int estBufferSize) {
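For context, the estimation that enforceBufferSize() bypasses can be sketched as a standalone method. This is a hedged reconstruction: the formula matches the diff above, but the bucket list and rounding inside closestBufferSize() are assumptions standing in for getClosestBufferSize(), which this diff does not show.

// Standalone sketch of the buffer-size estimation WriterImpl applies when
// enforceBufferSize() is NOT set. Bucket sizes and rounding are assumptions;
// see getClosestBufferSize() in WriterImpl for the authoritative behavior.
final class BufferSizeEstimatorSketch {

  static int estimateBufferSize(long stripeSize, int numColumns, int requestedSize) {
    // Budget the stripe across roughly 20 buffers per column, then snap the
    // result to a standard buffer size.
    int estimated = (int) (stripeSize / (20L * numColumns));
    estimated = closestBufferSize(estimated);
    // Never exceed the requested (or default) buffer size.
    return Math.min(estimated, requestedSize);
  }

  private static int closestBufferSize(int estimated) {
    // Assumed bucket list of standard ORC compression buffer sizes.
    int[] buckets = {4 << 10, 8 << 10, 16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10};
    for (int bucket : buckets) {
      if (estimated <= bucket) {
        return bucket;
      }
    }
    return buckets[buckets.length - 1];
  }

  public static void main(String[] args) {
    // With a 64 MB stripe and 37 column ids, the per-buffer budget is
    // 64 MB / (20 * 37), snapped to a bucket and capped at the requested 256 KB.
    System.out.println(estimateBufferSize(64L << 20, 37, 256 << 10));
  }
}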
ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -121,7 +121,9 @@ private void processKeyValuePairs(Object key, Object value)
.inspector(reader.getObjectInspector());
// compression buffer size should only be set if compression is enabled
if (compression != CompressionKind.NONE) {
options.bufferSize(compressBuffSize);
// enforce is required to retain the buffer sizes of old files instead of orc writer
// inferring the optimal buffer size
options.bufferSize(compressBuffSize).enforceBufferSize();
}

outWriter = OrcFile.createWriter(outPath, options);
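Concatenation reuses the stripes of the old files as-is, so the merged file needs to retain their compression buffer size rather than let the writer infer a new, possibly smaller one. To illustrate that intent outside the merge operator, here is a hedged sketch of rewriting an existing ORC file while preserving its original buffer size; the reader accessors used (getSchema(), getCompressionKind(), getCompressionSize()) are assumed to exist in this ORC version and should be verified.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

final class PreserveBufferSizeOnRewrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical input and output paths, purely for illustration.
    Reader reader = OrcFile.createReader(new Path("/tmp/old_file.orc"),
        OrcFile.readerOptions(conf));

    // Reuse the old file's compression settings and force the writer to keep
    // the same buffer size instead of re-estimating it for the new file.
    OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
        .setSchema(reader.getSchema())
        .compress(reader.getCompressionKind())
        .bufferSize(reader.getCompressionSize())
        .enforceBufferSize();

    OrcFile.createWriter(new Path("/tmp/merged_file.orc"), options).close();
  }
}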
51 changes: 51 additions & 0 deletions ql/src/test/queries/clientpositive/orc_merge12.q
@@ -0,0 +1,51 @@
CREATE TABLE `alltypesorc3xcols`(
`atinyint` tinyint,
`asmallint` smallint,
`aint` int,
`abigint` bigint,
`afloat` float,
`adouble` double,
`astring1` string,
`astring2` string,
`atimestamp1` timestamp,
`atimestamp2` timestamp,
`aboolean1` boolean,
`aboolean2` boolean,
`btinyint` tinyint,
`bsmallint` smallint,
`bint` int,
`bbigint` bigint,
`bfloat` float,
`bdouble` double,
`bstring1` string,
`bstring2` string,
`btimestamp1` timestamp,
`btimestamp2` timestamp,
`bboolean1` boolean,
`bboolean2` boolean,
`ctinyint` tinyint,
`csmallint` smallint,
`cint` int,
`cbigint` bigint,
`cfloat` float,
`cdouble` double,
`cstring1` string,
`cstring2` string,
`ctimestamp1` timestamp,
`ctimestamp2` timestamp,
`cboolean1` boolean,
`cboolean2` boolean) stored as ORC;

load data local inpath '../../data/files/alltypesorc3xcols' into table alltypesorc3xcols;
load data local inpath '../../data/files/alltypesorc3xcols' into table alltypesorc3xcols;

select count(*) from alltypesorc3xcols;
select sum(hash(*)) from alltypesorc3xcols;

alter table alltypesorc3xcols concatenate;

select count(*) from alltypesorc3xcols;
select sum(hash(*)) from alltypesorc3xcols;

SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecOrcFileDump;
select * from alltypesorc3xcols limit 1;