Details
Type: Bug
Status: Reopened
Priority: Not a Priority
Resolution: Unresolved
None
None
Flink v1.6.0
Description
Consider the following code. I had to jump through some hoops to manually create a DataSet[Row] on which groupBy and sum can be called:
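```scala
package hello.flink

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

object Main {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Rows of (String, Int), built by hand.
    val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
    val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

    // Java DataSource with an explicit RowTypeInfo, wrapped in a Scala DataSet.
    val inputFormat = new CollectionInputFormat(letters.asJavaCollection, typeInfo.createSerializer(env.getConfig))
    val source = new DataSource(env.getJavaEnv, inputFormat, typeInfo, "hello.flink.Main$.main(Main.scala:20)")
    val dataSet = new DataSet(source)

    dataSet.print() // works

    dataSet
      .groupBy(0)
      .sum(1)
      .print() // throws ClassCastException
  }
}
```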
The call to dataSet.print() works as expected, but the final print() throws an exception:
```
Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
	at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260)
```
Changing the final print() to collect() throws the same exception.
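A possible workaround, sketched under the assumption that the failure is specific to Row input: the trace shows ScalaAggregateOperator casting the input serializer to TupleSerializerBase, and Scala tuples are serialized by CaseClassSerializer, which extends TupleSerializerBase. Mapping each Row to a Scala tuple before groupBy/sum should therefore avoid the cast. The sketch also builds the DataSet[Row] through the Java environment's fromCollection(Collection, TypeInformation) instead of creating the CollectionInputFormat and DataSource by hand. The object name RowSumWorkaround is hypothetical, and this has not been verified against Flink v1.6.0 specifically.

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

// Hypothetical workaround sketch: aggregate Scala tuples instead of Rows.
object RowSumWorkaround {

  def run(): Seq[(String, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Same input as the report: ("a", 1), ("a", 1), ("b", 1) as Rows.
    val letters = Seq("a", "a", "b").map(s => Row.of(s, Int.box(1)))
    val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

    // Let the Java environment create the source with an explicit RowTypeInfo,
    // then wrap it in the Scala DataSet.
    val rows: DataSet[Row] = new DataSet(env.getJavaEnv.fromCollection(letters.asJava, typeInfo))

    // Convert each Row to a Scala tuple. Tuples get a CaseClassSerializer
    // (a TupleSerializerBase subclass), so groupBy/sum can aggregate them.
    rows
      .map(r => (r.getField(0).asInstanceOf[String], r.getField(1).asInstanceOf[Int]))
      .groupBy(0)
      .sum(1)
      .collect()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

The output order of collect() is not guaranteed, so the expected result is the set {("a", 2), ("b", 1)}. The trade-off is that the Row type is given up before aggregation; if Rows are required downstream, they have to be rebuilt with another map.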