Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-26062

[Changelog] Non-deterministic recovery of PriorityQueue states

    XMLWordPrintableJSON

Details

    Description

      Currently, InternalPriorityQueue.poll() is logged as a separate operation, without specifying the element that has been polled. On recovery, this recorded poll() is replayed.

      However, this is not deterministic because the order of PQ elements with equal priorityis not specified. For example, TimerHeapInternalTimer only compares timestamps, which are often equal. This results in polling timers from queue in wrong order => dropping timers => and not firing timers.

       

      ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow fails with materialization enabled and using heap state backend (both in-memory and fs-based implementations).

       

      Proposed solution is to replace poll with remove operation (which is based on equality).
       
      cc: masteryhx, ym, yunta

      Attachments

        Issue Links

          Activity

            People

              roman Roman Khachatryan
              roman Roman Khachatryan
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: