Revert "[SPARK-14419] [SQL] Improve HashedRelation for key fit within…
Browse files Browse the repository at this point in the history
… Long"

This reverts commit 90c0a04.
davies committed Apr 9, 2016
1 parent adb9d73 commit f7ec854
Showing 8 changed files with 346 additions and 633 deletions.
@@ -454,7 +454,7 @@ case class TungstenAggregate(
val thisPlan = ctx.addReferenceObj("plan", this)
hashMapTerm = ctx.freshName("hashMap")
val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName
ctx.addMutableState(hashMapClassName, hashMapTerm, s"")
ctx.addMutableState(hashMapClassName, hashMapTerm, s"$hashMapTerm = $thisPlan.createHashMap();")
sorterTerm = ctx.freshName("sorter")
ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, sorterTerm, "")

@@ -467,7 +467,6 @@ case class TungstenAggregate(
s"""
${if (isAggregateHashMapSupported) aggregateHashMapGenerator.generate() else ""}
private void $doAgg() throws java.io.IOException {
- $hashMapTerm = $thisPlan.createHashMap();
${child.asInstanceOf[CodegenSupport].produce(ctx, this)}

$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
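With this revert, the aggregation hash map is created through the init code registered with addMutableState rather than at the top of the generated doAgg() body. For context, a minimal sketch of that mutable-state pattern is below; the class and method names are hypothetical and greatly simplified compared with Spark's real CodegenContext, and only illustrate where an init string such as "$hashMapTerm = $thisPlan.createHashMap();" ends up.

    // Hypothetical, simplified model of the addMutableState pattern used above.
    class MiniCodegenContext {
      private val states = scala.collection.mutable.ArrayBuffer[(String, String, String)]()

      def addMutableState(javaType: String, name: String, initCode: String): Unit =
        states += ((javaType, name, initCode))

      // Emitted into the generated class body as field declarations.
      def declareMutableStates(): String =
        states.map { case (tpe, name, _) => s"private $tpe $name;" }.mkString("\n")

      // Emitted once into the generated class's init block, which is why the
      // reverted code no longer needs to create the hash map inside doAgg().
      def initMutableStates(): String =
        states.map(_._3).filter(_.nonEmpty).mkString("\n")
    }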
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryNode, CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
- import org.apache.spark.sql.types.LongType

/**
* Performs an inner hash join of two child relations. When the output RDD of this operator is
@@ -51,7 +50,10 @@ case class BroadcastHashJoin(
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
- val mode = HashedRelationBroadcastMode(buildKeys)
+ val mode = HashedRelationBroadcastMode(
+   canJoinKeyFitWithinLong,
+   rewriteKeyExpr(buildKeys),
+   buildPlan.output)
buildSide match {
case BuildLeft =>
BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
@@ -66,7 +68,7 @@
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
streamedPlan.execute().mapPartitions { streamedIter =>
val hashed = broadcastRelation.value.asReadOnlyCopy()
- TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
+ TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.getMemorySize)
join(streamedIter, hashed, numOutputRows)
}
}
@@ -103,7 +105,7 @@ case class BroadcastHashJoin(
ctx.addMutableState(clsName, relationTerm,
s"""
| $relationTerm = (($clsName) $broadcast.value()).asReadOnlyCopy();
- | incPeakExecutionMemory($relationTerm.estimatedSize());
+ | incPeakExecutionMemory($relationTerm.getMemorySize());
""".stripMargin)
(broadcastRelation, relationTerm)
}
@@ -116,13 +118,15 @@
ctx: CodegenContext,
input: Seq[ExprCode]): (ExprCode, String) = {
ctx.currentVars = input
- if (streamedKeys.length == 1 && streamedKeys.head.dataType == LongType) {
+ if (canJoinKeyFitWithinLong) {
// generate the join key as Long
- val ev = streamedKeys.head.gen(ctx)
+ val expr = rewriteKeyExpr(streamedKeys).head
+ val ev = BindReferences.bindReference(expr, streamedPlan.output).gen(ctx)
(ev, ev.isNull)
} else {
// generate the join key as UnsafeRow
- val ev = GenerateUnsafeProjection.createCode(ctx, streamedKeys)
+ val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
+ val ev = GenerateUnsafeProjection.createCode(ctx, keyExpr)
(ev, s"${ev.value}.anyNull()")
}
}
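The restored genStreamSideJoinKey re-binds the streamed-side key expressions against streamedPlan.output at codegen time (before the revert, streamedKeys were already bound when the trait built them). Binding rewrites named attribute references into ordinal lookups against a known output schema; the REPL-pasteable sketch below uses made-up toy types, not Spark's BindReferences, purely to show the idea.

    // Toy model (hypothetical types): binding turns attribute names into row positions.
    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class BoundRef(ordinal: Int) extends Expr
    case class PackKey(children: Seq[Expr]) extends Expr // stand-in for a packed join key

    def bindReference(e: Expr, output: Seq[String]): Expr = e match {
      case Attr(n)     => BoundRef(output.indexOf(n)) // assumes the attribute exists in output
      case PackKey(cs) => PackKey(cs.map(bindReference(_, output)))
      case other       => other
    }

    // bindReference(PackKey(Seq(Attr("a"), Attr("b"))), Seq("x", "a", "b"))
    //   == PackKey(Seq(BoundRef(1), BoundRef(2)))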
@@ -59,13 +59,9 @@ trait HashJoin {
case BuildRight => (right, left)
}

- protected lazy val (buildKeys, streamedKeys) = {
-   val lkeys = rewriteKeyExpr(leftKeys).map(BindReferences.bindReference(_, left.output))
-   val rkeys = rewriteKeyExpr(rightKeys).map(BindReferences.bindReference(_, right.output))
-   buildSide match {
-     case BuildLeft => (lkeys, rkeys)
-     case BuildRight => (rkeys, lkeys)
-   }
+ protected lazy val (buildKeys, streamedKeys) = buildSide match {
+   case BuildLeft => (leftKeys, rightKeys)
+   case BuildRight => (rightKeys, leftKeys)
}

/**
@@ -88,8 +84,17 @@
width = dt.defaultSize
} else {
val bits = dt.defaultSize * 8
+ // hashCode of Long is (l >> 32) ^ l.toInt, it means the hash code of an long with same
+ // value in high 32 bit and low 32 bit will be 0. To avoid the worst case that keys
+ // with two same ints have hash code 0, we rotate the bits of second one.
+ val rotated = if (e.dataType == IntegerType) {
+   // (e >>> 15) | (e << 17)
+   BitwiseOr(ShiftRightUnsigned(e, Literal(15)), ShiftLeft(e, Literal(17)))
+ } else {
+   e
+ }
keyExpr = BitwiseOr(ShiftLeft(keyExpr, Literal(bits)),
-   BitwiseAnd(Cast(e, LongType), Literal((1L << bits) - 1)))
+   BitwiseAnd(Cast(rotated, LongType), Literal((1L << bits) - 1)))
width -= bits
}
// TODO: support BooleanType, DateType and TimestampType
@@ -100,11 +105,17 @@
keyExpr :: Nil
}

+ protected lazy val canJoinKeyFitWithinLong: Boolean = {
+   val sameTypes = buildKeys.map(_.dataType) == streamedKeys.map(_.dataType)
+   val key = rewriteKeyExpr(buildKeys)
+   sameTypes && key.length == 1 && key.head.dataType.isInstanceOf[LongType]
+ }
+
protected def buildSideKeyGenerator(): Projection =
-   UnsafeProjection.create(buildKeys)
+   UnsafeProjection.create(rewriteKeyExpr(buildKeys), buildPlan.output)

protected def streamSideKeyGenerator(): UnsafeProjection =
-   UnsafeProjection.create(streamedKeys)
+   UnsafeProjection.create(rewriteKeyExpr(streamedKeys), streamedPlan.output)

@transient private[this] lazy val boundCondition = if (condition.isDefined) {
newPredicate(condition.get, streamedPlan.output ++ buildPlan.output)
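The restored rewriteKeyExpr packs several fixed-width key columns into a single Long, and the rotation of the second Int exists because java.lang.Long.hashCode is (int)(value ^ (value >>> 32)): packing two equal Ints without rotation always yields hash code 0, so all such keys collide in one bucket. The REPL-pasteable Scala below reproduces that collision and the fix with plain arithmetic; the helper names are made up here, since the restored code expresses the same computation as Catalyst expressions.

    // Pack two Ints into one Long the naive way: high 32 bits = a, low 32 bits = b.
    def packNaive(a: Int, b: Int): Long =
      (a.toLong << 32) | (b.toLong & 0xFFFFFFFFL)

    // Same packing, but rotate the second Int first: (b >>> 15) | (b << 17).
    def packRotated(a: Int, b: Int): Long = {
      val rotated = (b >>> 15) | (b << 17)
      (a.toLong << 32) | (rotated.toLong & 0xFFFFFFFFL)
    }

    // Long's hash code xors the high and low 32-bit halves, so equal Ints cancel out:
    println(packNaive(42, 42).hashCode())    // 0
    println(packNaive(-7, -7).hashCode())    // 0 as well -- the degenerate case
    println(packRotated(42, 42).hashCode())  // 5505066 -- rotation restores hash diversity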
