FLINK-9524

NPE from ProcTimeBoundedRangeOver.scala

Details

    Description

      The class ProcTimeBoundedRangeOver throws an NPE if minRetentionTime and maxRetentionTime are set to values greater than 1.

      Please see npe_from_ProcTimeBoundedRangeOver.txt for the details of the exception. Below is a short description of the cause:

      • When the first event for a key arrives, the cleanup time is registered with the timerService and recorded in cleanupTimeState. If a second event with the same key arrives before that cleanup time, the value in cleanupTimeState is updated and a new timer is registered with the timerService. Two cleanup timers are now registered: one for the first event and one for the second.
      • However, when onTimer fires for the first cleanup timer, the value in cleanupTimeState has already been updated to the second cleanup time. The timer therefore does not pass the needToCleanupState check, so no cleanup happens, and it falls through to the remaining code of onTimer (which is intended to update the accumulator and emit output), causing the NPE.
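      The sequence above can be modelled with a small, self-contained Java sketch. It is not Flink code: the class DoubleTimerSketch, the RETENTION constant, and the simplified processElement/onTimer bodies are hypothetical stand-ins, and only the names cleanupTimeState, needToCleanupState, and onTimer come from the description. It shows only that the first timer no longer matches the recorded cleanup time and so falls through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of the state kept for one key. Hypothetical, not the Flink implementation. */
public class DoubleTimerSketch {
    static final long RETENTION = 10;                     // assumed retention interval
    static Long cleanupTimeState = null;                  // most recently recorded cleanup time
    static final TreeSet<Long> timers = new TreeSet<>();  // stand-in for the timer service
    static final List<String> log = new ArrayList<>();

    /** Each event overwrites the recorded cleanup time and registers one more timer. */
    static void processElement(long now) {
        long cleanupTime = now + RETENTION;
        cleanupTimeState = cleanupTime;  // the later event overwrites the earlier value
        timers.add(cleanupTime);         // the earlier timer stays registered
    }

    /** True only when the firing timer is the one recorded in cleanupTimeState. */
    static boolean needToCleanupState(long timestamp) {
        return cleanupTimeState != null && cleanupTimeState == timestamp;
    }

    static void onTimer(long timestamp) {
        if (needToCleanupState(timestamp)) {
            log.add(timestamp + ": cleanup");
            cleanupTimeState = null;
            return;
        }
        // A stale cleanup timer ends up here, in the code meant to update the accumulator.
        log.add(timestamp + ": fell through to accumulator update");
    }

    /** Replays two events for the same key, then fires the timers in time order. */
    static List<String> run() {
        cleanupTimeState = null;
        timers.clear();
        log.clear();
        processElement(0);  // registers a cleanup timer at 10
        processElement(5);  // registers a cleanup timer at 15 and overwrites the state
        for (long t : new ArrayList<>(timers)) {
            onTimer(t);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

      In this model the timer at 10 takes the fall-through path and the timer at 15 performs the cleanup, which matches the order of events described in the two bullets.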

      RowTimeBoundedRangeOver has very similar logic to ProcTimeBoundedRangeOver, but it does not throw an NPE for this reason: it simply adds a null check before running the logic that updates the accumulator.
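      A null check of that kind might look like the sketch below. It is plain Java, not the Flink source: NullGuardSketch, rowsByTime, and the integer accumulator are hypothetical, and which value RowTimeBoundedRangeOver actually checks for null is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical guard: skip the accumulator update when nothing is buffered for the timer. */
public class NullGuardSketch {
    // Assumed layout: input values buffered per timestamp (stand-in for keyed state).
    static final Map<Long, List<Integer>> rowsByTime = new HashMap<>();
    static int accumulator = 0;

    /** Returns true if the accumulator was updated, false if the timer was ignored. */
    static boolean onTimer(long timestamp) {
        List<Integer> inputs = rowsByTime.get(timestamp);
        if (inputs == null) {
            // Stale timer with no buffered data: return instead of dereferencing null.
            return false;
        }
        for (int value : inputs) {
            accumulator += value;
        }
        return true;
    }
}
```

      With the guard in place, a timer that finds no buffered data is a no-op rather than a crash; whether the same check is the right fix for ProcTimeBoundedRangeOver is left to the patch for this issue.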

       

       

            People

              yzandrew yan zhou