Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
1.9.1, 1.10.0
Description
`TimestampType` has two physical representations: `java.sql.Timestamp` and `java.time.LocalDateTime`. With the following SQL, the two representations conflict with each other:
SELECT SUM(cnt) as s, MAX(ts)
FROM
  (SELECT `string`, `int`, COUNT(*) AS cnt, MAX(rowtime) as ts
   FROM T1
   GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
GROUP BY `string`
with 'table.exec.emit.early-fire.enabled' = true.
The exception is shown below:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:748)
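The cast failure at the top of the trace can be illustrated outside Flink. The following is only a minimal sketch, not the actual fix: `TimestampRepresentationDemo`, `castFails`, and `toTimestamp` are hypothetical names introduced for illustration. It shows that `java.time.LocalDateTime` and `java.sql.Timestamp` are unrelated classes, so a value of one cannot be cast to the other and has to be converted explicitly.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical demo class; it is not part of Flink.
public class TimestampRepresentationDemo {

    // Returns true if casting the value to java.sql.Timestamp throws,
    // which mirrors the ClassCastException in the stack trace above.
    static boolean castFails(Object value) {
        try {
            Object ignored = (Timestamp) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Accepts either representation and converts explicitly instead of casting.
    static Timestamp toTimestamp(Object value) {
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + value);
    }

    public static void main(String[] args) {
        Object value = LocalDateTime.of(2020, 1, 1, 0, 0, 10);
        System.out.println("cast fails: " + castFails(value));
        System.out.println("converted:  " + toTimestamp(value));
    }
}
```

If one operator produces a `LocalDateTime` while the generated code of the next operator expects a `Timestamp`, a plain cast like the one in `castFails` is what breaks. Both sides need to agree on a single representation, or the value has to be converted as in `toTimestamp`.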
I also created a unit test in `WindowAggregateITCase` to quickly reproduce this bug:
@Test
def testEarlyFireWithTumblingWindow(): Unit = {
  val stream = failingDataSource(data)
    .assignTimestampsAndWatermarks(
      new TimestampAndWatermarkWithOffset
        [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
  val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
  tEnv.registerTable("T1", table)
  tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
  tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")

  val sql =
    """
      |SELECT
      |  SUM(cnt) as s,
      |  MAX(ts)
      |FROM
      |  (SELECT
      |    `string`,
      |    `int`,
      |    COUNT(*) AS cnt,
      |    MAX(rowtime) as ts
      |  FROM T1
      |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
      |GROUP BY `string`
      |""".stripMargin

  tEnv.sqlQuery(sql).toRetractStream[Row].print()
  env.execute()
}