Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-3688

ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 1.0.0
    • 1.0.2
    • None
    • None

    Description

      Hi,

      when using TimeCharacteristics.ProcessingTime a ClassCastException is thrown in StreamRecordSerializer when WindowOperator.processWatermark() is called from WindowOperator.trigger(), i.e. whenever a ProcessingTimeTimer is triggered.

      The problem seems to be that processWatermark() is also called in trigger(), when time characteristic is ProcessingTime, but in RecordWriterOutput enableWatermarkMultiplexing is false and the TypeSerializer is a StreamRecordSerializer, which ultimately leads to the ClassCastException. Do you agree?

      If this is indeed a bug, there several possible solutions.

      1. Only calling processWatermark() in trigger(), when TimeCharacteristic is EventTime
      2. Not calling processWatermark() in trigger() at all, instead wait for the next watermark to trigger the EventTimeTimers with a timestamp behind the current watermark. This is, of course, a trade off.
      3. Using MultiplexingStreamRecordSerializer all the time, but I have no idea what the side effect of this change would be. I assume there is a reason for existence of the StreamRecordSerializer

      StackTrace:

      TimerException{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
      at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:744)
      Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
      at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
      ... 7 more
      Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
      at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
      at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
      at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
      at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
      at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
      ... 11 more

      Attachments

        Activity

          People

            knaufk Konstantin Knauf
            knaufk Konstantin Knauf
            Votes:
            1 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: