FLINK-10194 (project: Flink)

Serialization issue with Scala's AggregateDataSet[Row]

    Description

      Consider the following code, where I had to jump through some hoops to manually create a DataSet[Row] on which groupBy and sum can be called:

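      import scala.collection.JavaConverters._

      import org.apache.flink.api.common.typeinfo.BasicTypeInfo
      import org.apache.flink.api.java.io.CollectionInputFormat
      import org.apache.flink.api.java.operators.DataSource
      import org.apache.flink.api.java.typeutils.RowTypeInfo
      import org.apache.flink.api.scala._
      import org.apache.flink.types.Row

      object Main {
        def main(args: Array[String]): Unit = {
          val env = ExecutionEnvironment.getExecutionEnvironment

          // Three (String, Int) rows: ("a", 1), ("a", 1), ("b", 1)
          val letters = Seq("a", "a", "b").map(Row.of(_, 1.asInstanceOf[Object]))
          val typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO)

          // Build the source by hand with an explicit RowTypeInfo so that
          // field-position keys such as groupBy(0) are accepted
          val inputFormat = new CollectionInputFormat(letters.asJavaCollection,
            typeInfo.createSerializer(env.getConfig))

          val source = new DataSource(env.getJavaEnv,
            inputFormat,
            typeInfo,
            "hello.flink.Main$.main(Main.scala:20)")

          // Wrap the Java DataSource in a Scala DataSet[Row]
          val dataSet = new DataSet(source)

          dataSet.print() // works as expected

          dataSet
            .groupBy(0)
            .sum(1) // fails at runtime with the ClassCastException below
            .print()
        }
      }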

      The call to dataSet.print() works as expected, but the final print() throws an exception:

      Caused by: java.lang.ClassCastException: org.apache.flink.api.java.typeutils.runtime.RowSerializer cannot be cast to org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
      at org.apache.flink.api.scala.operators.ScalaAggregateOperator$AggregatingUdf.open(ScalaAggregateOperator.java:260)

      Changing the final print() to collect() throws the same exception.
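      The stack trace points at ScalaAggregateOperator, which appears to cast the input serializer to TupleSerializerBase. Scala tuples and case classes satisfy that cast, but RowTypeInfo creates a RowSerializer, so the built-in Scala sum apparently cannot aggregate a DataSet[Row]. Two possible workarounds are sketched below. This is a minimal sketch, not a confirmed fix. It assumes the Flink Scala DataSet API of that time (Scala 2.11), a String key in field 0 and an Int value in field 1 as in the example above. The names RowSumWorkaround, sumViaTuples and sumViaReduce are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

// Hypothetical helper object; both methods assume field 0 holds a String key
// and field 1 holds an Int value, as in the rows built in the report.
object RowSumWorkaround {

  // Workaround 1: convert each Row into a Scala tuple. The implicit type
  // information for (String, Int) is a case-class type whose serializer
  // extends TupleSerializerBase, so groupBy(0).sum(1) can run on it.
  def sumViaTuples(rows: DataSet[Row]): DataSet[(String, Int)] =
    rows
      .map(r => (r.getField(0).asInstanceOf[String], r.getField(1).asInstanceOf[Int]))
      .groupBy(0)
      .sum(1)

  // Workaround 2: keep Row as the element type and replace sum(1) with an
  // explicit reduce that adds the Int fields of two rows sharing a key.
  // This avoids ScalaAggregateOperator, which performs the failing cast.
  // (Untyped lambda parameters assume Scala 2.11 overload resolution.)
  def sumViaReduce(rows: DataSet[Row]): DataSet[Row] =
    rows
      .groupBy(0)
      .reduce { (a, b) =>
        val total = a.getField(1).asInstanceOf[Int] + b.getField(1).asInstanceOf[Int]
        Row.of(a.getField(0), Int.box(total))
      }
}
```

      With the dataSet from the example, RowSumWorkaround.sumViaTuples(dataSet).print() or RowSumWorkaround.sumViaReduce(dataSet).print() would stand in for the failing groupBy(0).sum(1) call. The tuple variant keeps the built-in aggregation but gives up the Row type; the reduce variant keeps Row at the cost of hand-written field access.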

          People

            Assignee: Unassigned
            Reporter: Alexis Sarda-Espinosa (asardaes)
            Votes: 0
            Watchers: 2
