Flink / FLINK-15421

GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp

    Details

      Description

      `TimestampType` has two physical representations: `Timestamp` and `LocalDateTime`. With the following SQL, the two representations conflict with each other:

      SELECT
        SUM(cnt) AS s,
        MAX(ts)
      FROM
        (SELECT
          `string`,
          `int`,
          COUNT(*) AS cnt,
          MAX(rowtime) AS ts
        FROM T1
        GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
      GROUP BY `string`
      

      with 'table.exec.emit.early-fire.enabled' = true.
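
To make the two representations concrete, here is a minimal plain-Java sketch (not Flink code). It only shows that `java.sql.Timestamp` and `java.time.LocalDateTime` are unrelated classes that need an explicit conversion to move between them. The class name, the helper names and the sample date are assumptions chosen for illustration.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampRepresentations {
    // Convert the java.sql representation to the java.time one.
    static LocalDateTime toLocalDateTime(Timestamp ts) {
        return ts.toLocalDateTime();
    }

    // Convert back; Timestamp.valueOf interprets the value in the JVM default time zone.
    static Timestamp toTimestamp(LocalDateTime ldt) {
        return Timestamp.valueOf(ldt);
    }

    public static void main(String[] args) {
        // Arbitrary sample value, not taken from the issue.
        LocalDateTime ldt = LocalDateTime.of(2020, 1, 1, 10, 0, 5);
        Timestamp ts = toTimestamp(ldt);

        // Neither class extends the other, so only an explicit conversion works.
        System.out.println(ts.getClass().getName());
        System.out.println(toLocalDateTime(ts).equals(ldt));
    }
}
```

Neither class is a subtype of the other, so code that expects one representation cannot accept the other without a conversion step like the ones above.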

      The exception is below:

      Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
      at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
      at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
      at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
      at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
      at java.lang.Thread.run(Thread.java:748)
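
The same kind of `ClassCastException` can be reproduced outside Flink. The sketch below is an illustration only: it is not the generated `GroupAggsHandler` code and not the fix applied for this issue. `unsafeRead` mimics code that assumes the `Timestamp` representation, and `normalize` is a hypothetical helper that accepts either representation.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class CastMismatchSketch {
    // Mimics code that assumes the java.sql.Timestamp representation.
    static Timestamp unsafeRead(Object field) {
        return (Timestamp) field; // throws ClassCastException for a LocalDateTime
    }

    // Hypothetical helper: accept either representation and normalize to Timestamp.
    static Timestamp normalize(Object field) {
        if (field == null) {
            return null;
        }
        if (field instanceof Timestamp) {
            return (Timestamp) field;
        }
        if (field instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) field);
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp representation: " + field.getClass().getName());
    }

    // Returns true when the blind cast fails for the given value.
    static boolean castFails(Object field) {
        try {
            unsafeRead(field);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Arbitrary sample value, not taken from the issue.
        Object value = LocalDateTime.of(2020, 1, 1, 10, 0, 5);
        System.out.println(castFails(value));
        System.out.println(normalize(value).getClass().getName());
    }
}
```

The blind cast fails as soon as a producer hands over the other representation, which matches the shape of the failure in the trace above. Checking the runtime type before converting avoids that in this sketch.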

      I also created a unit test in `WindowAggregateITCase` to quickly reproduce this bug:

      @Test
      def testEarlyFireWithTumblingWindow(): Unit = {
        // Assign event-time timestamps and watermarks to the test data.
        val stream = failingDataSource(data)
          .assignTimestampsAndWatermarks(
            new TimestampAndWatermarkWithOffset
              [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
        // Register the stream as table T1 with `rowtime` as the event-time attribute.
        val table = stream.toTable(tEnv,
          'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
        tEnv.registerTable("T1", table)
        // Enable early firing for the window aggregate.
        tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
        tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")

        // Inner query: tumbling-window aggregate. Outer query: group aggregate over its output.
        val sql =
          """
            |SELECT
            |  SUM(cnt) as s,
            |  MAX(ts)
            |FROM
            |  (SELECT
            |    `string`,
            |    `int`,
            |    COUNT(*) AS cnt,
            |    MAX(rowtime) as ts
            |  FROM T1
            |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
            |GROUP BY `string`
            |""".stripMargin

        // Running the job triggers the ClassCastException shown above.
        tEnv.sqlQuery(sql).toRetractStream[Row].print()
        env.execute()
      }
      

       

       

            People

            • Assignee: Zhenghua Gao (docete)
            • Reporter: Benchao Li (libenchao)

                Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 40m