Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Fixed
Version/s: 1.18.1, 1.20.0, 1.19.1
Description
Currently, when using watermarks with idleness in Flink, idleness can be incorrectly detected when reading records from a source that is blocked by the runtime. This can easily happen, for example, when the source is either backpressured or blocked by watermark alignment. In those cases, even though there are more records to be read from the source (or the source’s split), the runtime decides not to poll those records (or is unable to). The idleness timeout can then kick in and mark the source or source split as idle, which can lead to an incorrect combined watermark and to records being incorrectly treated as late and dropped.
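To make the failure mode concrete, here is a minimal Java sketch, assuming an idleness timer that only measures wall-clock time since the split last emitted a record. The classes NaiveIdlenessDemo, ManualClock and NaiveIdlenessTimer are hypothetical and greatly simplified; they are not Flink's implementation. The point is that such a timer cannot tell a split that has no data apart from a split the runtime is simply not polling.

```java
// Hypothetical, simplified model of wall-clock idleness detection; not Flink's implementation.
class NaiveIdlenessDemo {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        NaiveIdlenessTimer splitA = new NaiveIdlenessTimer(clock, 60_000); // idleTimeout = 60s
        splitA.onRecord(); // A emits a record, then gets blocked
        // A still has records to read, but the runtime does not poll it for 61s
        // (for example because of backpressure or watermark alignment).
        clock.advance(61_000);
        System.out.println("split A idle? " + splitA.isIdle()); // prints "split A idle? true"
    }
}

// Test-friendly clock that only moves when told to.
final class ManualClock {
    private long nowMillis;

    long now() { return nowMillis; }

    void advance(long millis) { nowMillis += millis; }
}

final class NaiveIdlenessTimer {
    private final ManualClock clock;
    private final long idleTimeoutMillis;
    private long lastActivityMillis;

    NaiveIdlenessTimer(ManualClock clock, long idleTimeoutMillis) {
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityMillis = clock.now();
    }

    // Called whenever the split emits a record.
    void onRecord() { lastActivityMillis = clock.now(); }

    // Idle once no record was seen for idleTimeoutMillis of wall-clock time,
    // regardless of whether the runtime was polling this split at all.
    boolean isIdle() { return clock.now() - lastActivityMillis >= idleTimeoutMillis; }
}
```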
Watermark alignment
Suppose there are two source splits (partitions), A and B, and maxAllowedWatermarkDrift is set to 30s.
- Partition A has emitted watermark 1042s, while partition B sits at watermark 1000s.
- 1042s - 1000s = 42s > maxAllowedWatermarkDrift, so partition A is blocked by watermark alignment.
- For the duration of idleTimeout, partition B emits a large batch of records that do not advance its watermark by much. For example, partition B's watermark either stays at 1000s or advances only slightly, say to 1005s.
- Since partition A emits nothing during that time, idleTimeout kicks in and marks partition A as idle.
- Partition B finishes emitting the large batch of older records, and now there is a gap in rowtimes: previously partition B was emitting records with rowtime ~1000s, and now it jumps to, for example, ~5000s.
- Because partition A is idle, it no longer holds back the combined watermark, which jumps to ~5000s as well.
- Watermark alignment unblocks partition A, and it continues emitting records with rowtime ~1042s. But now all of those records are dropped for being late.
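The alignment scenario above can be replayed with a small simulation. This is a simplified, hypothetical model (SplitWatermarks is not a Flink class): the combined watermark is the minimum over non-idle splits, a split is blocked when its watermark exceeds that minimum plus maxAllowedWatermarkDrift, the watermark sent downstream never moves backwards, and a record counts as late if its rowtime is behind it. Times are in seconds, using the numbers from the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical replay of the watermark alignment example; not Flink's implementation.
class AlignmentIdlenessDemo {
    public static void main(String[] args) {
        SplitWatermarks wm = new SplitWatermarks(30); // maxAllowedWatermarkDrift = 30s
        wm.update("B", 1000);
        wm.update("A", 1042);
        System.out.println("A blocked: " + wm.isBlocked("A")); // true, 1042 > 1000 + 30
        wm.update("B", 1005); // B's large batch barely moves its watermark
        wm.markIdle("A");     // idleTimeout fires while A is blocked
        wm.update("B", 5000); // B jumps ahead after the batch
        System.out.println("combined: " + wm.combined());       // 5000
        System.out.println("A blocked: " + wm.isBlocked("A")); // false
        System.out.println("1043 late: " + wm.isLate(1043));    // true, A's records get dropped
    }
}

final class SplitWatermarks {
    private final long maxDriftSeconds;
    private final Map<String, Long> watermarks = new HashMap<>();
    private final Set<String> idleSplits = new HashSet<>();
    private long emittedWatermark = Long.MIN_VALUE; // downstream watermark never moves backwards

    SplitWatermarks(long maxDriftSeconds) { this.maxDriftSeconds = maxDriftSeconds; }

    void update(String split, long watermark) {
        watermarks.put(split, watermark);
        idleSplits.remove(split); // emitting again makes a split active
        refresh();
    }

    void markIdle(String split) {
        idleSplits.add(split);
        refresh();
    }

    // Minimum watermark over splits that are not marked idle.
    long combined() {
        return watermarks.entrySet().stream()
                .filter(e -> !idleSplits.contains(e.getKey()))
                .mapToLong(Map.Entry::getValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    boolean isBlocked(String split) {
        return watermarks.get(split) > combined() + maxDriftSeconds;
    }

    boolean isLate(long rowtime) { return rowtime < emittedWatermark; }

    private void refresh() { emittedWatermark = Math.max(emittedWatermark, combined()); }
}
```

Replaying the same steps without markIdle("A") keeps the combined watermark at 1042s, so A's records at ~1043s would not be late.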
Backpressure
Suppose there are two SourceOperators, A and B. Due to data skew, for example, it could happen that either only A gets backpressured, or A gets backpressured sooner. Either way, if A stays backpressured while B does not for longer than the idleness timeout, A is marked idle, and B can then advance the combined watermark far enough that, when the backpressure recedes, fresh records from A are considered late, leading to incorrect results.
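The linked issue refers to a PausableRelativeClock, which suggests that the fix measures the idleness timeout on a clock that stops while the source is blocked. The following is a hypothetical Java sketch of that idea, not Flink's actual API: PausableClock, PausableIdlenessTimer and ManualWallClock are illustrative names, and pausing is assumed to happen exactly when the runtime stops polling the source (backpressure or watermark alignment).

```java
// Hypothetical sketch of idleness measured on a pausable clock; not Flink's actual API.
class PausableIdlenessDemo {
    public static void main(String[] args) {
        ManualWallClock wall = new ManualWallClock();
        PausableClock clock = new PausableClock(wall);
        PausableIdlenessTimer timerA = new PausableIdlenessTimer(clock, 60_000); // idleTimeout = 60s
        timerA.onRecord();
        clock.pause();         // assumption: runtime stops polling A (backpressure)
        wall.advance(120_000); // two minutes pass while A cannot emit
        clock.resume();        // backpressure recedes
        System.out.println("idle after being blocked: " + timerA.isIdle()); // false
        wall.advance(60_000);  // A is polled but genuinely has nothing to emit
        System.out.println("idle after real silence: " + timerA.isIdle());  // true
    }
}

// Test-friendly wall clock that only moves when told to.
final class ManualWallClock {
    private long nowMillis;

    long now() { return nowMillis; }

    void advance(long millis) { nowMillis += millis; }
}

final class PausableClock {
    private final ManualWallClock wall;
    private long pausedTotalMillis; // time spent in completed pauses
    private long pausedAtMillis;    // wall time when the current pause started
    private boolean paused;

    PausableClock(ManualWallClock wall) { this.wall = wall; }

    void pause() {
        if (!paused) { paused = true; pausedAtMillis = wall.now(); }
    }

    void resume() {
        if (paused) { paused = false; pausedTotalMillis += wall.now() - pausedAtMillis; }
    }

    // Wall time minus all paused time; does not advance while paused.
    long relativeTimeMillis() {
        long currentPause = paused ? wall.now() - pausedAtMillis : 0;
        return wall.now() - pausedTotalMillis - currentPause;
    }
}

final class PausableIdlenessTimer {
    private final PausableClock clock;
    private final long idleTimeoutMillis;
    private long lastActivityMillis;

    PausableIdlenessTimer(PausableClock clock, long idleTimeoutMillis) {
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityMillis = clock.relativeTimeMillis();
    }

    void onRecord() { lastActivityMillis = clock.relativeTimeMillis(); }

    boolean isIdle() { return clock.relativeTimeMillis() - lastActivityMillis >= idleTimeoutMillis; }
}
```

Because time spent blocked is excluded, A is only marked idle after a full idleTimeout during which it could have emitted records but did not. This only works if the clock is paused in every blocking case; the linked issue reports one case, a source with only one split, where PausableRelativeClock does not pause.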
Issue Links
- PausableRelativeClock does not pause when the source only has one split (Status: Open, Assignee: Unassigned)