From 27c11e1b79bd68cbd2e8275c7938478e2e9532e6 Mon Sep 17 00:00:00 2001 From: Aleksandr Chermenin Date: Fri, 16 Dec 2016 14:42:50 +0300 Subject: [PATCH] [FLINK-3617] [scala apis] Added null value check. --- .../api/scala/typeutils/CaseClassSerializer.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index 625ee809663db..29b4952bc1482 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -20,7 +20,8 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase -import org.apache.flink.core.memory.{DataOutputView, DataInputView} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.types.NullFieldException /** * Serializer for Case Classes. Creation and access is different from @@ -97,7 +98,13 @@ abstract class CaseClassSerializer[T <: Product]( var i = 0 while (i < arity) { val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]] - serializer.serialize(value.productElement(i), target) + val o = value.productElement(i) + try + serializer.serialize(o, target) + catch { + case e: NullPointerException => + throw new NullFieldException(i, e) + } i += 1 } }