Details
Bug
Status: Closed
Major
Resolution: Fixed
1.9.1
Description
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. However, they also do not support copying NULL values, which causes code like the following to throw an exception. I think this does not match users' expectations and is error-prone.
code:
stream.map(xxx).filter(_ != null).xxx // the return type of the map function is Tuple, and it may return null
exception info:
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
suggestion:
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? e.g.
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values.
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
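To illustrate the effect of the guard outside of Flink, here is a minimal, self-contained sketch. `FieldCopier`, `IdentityCopier`, `ProductCopier`, and the `nullSafe` flag are hypothetical names made up for this example and are not part of Flink's API. The loop only mirrors the shape of the suggested `copy`, so this is a model of the idea and not the actual fix.

```scala
// Simplified stand-in for a per-field serializer (hypothetical, not Flink's TypeSerializer).
trait FieldCopier {
  def copy(from: AnyRef): AnyRef
}

// Immutable field values such as String can be returned as-is.
object IdentityCopier extends FieldCopier {
  def copy(from: AnyRef): AnyRef = from
}

// Hypothetical model of a tuple/case-class copier with an optional null guard.
class ProductCopier[T <: Product](
    fieldCopiers: Array[FieldCopier],
    create: Array[AnyRef] => T,
    nullSafe: Boolean) {

  def copy(from: T): T = {
    // The guard from the suggestion: pass a null record through untouched.
    if (nullSafe && from == null) {
      return from
    }
    val fields = new Array[AnyRef](fieldCopiers.length)
    var i = 0
    while (i < fieldCopiers.length) {
      // If `from` is null and the guard is off, productElement throws a NullPointerException here.
      fields(i) = fieldCopiers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
      i += 1
    }
    create(fields)
  }
}
```

With `nullSafe = true`, `copy(null)` returns null, so a downstream `filter(_ != null)` gets the chance to drop the record. With `nullSafe = false`, the same call fails inside the loop, which corresponds to the stack trace reported above.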