Flink / FLINK-9935

Batch Table API: grouping by window and attribute causes java.lang.ClassCastException


    Details

    • Flags:
      Important

      Description

       Grouping by a window AND one or more other attributes appears to be broken. Test case:

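      import org.apache.flink.api.scala._
      import org.apache.flink.table.api.{Table, TableEnvironment, Types}
      import org.apache.flink.table.api.scala._
      import org.scalatest.{FlatSpec, Matchers}

      class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {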
      
        trait BatchContext {
          implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
          implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
      
          val data = Seq(
            (1532424567000L, "id1", "location1"),
            (1532424567000L, "id2", "location1"),
            (1532424567000L, "id3", "location1"),
            (1532424568000L, "id1", "location2"),
            (1532424568000L, "id2", "location3")
          )
      
          val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
      
          val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
        }
      
        // FlatSpec requires a subject before the first `it` clause.
        behavior of "Batch Table API"

        it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
          val results = table
            .window(Tumble over 1.second on 'rowtime as 'w)
            .groupBy('w, 'location) // grouping by the window AND a regular column triggers the exception
            .select(
              'w.start.cast(Types.LONG),
              'w.end.cast(Types.LONG),
              'location,
              'id.count
            )
            .toDataSet[(Long, Long, String, Long)]
            .collect()
      
          results should contain theSameElementsAs Seq(
            (1532424567000L, 1532424568000L, "location1", 3L),
            (1532424568000L, 1532424569000L, "location2", 1L),
            (1532424568000L, 1532424569000L, "location3", 1L)
          )
        }
      }
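The expected rows in the assertion follow from assigning each timestamp to a 1-second tumbling window, with start = ts - ts % 1000 and end = start + 1000, and then counting ids per (window, location). A minimal plain-Scala sketch of that computation, independent of Flink, could look like the following. The object and method names are hypothetical, and the start formula assumes non-negative timestamps and a zero window offset.

```scala
// Hypothetical cross-check of the expected results, using plain Scala collections (no Flink).
object TumbleWindowCheck {
  val windowSizeMs = 1000L

  // Same input as the test's BatchContext: (rowtime in ms, id, location).
  val data: Seq[(Long, String, String)] = Seq(
    (1532424567000L, "id1", "location1"),
    (1532424567000L, "id2", "location1"),
    (1532424567000L, "id3", "location1"),
    (1532424568000L, "id1", "location2"),
    (1532424568000L, "id2", "location3")
  )

  // Start of the tumbling window [start, start + size) containing ts.
  // Assumes ts >= 0 and a window offset of 0.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Groups rows by (window start, location) and counts them,
  // yielding (window start, window end, location, count) sorted by start and location.
  def countPerWindowAndLocation(rows: Seq[(Long, String, String)], size: Long): Seq[(Long, Long, String, Long)] =
    rows
      .groupBy { case (ts, _, loc) => (windowStart(ts, size), loc) }
      .map { case ((start, loc), group) => (start, start + size, loc, group.size.toLong) }
      .toSeq
      .sortBy { case (start, _, loc, _) => (start, loc) }

  def main(args: Array[String]): Unit =
    countPerWindowAndLocation(data, windowSizeMs).foreach(println)
}
```

Running `main` prints the same three tuples that the test expects, which suggests the assertion itself is correct and the failure lies in the query execution.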
      

      At execution time, the 'rowtime attribute appears to take the place of 'location, which causes the ClassCastException:

      [info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
      [info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
      [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
      [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
      [info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
      [info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
      [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
      [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
      [info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
      [info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
      [info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
      

      Debug information I was able to collect shows that the field serializers don't match the types of the Row fields (position 0 holds a Long, but its serializer is a StringSerializer):

      this.instance = {Row@68451} "1532424567000,(3),1532424567000"
       fields = {Object[3]@68461} 
        0 = {Long@68462} 1532424567000
        1 = {CountAccumulator@68463} "(3)"
        2 = {Long@68462} 1532424567000
      this.serializer = {RowSerializer@68452} 
       fieldSerializers = {TypeSerializer[3]@68455} 
        0 = {StringSerializer@68458} 
        1 = {TupleSerializer@68459} 
        2 = {LongSerializer@68460} 
       arity = 3
       nullMask = {boolean[3]@68457} 
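To make the mismatch in the dump explicit, the sketch below compares each field value's runtime class with the class its serializer expects. It is plain Scala and does not use Flink's RowSerializer. `CountAcc` is a hypothetical stand-in for Flink's CountAccumulator, and the expected classes (String, accumulator, Long) are read off the fieldSerializers array above.

```scala
// Hypothetical stand-in for Flink's CountAccumulator (illustration only).
final case class CountAcc(count: Long)

object RowLayoutCheck {
  // Classes implied by fieldSerializers: StringSerializer, TupleSerializer, LongSerializer.
  val expected: Seq[Class[_]] = Seq(classOf[String], classOf[CountAcc], classOf[java.lang.Long])

  // Field values observed in the Row (boxed, as they are in the Object[] array).
  val observed: Seq[AnyRef] = Seq(
    java.lang.Long.valueOf(1532424567000L),
    CountAcc(3L),
    java.lang.Long.valueOf(1532424567000L)
  )

  // Returns (position, expected class, actual class) for every field whose value
  // is not an instance of the class its serializer expects.
  def mismatches(expected: Seq[Class[_]], fields: Seq[AnyRef]): Seq[(Int, String, String)] =
    expected.zip(fields).zipWithIndex.collect {
      case ((cls, value), i) if !cls.isInstance(value) =>
        (i, cls.getSimpleName, value.getClass.getSimpleName)
    }

  def main(args: Array[String]): Unit =
    mismatches(expected, observed).foreach(println)
}
```

Only position 0 is reported: the serializer layout expects the String location there, but the Row carries the Long 1532424567000, which matches the StringSerializer failure at the top of the stack trace.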
      

       


              People

              • Assignee:
                fhueske Fabian Hueske
                Reporter:
                roman.wozniak Roman Wozniak
              • Votes:
                1
                Watchers:
                5
