Flink / FLINK-23208

Late processing-time timers have to wait at least 1 ms before they fire

    Details

      Description

      The problem comes from the code below:

      public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
      
      	// delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
      	// T says we won't see elements in the future with a timestamp smaller or equal to T.
      	// With processing time, we therefore need to delay firing the timer by one ms.
      	return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
      }
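
To make the effect of the `+ 1` concrete, here is a minimal sketch that evaluates the formula above for a future timer, a timer that is due right now, and a late timer. The class name `ProcessingTimeDelayDemo` and the sample timestamps are illustrative assumptions and not part of Flink; only the formula is taken from the snippet.

```java
final class ProcessingTimeDelayDemo {

    // Same formula as in the snippet above.
    static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    public static void main(String[] args) {
        long now = 100L; // hypothetical current processing time in ms
        System.out.println(getProcessingTimeDelay(105L, now)); // future timer: 6
        System.out.println(getProcessingTimeDelay(100L, now)); // timer due now: 1
        System.out.println(getProcessingTimeDelay(90L, now));  // late timer: still 1
    }
}
```

The last call is the case this issue is about: a timer that is already 10 ms late is still delayed by another millisecond.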
      

      Assume a Flink job registers 1 timer per millisecond and is able to consume 1 timer/ms. Here is what happens:

      • Timestamp1 (1st ms): timer1 is registered and will be triggered at Timestamp2.
      • Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
      • Timestamp3 (3rd ms): timer3 is registered and timer1 has been consumed. After this, InternalTimerServiceImpl registers the next timer, timer2, which will be triggered at Timestamp4 (it waits at least 1 ms).
      • Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
      • Timestamp5 (5th ms): timer5 is registered and timer2 has been consumed. After this, InternalTimerServiceImpl registers the next timer, timer3, which will be triggered at Timestamp6 (it waits at least 1 ms).
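
The timeline above can be reproduced with a small simulation. This is only a sketch under simplifying assumptions that are not taken from the Flink code base: timer N carries timestamp N, only the head timer is scheduled at any time, and consuming a triggered timer takes exactly 1 ms. The class `TimerTimelineSketch` and its method `consumedAt` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class TimerTimelineSketch {

    // Delay formula from the issue description.
    static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    /** Returns, for timers 1..count, the millisecond at which each one has been consumed. */
    static List<Long> consumedAt(int count) {
        List<Long> result = new ArrayList<>();
        long now = 1; // timer1 is registered at the 1st ms
        for (long timer = 1; timer <= count; timer++) {
            // Assumption: timer N has timestamp N and is scheduled only once it is the head timer.
            long triggeredAt = now + getProcessingTimeDelay(timer, now);
            // Assumption: consuming a triggered timer takes 1 ms.
            now = triggeredAt + 1;
            result.add(now);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(consumedAt(3)); // [3, 5, 7]
    }
}
```

Under these assumptions each timer occupies 2 ms (1 ms of forced waiting plus 1 ms of consumption), which matches timer1 being consumed at Timestamp3 and timer2 at Timestamp5.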

      As we can see, the Flink job is capable of consuming 1 timer/ms, but in practice it only consumes 0.5 timer/ms, because every timer waits an extra millisecond before it is triggered. Another problem is that the delay cannot be observed from the lag metrics of the source (Kafka). All we can tell is that the output is produced much later than expected. I've added a metric in our internal version, and it shows that the lag of the timer triggering keeps increasing (see the attachment screenshot-1.png).

      In other words, we should never let a late processing-time timer wait the extra 1 ms. I think a simple change would be as below:

      return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
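
As a rough check of the proposed change, the sketch below compares the current and the proposed formula side by side. It assumes the same simplified model as the timeline in the description (timer N has timestamp N, one scheduled timer at a time, 1 ms to consume a timer); `DelayFormulaComparison`, `CURRENT`, `PROPOSED`, and `drainTime` are hypothetical names, not Flink APIs.

```java
import java.util.function.LongBinaryOperator;

final class DelayFormulaComparison {

    // Current formula: every timer, even a late one, waits at least 1 ms.
    static final LongBinaryOperator CURRENT = (ts, now) -> Math.max(ts - now, 0) + 1;

    // Proposed formula: a late timer (ts < now) gets a delay of 0.
    static final LongBinaryOperator PROPOSED = (ts, now) -> Math.max(ts - now, -1) + 1;

    /** Millisecond at which the last of `count` timers has been consumed. */
    static long drainTime(int count, LongBinaryOperator delay) {
        long now = 1; // timer1 is registered at the 1st ms
        for (long timer = 1; timer <= count; timer++) {
            long triggeredAt = now + delay.applyAsLong(timer, now);
            now = triggeredAt + 1; // assumption: consuming a timer takes 1 ms
        }
        return now;
    }

    public static void main(String[] args) {
        System.out.println(PROPOSED.applyAsLong(90, 100));  // late timer: 0
        System.out.println(PROPOSED.applyAsLong(100, 100)); // timer due now: 1
        System.out.println(PROPOSED.applyAsLong(105, 100)); // future timer: 6
        System.out.println(drainTime(1000, CURRENT));       // 2001
        System.out.println(drainTime(1000, PROPOSED));      // 1002
    }
}
```

Timers that are due now or in the future keep the 1 ms alignment with the watermark semantics; only timers that are already late skip the extra wait, so in this model the job returns to roughly 1 timer/ms.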
      

        Attachments

        1. screenshot-1.png
          90 kB
          Jiayi Liao

              People

              • Assignee:
                wind_ljy Jiayi Liao
                Reporter:
                wind_ljy Jiayi Liao