Flink / FLINK-34252

WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under continuous data flow


Details

    Description

      The WatermarkAssignerOperator in the table runtime incorrectly transitions to the IDLE state even when data is flowing continuously. Under normal operating conditions, where the interval between data elements is shorter than the configured idleTimeout, the operator periodically switches between the ACTIVE and IDLE states, although it should remain ACTIVE throughout.

      Detail:
      In the current implementation, the lastRecordTime variable, which tracks the time of the last received data element, is updated only when the WatermarkStatus transitions from IDLE to ACTIVE. It is not updated while the WatermarkStatus is already ACTIVE, so it stops advancing even though records keep arriving. As a result, even under continuous data flow the condition `(currentTime - lastRecordTime > idleTimeout)` eventually becomes true, and the WatermarkStatus is erroneously marked IDLE.

      It is unclear to me whether this bug produces any incorrect results downstream, since while the WatermarkStatus is IDLE, the next processElement call causes WatermarkStatus.ACTIVE to be emitted. Nevertheless, we should eliminate this flip-flopping between states.
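      To make the interaction between processElement and the periodic idle check concrete, here is a minimal, self-contained Java model of the logic described above. It is a hypothetical simplification, not the Flink operator: the class IdleDetectionSketch, its IdleDetector, the 10 ms record interval, the 20 ms timer period and the 50 ms idleTimeout are all assumptions chosen for illustration. The refreshOnEveryRecord flag contrasts the behavior described in this issue with one possible fix (refreshing lastRecordTime on every record), which is not necessarily the fix adopted in Flink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the idle-detection logic described in
 * FLINK-34252. This is NOT Flink's WatermarkAssignerOperator; all names and
 * timings are illustrative assumptions.
 */
final class IdleDetectionSketch {

    enum Status { ACTIVE, IDLE }

    static final class IdleDetector {
        private final long idleTimeout;
        // false: lastRecordTime refreshed only on IDLE -> ACTIVE (behavior in the issue)
        // true:  lastRecordTime refreshed on every record (one possible fix)
        private final boolean refreshOnEveryRecord;
        private Status currentStatus = Status.ACTIVE;
        private long lastRecordTime = 0L;
        private final List<Status> emitted = new ArrayList<>();

        IdleDetector(long idleTimeout, boolean refreshOnEveryRecord) {
            this.idleTimeout = idleTimeout;
            this.refreshOnEveryRecord = refreshOnEveryRecord;
        }

        /** Called for every incoming record at processing time {@code now}. */
        void processElement(long now) {
            if (idleTimeout > 0 && currentStatus == Status.IDLE) {
                // The next record after going idle flips the status back to ACTIVE.
                emit(Status.ACTIVE);
                lastRecordTime = now;
            } else if (refreshOnEveryRecord) {
                // Fix sketch: keep the timestamp current while already ACTIVE.
                lastRecordTime = now;
            }
        }

        /** Called by a periodic processing-time timer. */
        void onProcessingTime(long now) {
            if (idleTimeout > 0
                    && currentStatus == Status.ACTIVE
                    && now - lastRecordTime > idleTimeout) {
                emit(Status.IDLE);
            }
        }

        List<Status> emitted() {
            return emitted;
        }

        private void emit(Status status) {
            currentStatus = status;
            emitted.add(status);
        }
    }

    /** Records every 10 ms, timer every 20 ms, idleTimeout 50 ms, for 300 ms. */
    static List<Status> run(boolean refreshOnEveryRecord) {
        IdleDetector detector = new IdleDetector(50L, refreshOnEveryRecord);
        for (long t = 0; t <= 300; t += 10) {
            detector.processElement(t);
            if (t % 20 == 0) {
                detector.onProcessingTime(t);
            }
        }
        return detector.emitted();
    }

    public static void main(String[] args) {
        System.out.println("refresh only on IDLE->ACTIVE: " + run(false));
        System.out.println("refresh on every record:      " + run(true));
    }
}
```

      In this model, refreshing only on the IDLE-to-ACTIVE transition yields an alternating IDLE/ACTIVE sequence even though a record arrives every 10 ms, well within the 50 ms timeout, while refreshing on every record emits no status changes at all.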

      The test I wrote fails without the fix and illustrates the flip-flops:

      [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 s <<< FAILURE! -- in org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
      [ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
      java.lang.AssertionError:
      
      Expecting
        [WatermarkStatus(IDLE),
          WatermarkStatus(ACTIVE),
          WatermarkStatus(IDLE),
          WatermarkStatus(ACTIVE),
          WatermarkStatus(IDLE),
          WatermarkStatus(ACTIVE),
          WatermarkStatus(IDLE),
          WatermarkStatus(ACTIVE),
          WatermarkStatus(IDLE)]
      not to contain
        [WatermarkStatus(IDLE)]
      but found
        [WatermarkStatus(IDLE)]
      

            People

              pnowojski Piotr Nowojski
              dchristle David Christle
              Votes: 0
              Watchers: 4

              Dates

                Created:
                Updated:
                Resolved: