Flink / FLINK-12872

WindowOperator may fail with UnsupportedOperationException when merging windows


Details

    Description

      Reported by a user.

      I have a job that uses a processing-time session window with an inactivity gap of 60 ms, and I intermittently run into the following exception. I'm trying to figure out what happened here; I haven't been able to reproduce it. Any thoughts?
      
      java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
      	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
      	at java.lang.Thread.run(Thread.java:745)
      

      This is probably happening because System.currentTimeMillis() is not monotonic (it follows the system wall clock, which can be adjusted), and WindowOperator reads it at least twice per record: once when the window is created, and a second time when the check mentioned above (the one that failed) is performed. I would guess there are more places like this, not only in WindowOperator.
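
The suspected failure can be reproduced deterministically in a deliberately simplified model. The Java sketch below is not Flink's WindowOperator: the class `TwoReadSessionModel`, its injectable `LongSupplier` clock and the scripted timestamps are all hypothetical. It reads the clock once to assign a session window `[t, t + gap)` and a second time for a sanity check after merging, loosely mirroring the check in the stack trace. If the second read is later than the first by roughly the gap or more (because the clock was adjusted or the thread stalled between the two reads), the merged window already looks expired and the check throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Simplified, hypothetical model of a processing-time session window operator.
// It is NOT Flink's WindowOperator; it only mimics "read clock, assign, merge, read clock, check".
final class TwoReadSessionModel {

    // Half-open interval [start, end), loosely following Flink's TimeWindow.
    record Window(long start, long end) {
        long maxTimestamp() { return end - 1; }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final long gapMillis;
    private final LongSupplier clock; // stands in for System.currentTimeMillis()
    private Window session;           // single key, at most one open session

    TwoReadSessionModel(long gapMillis, LongSupplier clock) {
        this.gapMillis = gapMillis;
        this.clock = clock;
    }

    Window processElement() {
        long assignTime = clock.getAsLong(); // read #1: assign [t, t + gap)
        Window fresh = new Window(assignTime, assignTime + gapMillis);
        if (session == null || !session.intersects(fresh)) {
            session = fresh; // nothing to merge, so no second read
            return session;
        }
        Window merged = session.cover(fresh);
        long checkTime = clock.getAsLong(); // read #2: sanity check after merging
        if (merged.maxTimestamp() <= checkTime) {
            throw new UnsupportedOperationException(
                    "merged window " + merged + " ends before processing time " + checkTime);
        }
        session = merged;
        return session;
    }

    // Replays fixed timestamps so the failure is deterministic.
    static LongSupplier scripted(long... times) {
        Deque<Long> queue = new ArrayDeque<>();
        for (long t : times) {
            queue.add(t);
        }
        return queue::removeFirst;
    }

    public static void main(String[] args) {
        // Element 1 reads 1000; element 2 reads 1030 then 1031;
        // element 3 reads 1050, then 1200 (clock jumped or thread stalled).
        TwoReadSessionModel op = new TwoReadSessionModel(60, scripted(1000, 1030, 1031, 1050, 1200));
        System.out.println(op.processElement());
        System.out.println(op.processElement());
        try {
            op.processElement();
        } catch (UnsupportedOperationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In `main`, the second element passes because its two reads are only 1 ms apart, while the third element's second read (1200) lands after the merged window `[1000, 1110)` has ended, so it fails in the same way as the reported exception.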

      The fix could be to make processing time monotonic, to read it only once per operator per record, or to drop processing time in favour of ingestion time.
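
The first two options can be sketched in plain Java. Both helpers, `MonotonicClock` and `PerRecordTime`, are hypothetical names used for illustration, not Flink classes: the first clamps a wall clock so it never returns a value lower than one it already returned; the second reads the clock once per record and reuses that value, so window assignment and any later check see the same time.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helpers sketching two of the fixes suggested in this issue; these are not Flink APIs.
final class ProcessingTimeFixes {

    // Idea 1: never return a timestamp smaller than one already handed out.
    static final class MonotonicClock implements LongSupplier {
        private final LongSupplier wallClock;
        private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

        MonotonicClock(LongSupplier wallClock) {
            this.wallClock = wallClock;
        }

        @Override
        public long getAsLong() {
            // accumulateAndGet keeps the clamp correct even if several threads share the clock
            return last.accumulateAndGet(wallClock.getAsLong(), Math::max);
        }
    }

    // Idea 2: read the clock once per record and reuse that value for every decision.
    static final class PerRecordTime {
        private final LongSupplier clock;
        private long cached;
        private boolean valid;

        PerRecordTime(LongSupplier clock) {
            this.clock = clock;
        }

        void startRecord() {
            valid = false; // the next now() call takes a fresh reading
        }

        long now() {
            if (!valid) {
                cached = clock.getAsLong();
                valid = true;
            }
            return cached;
        }
    }

    public static void main(String[] args) {
        long[] wall = {1_000, 990, 1_020}; // scripted wall clock that steps back by 10 ms
        int[] next = {0};
        MonotonicClock mono = new MonotonicClock(() -> wall[next[0]++]);
        for (int k = 0; k < wall.length; k++) {
            System.out.println(mono.getAsLong());
        }

        PerRecordTime time = new PerRecordTime(System::currentTimeMillis);
        time.startRecord();
        long assignTime = time.now(); // e.g. used to assign the session window
        long checkTime = time.now();  // e.g. used for the post-merge sanity check
        System.out.println(assignTime == checkTime);
    }
}
```

Running `main` prints 1000, 1000, 1020 and then true. Note that the clamp on its own only rules out backward steps: if the thread stalls between two reads, a monotonic clock can still move past the window's end. Reading once per record covers that case as well, because when the assignment and a check of the form "the merged window ends after now" share one reading, the new window already ends at now + gap, so the check cannot fail for a positive gap.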

            People

              Assignee: Unassigned
              Reporter: Piotr Nowojski (pnowojski)
              Votes: 0
              Watchers: 8
