Flink / FLINK-35886

Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked


Details

      The way the idleness timeout is accounted for when detecting idleness has changed. Previously, the time during which a source or a source's split was backpressured, or blocked due to watermark alignment, counted towards the idleness timeout. This could cause sources or individual splits to be incorrectly switched to idle while they were unable to make any progress and still had records to emit, which in turn could result in incorrectly calculated watermarks and records being erroneously treated as late. This has been fixed in 1.19.2, 1.20.1 and 2.0.0.

      This change required some API changes, such as the introduction of `org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier.Context#getInputActivityClock`. However, this should not create compatibility problems for users upgrading from earlier Flink versions.

    Description

      Currently, when using watermarks with idleness in Flink, idleness can be incorrectly detected when reading records from a source that is blocked by the runtime. This can easily happen, for example, when the source is backpressured or blocked by watermark alignment. In those cases, even though there are more records to be read from the source (or the source's split), the runtime decides not to poll those records (or is unable to). The idleness timeout can then kick in and mark the source or source split as idle, which can lead to incorrect combined watermark calculations and to records being incorrectly marked as late and dropped.
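A minimal sketch of the two accounting schemes, assuming a simplified model in which time advances in explicit steps and the caller reports whether the runtime is currently refusing to poll the input. `IdlenessTimer`, `advance` and `countBlockedTime` are hypothetical names for illustration, not Flink's API: the flag switches between the old behaviour (all elapsed time counts towards the timeout) and the fixed behaviour (only time during which the input could make progress counts, in the spirit of an input activity clock).

```java
// Hypothetical model of idleness-timeout accounting for one source or split.
// Not Flink's implementation; all names are illustrative only.
final class IdlenessTimer {
    private final long idleTimeoutMs;
    // true  = old behaviour: blocked time counts towards the timeout
    // false = fixed behaviour: time counts only while the input is not blocked
    private final boolean countBlockedTime;
    private long accountedMs = 0; // time accounted since the last record

    IdlenessTimer(long idleTimeoutMs, boolean countBlockedTime) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.countBlockedTime = countBlockedTime;
    }

    /** A record was emitted: the input is clearly active, so restart accounting. */
    void onRecord() {
        accountedMs = 0;
    }

    /** Advances time by deltaMs; blocked = backpressured or held back by alignment. */
    void advance(long deltaMs, boolean blocked) {
        if (blocked && !countBlockedTime) {
            return; // activity clock is paused while the runtime does not poll this input
        }
        accountedMs += deltaMs;
    }

    boolean isIdle() {
        return accountedMs >= idleTimeoutMs;
    }

    public static void main(String[] args) {
        IdlenessTimer oldTimer = new IdlenessTimer(60_000, true);
        IdlenessTimer newTimer = new IdlenessTimer(60_000, false);
        oldTimer.onRecord();
        newTimer.onRecord();
        // 90 s of backpressure: records are pending but cannot be emitted.
        for (int i = 0; i < 90; i++) {
            oldTimer.advance(1_000, true);
            newTimer.advance(1_000, true);
        }
        System.out.println("old accounting idle: " + oldTimer.isIdle()); // true
        System.out.println("new accounting idle: " + newTimer.isIdle()); // false
    }
}
```

With the fixed accounting a genuinely idle input is still detected: once it is unblocked and then receives no records for the full timeout, `isIdle()` returns true.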

      Watermark alignment

      Suppose there are two source splits, A and B, and maxAllowedWatermarkDrift is set to 30s.

      1. Partition A has emitted watermark 1042s, while partition B sits at watermark 1000s.
      2. 1042s - 1000s = 42s > maxAllowedWatermarkDrift (30s), so partition A is blocked by watermark alignment.
      3. For the duration of idleTimeout, partition B emits a large batch of records that do not advance its watermark by much. For example, partition B's watermark either stays at 1000s or advances only slightly, to say 1005s, which still leaves partition A more than 30s ahead and therefore blocked.
      4. idleTimeout kicks in, marking partition A as idle even though it still has records to emit.
      5. Partition B finishes emitting the large batch of older records, and there is now a gap in rowtimes: previously partition B was emitting records with rowtime ~1000s, and now it jumps to, for example, ~5000s.
      6. As partition A is idle, it is ignored when computing the combined watermark, which therefore jumps to ~5000s as well.
      7. Watermark alignment unblocks partition A, and it continues emitting records with rowtime ~1042s. But now all of those records are dropped as late.
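The steps above can be replayed with a small model, assuming the combined watermark is the minimum over non-idle splits and that a record is late once its timestamp is at or below the combined watermark. `AlignmentScenario` and its methods are hypothetical, and A's next record is assumed to carry rowtime 1043s, just past its own watermark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the watermark alignment example (times in seconds).
// Not Flink code: the drift check and lateness rule are simplified.
final class AlignmentScenario {
    static final long MAX_DRIFT_S = 30;

    /** Combined watermark: the minimum watermark over all splits that are not idle. */
    static long combinedWatermark(Map<String, Long> watermarks, Map<String, Boolean> idle) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (!idle.getOrDefault(e.getKey(), false)) {
                min = Math.min(min, e.getValue());
            }
        }
        return min;
    }

    /** Alignment blocks a split that is more than MAX_DRIFT_S ahead of the slowest one. */
    static boolean isBlocked(long splitWatermark, long slowestWatermark) {
        return splitWatermark - slowestWatermark > MAX_DRIFT_S;
    }

    /** A watermark W promises no more records with timestamp <= W. */
    static boolean isLate(long recordTs, long combinedWatermark) {
        return recordTs <= combinedWatermark;
    }

    /** Runs steps 1 to 6 and returns the combined watermark after B jumps ahead. */
    static long combinedAfterJump(boolean aMarkedIdle) {
        Map<String, Long> wm = new LinkedHashMap<>();
        Map<String, Boolean> idle = new LinkedHashMap<>();
        wm.put("A", 1042L);                 // step 1
        wm.put("B", 1000L);
        wm.put("B", 1005L);                 // step 3: B barely advances, A stays blocked
        idle.put("A", aMarkedIdle);         // step 4: old accounting marks A idle
        wm.put("B", 5000L);                 // step 5: gap in B's rowtimes
        return combinedWatermark(wm, idle); // step 6
    }

    public static void main(String[] args) {
        System.out.println("A blocked at step 2: " + isBlocked(1042, 1000));
        long oldAcc = combinedAfterJump(true);   // A wrongly idle
        long newAcc = combinedAfterJump(false);  // A still active
        System.out.println(oldAcc + " late=" + isLate(1043, oldAcc));
        System.out.println(newAcc + " late=" + isLate(1043, newAcc));
    }
}
```

With A not marked idle, the combined watermark stays at 1042s after B jumps ahead, so it is B (5000s - 1042s > 30s) rather than A that alignment holds back.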

      Backpressure

      Suppose there are two SourceOperators, A and B. Due to, for example, data skew, it can happen that only A gets backpressured, or that A gets backpressured sooner. Either way, while A is backpressured and B is not, A's idleness timeout can expire and mark A as idle, letting B bump the combined watermark high enough that, when the backpressure recedes, fresh records from A are considered late, leading to incorrect results.
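A tick-based sketch of the backpressure case, assuming one-second ticks, an idleness timeout of 60s, A backpressured for 120s with its watermark stuck at 1000s, and B advancing its watermark by 10s per tick. `BackpressureScenario` and its parameters are hypothetical; the flag selects whether blocked time counts towards A's idleness timeout (old behaviour) or not (fixed behaviour).

```java
// Hypothetical tick-based model of the backpressure example (times in seconds).
// Not Flink code: a downstream combined watermark over two sources A and B.
final class BackpressureScenario {

    /**
     * Simulates durationS ticks in which A is backpressured (emits nothing) while
     * B keeps advancing its watermark by bStepS per tick. Returns the combined
     * watermark at the moment the backpressure on A recedes.
     */
    static long combinedWatermarkAfterBackpressure(
            boolean countBlockedTime, long idleTimeoutS, long durationS,
            long startWatermarkS, long bStepS) {
        long aWatermark = startWatermarkS;   // A cannot emit, so this never moves
        long bWatermark = startWatermarkS;
        long aAccountedS = 0;                // time counted towards A's idleness
        long combined = startWatermarkS;
        for (long t = 0; t < durationS; t++) {
            bWatermark += bStepS;
            if (countBlockedTime) {
                aAccountedS += 1;            // old behaviour: blocked time still counts
            }
            boolean aIdle = aAccountedS >= idleTimeoutS;
            long candidate = aIdle ? bWatermark : Math.min(aWatermark, bWatermark);
            combined = Math.max(combined, candidate); // watermarks never go backwards
        }
        return combined;
    }

    public static void main(String[] args) {
        long oldAcc = combinedWatermarkAfterBackpressure(true, 60, 120, 1000, 10);
        long newAcc = combinedWatermarkAfterBackpressure(false, 60, 120, 1000, 10);
        long freshRecordFromA = 1001; // first record A emits once unblocked
        System.out.println(oldAcc + " late=" + (freshRecordFromA <= oldAcc));
        System.out.println(newAcc + " late=" + (freshRecordFromA <= newAcc));
    }
}
```

The `Math.max` models the monotonic downstream watermark: once it has jumped to B's value while A was considered idle, A resuming does not pull it back, which is why A's fresh records remain late under the old accounting.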


          People

            Assignee: Piotr Nowojski (pnowojski)
            Reporter: Piotr Nowojski (pnowojski)
            Votes: 0
            Watchers: 2
