Skip to content

Commit

Permalink
[FLINK-11334] Remove old deserialization logic from ScalaEnumSerializ…
Browse files Browse the repository at this point in the history
…erSnapshot

This logic will never be used because the new snapshot will never be
used to deserialize an old config snapshot.
  • Loading branch information
aljoscha committed Feb 21, 2019
1 parent 6d9b45b commit db8a303
Showing 1 changed file with 19 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.flink.api.scala.typeutils

import java.io.IOException

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.{InstantiationUtil, Preconditions}

Expand All @@ -48,52 +46,30 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration]
override def getCurrentVersion: Int = ScalaEnumSerializerSnapshot.VERSION

override def writeSnapshot(out: DataOutputView): Unit = {
val outViewWrapper = new DataOutputViewStream(out)
try {
out.writeUTF(enumClass.getName)

out.writeInt(previousEnumConstants.length)
for ((name, idx) <- previousEnumConstants) {
out.writeUTF(name)
out.writeInt(idx)
}
} finally if (outViewWrapper != null) outViewWrapper.close()
out.writeUTF(enumClass.getName)

out.writeInt(previousEnumConstants.length)
for ((name, idx) <- previousEnumConstants) {
out.writeUTF(name)
out.writeInt(idx)
}
}

override def readSnapshot(
readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val inViewWrapper = new DataInputViewStream(in)
try {
if (readVersion == 1) {
enumClass = InstantiationUtil.deserializeObject(
inViewWrapper, userCodeClassLoader)

// read null from input stream
InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader)
previousEnumConstants = List()
} else if (readVersion >= 2) {
enumClass = Class.forName(
in.readUTF(), true, userCodeClassLoader).asInstanceOf[Class[E]]

val length = in.readInt()
val listBuffer = ListBuffer[(String, Int)]()

for (_ <- 0 until length) {
val name = in.readUTF()
val idx = in.readInt()
listBuffer += ((name, idx))
}
readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {

previousEnumConstants = listBuffer.toList
} else {
throw new IOException(
s"Cannot deserialize ${getClass.getSimpleName} with version $readVersion.")
}
} catch {
case e: ClassNotFoundException =>
throw new IOException("The requested enum class cannot be found in classpath.", e)
enumClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader)

val length = in.readInt()
val listBuffer = ListBuffer[(String, Int)]()

for (_ <- 0 until length) {
val name = in.readUTF()
val idx = in.readInt()
listBuffer += ((name, idx))
}
finally if (inViewWrapper != null) inViewWrapper.close()

previousEnumConstants = listBuffer.toList
}

override def restoreSerializer(): TypeSerializer[E#Value] = {
Expand Down

0 comments on commit db8a303

Please sign in to comment.