Skip to content

Commit

Permalink
[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator c…
Browse files Browse the repository at this point in the history
…ode can exceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces the Java bytecode size of the methods in ```SpecificColumnarIterator``` by grouping large numbers (more than 200) of ```ColumnAccessor``` instantiations or method calls into separate methods.

## How was this patch tested?

Added a new unit test, which includes large instantiations and method calls, to ```InMemoryColumnarQuerySuite```

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#12108 from kiszk/SPARK-14138-master.
  • Loading branch information
kiszk authored and davies committed Apr 2, 2016
1 parent 27e71a2 commit 877dc71
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName
}
ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
ctx.addMutableState(accessorCls, accessorName, "")

val createCode = dt match {
case t if ctx.isPrimitiveType(dt) =>
Expand All @@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch)
}.unzip

/*
 * Keep every generated Java method small enough for HotSpot to compile it.
 * The maximum bytecode size of a method that HotSpot will compile is 8000 bytes, so the
 * generated statements are split into helper methods of at most 200 statements each:
 * 200 = 6000 bytes / 30 (up to 30 bytes per call), which stays below 8000.
 */
val numberOfStatementsThreshold = 200

/*
 * Wraps `statements` into private helper methods named `<prefix>0`, `<prefix>1`, ...
 * (at most `numberOfStatementsThreshold` statements per method), registers each of them
 * with `ctx`, and returns the Java code that invokes them in order.
 */
def splitIntoFunctions(prefix: String, statements: Seq[String]): String = {
  statements.grouped(numberOfStatementsThreshold).zipWithIndex.map { case (body, i) =>
    val funcName = s"$prefix$i"
    val funcCode = s"""
       |private void $funcName() {
       |  ${body.mkString("\n")}
       |}
     """.stripMargin
    ctx.addNewFunction(funcName, funcCode)
    s"$funcName();"
    // `grouped` yields an Iterator whose `map` is lazy: without forcing it here no helper
    // would be registered and the returned call list would be empty.
  }.toList.mkString("\n")
}

// Code inserted into the generated iterator: either the statements themselves (small
// schemas) or calls to the helper methods that now contain them (wide schemas).
val (initializerAccessorCalls, extractorCalls) =
  if (initializeAccessors.length <= numberOfStatementsThreshold) {
    (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
  } else {
    (splitIntoFunctions("accessors", initializeAccessors),
      splitIntoFunctions("extractors", extractors))
  }

val code = s"""
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter);

${ctx.initMutableStates()}
}

public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
Expand All @@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes;
}

${ctx.declareAddedFunctions()}

public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
Expand All @@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
${initializeAccessors.mkString("\n")}
${initializerAccessorCalls}

return hasNext();
}
Expand All @@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
${extractors.mkString("\n")}
${extractorCalls}
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1)
}

test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
  // Calls the generator for two very wide all-integer schemas (3999 and 10000 columns).
  // The returned iterators are discarded, so this only checks that `generate` itself
  // completes without throwing; it does not execute the generated iterators.
  Seq(3999, 10000).foreach { numColumns =>
    val wideColumnTypes = List.fill(numColumns)(IntegerType)
    GenerateColumnAccessor.generate(wideColumnTypes)
  }
}
}

0 comments on commit 877dc71

Please sign in to comment.