[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event

## What changes were proposed in this pull request?

This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long` error which could occur in the HistoryServer while trying to process a deserialized `SparkListenerDriverAccumUpdates` event.

The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges for more details). Because of this, a field that our code expected to be deserialized as a `(Long, Long)` tuple came back as an `(Int, Int)` tuple instead.
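
For illustration, here is a minimal standalone sketch of how the mismatch surfaces (this is not part of the patch; `Holder` and `ErasureDemo` are made-up names). Jackson only sees the erased `Seq[Tuple2[Object, Object]]`, fills the tuple with `java.lang.Integer` values for small JSON numbers, and the `ClassCastException` is only thrown later, when the elements are unboxed as `Long`:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Toy stand-in (not Spark code) for an event with a Seq[(Long, Long)] field.
case class Holder(updates: Seq[(Long, Long)])

object ErasureDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    // The Long type parameters are erased at runtime, so Jackson deserializes the
    // small JSON numbers as java.lang.Integer inside the tuples.
    val holder = mapper.readValue("""{"updates": [[2, 3]]}""", classOf[Holder])

    // Unboxing the tuple elements as Long throws:
    //   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    holder.updates.foreach { case (a, b) => println(a + b) }
  }
}
```

In Spark, the same failure showed up in the HistoryServer when a serialized `SparkListenerDriverAccumUpdates` event was replayed through `JsonProtocol`.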

This patch works around the issue by registering a custom `Converter` with Jackson that deserializes the tuples as `(Object, Object)` and then performs the appropriate widening to `(Long, Long)`.
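
As a rough sketch of the mechanism (toy names, simplified to a `Seq[Long]` field rather than the tuple case this patch handles), a `Converter` attached via `@JsonDeserialize(contentConverter = ...)` runs on each container element after Jackson has deserialized it, so it can widen whatever boxed number Jackson produced into the declared type:

```scala
import com.fasterxml.jackson.databind.JavaType
import com.fasterxml.jackson.databind.`type`.TypeFactory
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.util.Converter

// Illustrative only: widens whatever boxed number Jackson produced (Integer or Long)
// into a java.lang.Long.
class NumberToLongConverter extends Converter[java.lang.Number, java.lang.Long] {
  override def convert(in: java.lang.Number): java.lang.Long = in.longValue()
  override def getInputType(tf: TypeFactory): JavaType = tf.constructType(classOf[java.lang.Number])
  override def getOutputType(tf: TypeFactory): JavaType = tf.constructType(classOf[java.lang.Long])
}

// The content converter is applied to every element of the Seq during deserialization.
case class FixedHolder(
    @JsonDeserialize(contentConverter = classOf[NumberToLongConverter])
    ids: Seq[Long])
```

The diff below applies the same idea to the `(Long, Long)` tuples via `LongLongTupleConverter`.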

## How was this patch tested?

New regression tests in `SQLListenerSuite`.

Author: Josh Rosen <[email protected]>

Closes apache#15922 from JoshRosen/SPARK-18462.
JoshRosen authored and rxin committed Nov 18, 2016
1 parent ce13c26 commit d9dd979
Showing 2 changed files with 80 additions and 3 deletions.
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui

import scala.collection.mutable

import com.fasterxml.jackson.databind.JavaType
import com.fasterxml.jackson.databind.`type`.TypeFactory
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.util.Converter

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
case class SparkListenerDriverAccumUpdates(
    executionId: Long,
    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
    accumUpdates: Seq[(Long, Long)])
  extends SparkListenerEvent

/**
 * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
 *
 * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
 * see the "Deserializing Option[Int] and other primitive challenges" section in
 * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
 * SPARK-18462 for the specific problem that motivated this conversion.
 */
private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {

  override def convert(in: (Object, Object)): (Long, Long) = {
    def toLong(a: Object): Long = a match {
      case i: java.lang.Integer => i.intValue()
      case l: java.lang.Long => l.longValue()
    }
    (toLong(in._1), toLong(in._2))
  }

  override def getInputType(typeFactory: TypeFactory): JavaType = {
    val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
  }

  override def getOutputType(typeFactory: TypeFactory): JavaType = {
    val longType = typeFactory.uncheckedSimpleType(classOf[Long])
    typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
  }
}

class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {

override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui

import java.util.Properties

import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.mock

import org.apache.spark._
@@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}


class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
import testImplicits._
import org.apache.spark.AccumulatorSuite.makeInfo

@@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
}

test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
val json = JsonProtocol.sparkEventToJson(event)
assertValidDataInJson(json,
parse("""
|{
| "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
| "executionId": 1,
| "accumUpdates": [[2,3]]
|}
""".stripMargin))
JsonProtocol.sparkEventFromJson(json) match {
case SparkListenerDriverAccumUpdates(executionId, accums) =>
assert(executionId == 1L)
accums.foreach { case (a, b) =>
assert(a == 2L)
assert(b == 3L)
}
}

// Test a case where the numbers in the JSON can only fit in longs:
val longJson = parse(
"""
|{
| "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
| "executionId": 4294967294,
| "accumUpdates": [[4294967294,3]]
|}
""".stripMargin)
JsonProtocol.sparkEventFromJson(longJson) match {
case SparkListenerDriverAccumUpdates(executionId, accums) =>
assert(executionId == 4294967294L)
accums.foreach { case (a, b) =>
assert(a == 4294967294L)
assert(b == 3L)
}
}
}

}

