Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
Description
FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It seems that the order of grouping keys is sometimes messed up. The following tests fails:
@Test def testEventTimeSlidingGroupWindow(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) .map(t => (t._2, t._6)) val table = stream.toTable(tEnv, 'int, 'string) val windowedTable = table .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val results = windowedTable.toDataStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() }
Exception:
Caused by: java.lang.RuntimeException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50) at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288) ... 7 more Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
Attachments
Issue Links
- links to