Flink / FLINK-3688

ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.0.2
    • Component/s: None
    • Labels: None

      Description

      Hi,

      When using TimeCharacteristic.ProcessingTime, a ClassCastException is thrown in StreamRecordSerializer when WindowOperator.processWatermark() is called from WindowOperator.trigger(), i.e. whenever a ProcessingTimeTimer fires.

      The problem seems to be that processWatermark() is also called in trigger() when the time characteristic is ProcessingTime. In that case, enableWatermarkMultiplexing in RecordWriterOutput is false and the TypeSerializer is a StreamRecordSerializer, which cannot serialize a Watermark; this ultimately leads to the ClassCastException. Do you agree?
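
      To illustrate the suspected mechanism, here is a minimal, self-contained sketch. All classes in it (WatermarkCastSketch, RecordOnlySerializer, MultiplexingSerializer, chooseSerializer) are simplified, hypothetical stand-ins and not Flink's actual implementation; only the flag name enableWatermarkMultiplexing is taken from the description above.

```java
public class WatermarkCastSketch {
    // Simplified stand-ins for the stream element types; NOT the real Flink classes.
    static class StreamElement {}

    static class StreamRecord extends StreamElement {
        final String value;
        StreamRecord(String value) { this.value = value; }
    }

    static class Watermark extends StreamElement {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    interface ElementSerializer {
        String serialize(Object element);
    }

    // Hypothetical record-only serializer: it casts unconditionally,
    // so handing it a Watermark fails with a ClassCastException.
    static class RecordOnlySerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            StreamRecord record = (StreamRecord) element;
            return "R:" + record.value;
        }
    }

    // Hypothetical multiplexing serializer: it checks the element type first.
    static class MultiplexingSerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            if (element instanceof Watermark) {
                return "W:" + ((Watermark) element).timestamp;
            }
            return "R:" + ((StreamRecord) element).value;
        }
    }

    // Assumption: the output picks its serializer based on this flag.
    static ElementSerializer chooseSerializer(boolean enableWatermarkMultiplexing) {
        return enableWatermarkMultiplexing
                ? new MultiplexingSerializer()
                : new RecordOnlySerializer();
    }

    // Emits a watermark through the chosen serializer and reports the outcome.
    static String emitWatermark(boolean enableWatermarkMultiplexing, long timestamp) {
        try {
            return chooseSerializer(enableWatermarkMultiplexing)
                    .serialize(new Watermark(timestamp));
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(emitWatermark(false, 42L)); // record-only path
        System.out.println(emitWatermark(true, 42L));  // multiplexing path
    }
}
```

      In this model the failure only appears when a watermark reaches the record-only path, which matches the combination described above: multiplexing disabled, yet a watermark is emitted.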

      If this is indeed a bug, there are several possible solutions:

      1. Only calling processWatermark() in trigger() when the TimeCharacteristic is EventTime.
      2. Not calling processWatermark() in trigger() at all, and instead waiting for the next watermark to fire the EventTimeTimers whose timestamp lies behind the current watermark. This is, of course, a trade-off.
      3. Using MultiplexingStreamRecordSerializer all the time, but I have no idea what the side effects of this change would be. I assume there is a reason for the existence of the StreamRecordSerializer.
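
      As a rough illustration of option 1, the guard could look like the sketch below. WindowOperatorModel and its fields are hypothetical and heavily simplified, the enum is a two-value stand-in for the time characteristic, and this is not necessarily how the issue was eventually fixed.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerGuardSketch {
    // Two-value stand-in for the time characteristic; NOT the real Flink enum.
    enum TimeCharacteristic { ProcessingTime, EventTime }

    // Hypothetical, simplified model of a window operator.
    static class WindowOperatorModel {
        final TimeCharacteristic timeCharacteristic;
        final List<Long> firedTimers = new ArrayList<>();
        final List<Long> emittedWatermarks = new ArrayList<>();
        long currentWatermark = -1L;

        WindowOperatorModel(TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
        }

        // In this model, processing a watermark also forwards it downstream.
        void processWatermark(long watermark) {
            currentWatermark = watermark;
            emittedWatermarks.add(watermark);
        }

        // Called when a processing-time timer fires.
        void trigger(long time) {
            firedTimers.add(time);
            // Option 1: only re-process the watermark under event time,
            // so no watermark is emitted when multiplexing is disabled.
            if (timeCharacteristic == TimeCharacteristic.EventTime) {
                processWatermark(currentWatermark);
            }
        }
    }

    public static void main(String[] args) {
        WindowOperatorModel processing =
                new WindowOperatorModel(TimeCharacteristic.ProcessingTime);
        processing.trigger(1000L);
        System.out.println(processing.emittedWatermarks.size());

        WindowOperatorModel event =
                new WindowOperatorModel(TimeCharacteristic.EventTime);
        event.trigger(1000L);
        System.out.println(event.emittedWatermarks.size());
    }
}
```

      With the guard in place, the processing-time operator still fires its timer but never forwards a watermark to the output, so the record-only serializer is never handed a Watermark.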

      StackTrace:

      TimerException{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
      at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:744)
      Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
      at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
      at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
      ... 7 more
      Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
      at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
      at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
      at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
      at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
      at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
      at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
      ... 11 more

            People

            • Assignee: Konstantin Knauf (knaufk)
            • Reporter: Konstantin Knauf (knaufk)
