Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
1.0.0
-
None
-
None
Description
Hi,
when using TimeCharacteristics.ProcessingTime a ClassCastException is thrown in StreamRecordSerializer when WindowOperator.processWatermark() is called from WindowOperator.trigger(), i.e. whenever a ProcessingTimeTimer is triggered.
The problem seems to be that processWatermark() is also called in trigger(), when time characteristic is ProcessingTime, but in RecordWriterOutput enableWatermarkMultiplexing is false and the TypeSerializer is a StreamRecordSerializer, which ultimately leads to the ClassCastException. Do you agree?
If this is indeed a bug, there several possible solutions.
- Only calling processWatermark() in trigger(), when TimeCharacteristic is EventTime
- Not calling processWatermark() in trigger() at all, instead wait for the next watermark to trigger the EventTimeTimers with a timestamp behind the current watermark. This is, of course, a trade off.
- Using MultiplexingStreamRecordSerializer all the time, but I have no idea what the side effect of this change would be. I assume there is a reason for existence of the StreamRecordSerializer
StackTrace:
TimerException{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
... 7 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
... 11 more