Skip to content

Commit

Permalink
[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
Browse files Browse the repository at this point in the history
Author: Michael Armbrust <[email protected]>

Closes apache#5191 from marmbrus/kryoRowsWithSchema and squashes the following commits:

bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema
  • Loading branch information
marmbrus authored and liancheng committed Mar 26, 2015
1 parent 855cba8 commit f88f51b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object EmptyRow extends Row {
*/
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
/** No-arg constructor for serialization. */
def this() = this(null)
protected def this() = this(null)

def this(size: Int) = this(new Array[Any](size))

Expand Down Expand Up @@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {

class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
extends GenericRow(values) {

/** No-arg constructor for serialization. */
protected def this() = this(null, null)
}

class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
/** No-arg constructor for serialization. */
def this() = this(null)
protected def this() = this(null)

def this(size: Int) = this(new Array[Any](size))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
sealed class Metadata private[types] (private[types] val map: Map[String, Any])
extends Serializable {

/** No-arg constructor for kryo. */
protected def this() = this(null)

/** Tests whether this Metadata contains a binding for a key. */
def contains(key: String): Boolean = map.contains(key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
*/
@DeveloperApi
case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {

/** No-arg constructor for kryo. */
protected def this() = this(null)

private[sql] type JvmType = Decimal
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
private[sql] val numeric = Decimal.DecimalIsFractional
Expand Down Expand Up @@ -819,6 +823,10 @@ object ArrayType {
*/
@DeveloperApi
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {

/** No-arg constructor for kryo. */
protected def this() = this(null, false)

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(
s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
Expand Down Expand Up @@ -857,6 +865,9 @@ case class StructField(
nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {

/** No-arg constructor for kryo. */
protected def this() = this(null, null)

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
DataType.buildFormattedString(dataType, s"$prefix |", builder)
Expand Down Expand Up @@ -1003,6 +1014,9 @@ object StructType {
@DeveloperApi
case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {

/** No-arg constructor for kryo. */
protected def this() = this(null)

/** Returns all field names in an array. */
def fieldNames: Array[String] = fields.map(_.name)

Expand Down Expand Up @@ -1121,6 +1135,10 @@ case class MapType(
keyType: DataType,
valueType: DataType,
valueContainsNull: Boolean) extends DataType {

/** No-arg constructor for kryo. */
def this() = this(null, null, false)

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"$prefix-- key: ${keyType.typeName}\n")
builder.append(s"$prefix-- value: ${valueType.typeName} " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa

private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
val kryo = new Kryo()
val kryo = super.newKryo()
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
Expand All @@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
kryo.register(classOf[Decimal])

kryo.setReferences(false)
kryo.setClassLoader(Utils.getSparkClassLoader)
new AllScalaRegistrar().apply(kryo)
kryo
}
}
Expand Down
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.spark.sql

import org.apache.spark.sql.execution.SparkSqlSerializer
import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._

class RowSuite extends FunSuite {
Expand Down Expand Up @@ -50,4 +53,13 @@ class RowSuite extends FunSuite {
row(0) = null
assert(row.isNullAt(0))
}

test("serialize w/ kryo") {
val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
val serializer = new SparkSqlSerializer(TestSQLContext.sparkContext.getConf)
val instance = serializer.newInstance()
val ser = instance.serialize(row)
val de = instance.deserialize(ser).asInstanceOf[Row]
assert(de === row)
}
}

0 comments on commit f88f51b

Please sign in to comment.