Details
- Type: Sub-task
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Versions: 1.16.2, 1.17.1
Description
Watermark alignment causes Flink jobs to hang forever when any source subtask has no SourceSplit.
Root cause:
- SourceOperator#emitLatestWatermark reports the lastEmittedWatermark to the SourceCoordinator.
- If a subtask has no SourceSplit, its lastEmittedWatermark stays at Watermark.UNINITIALIZED.getTimestamp(), which is Long.MIN_VALUE, forever.
- SourceCoordinator combines the watermarks of all subtasks and uses the minimum as the aggregated watermark.
- Long.MIN_VALUE is always the minimum, so maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and SourceCoordinator announces this value to all subtasks.
- This maxAllowedWatermark is far below any real event-time watermark, so all source subtasks are blocked from emitting and hang forever.
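The arithmetic behind the root cause can be sketched in plain Java without any Flink dependency. This is a simplified model, not Flink's actual code: the class name, the method names, the drift of 1000 ms, and the example timestamp are all assumptions made for illustration, and the blocking rule (a reader may emit only while its own watermark does not exceed the announced limit) is likewise an assumed simplification.

```java
import java.util.Arrays;

public class WatermarkAlignmentSketch {

    // Stand-in for Watermark.UNINITIALIZED.getTimestamp(), which the ticket states is Long.MIN_VALUE.
    static final long UNINITIALIZED = Long.MIN_VALUE;

    /** Combines the per-subtask watermarks by taking their minimum. */
    static long aggregate(long[] reportedWatermarks) {
        return Arrays.stream(reportedWatermarks).min().orElse(UNINITIALIZED);
    }

    /** Computes the announced limit: minimum reported watermark plus the allowed drift. */
    static long maxAllowedWatermark(long[] reportedWatermarks, long maxAllowedDriftMillis) {
        return aggregate(reportedWatermarks) + maxAllowedDriftMillis;
    }

    /** Assumed rule: a reader may emit only while its watermark does not exceed the limit. */
    static boolean mayEmit(long ownWatermark, long maxAllowed) {
        return ownWatermark <= maxAllowed;
    }

    public static void main(String[] args) {
        long driftMillis = 1_000L;               // hypothetical maxAllowedWatermarkDrift
        long activeWatermark = 1_700_000_000_000L; // hypothetical event-time watermark

        // Subtask 0 has a split and reports a real watermark; subtask 1 has no split.
        long[] reported = {activeWatermark, UNINITIALIZED};

        long limit = maxAllowedWatermark(reported, driftMillis);
        System.out.println("maxAllowedWatermark = " + limit);
        System.out.println("active subtask may emit: " + mayEmit(activeWatermark, limit));
    }
}
```

In this model the idle subtask pins the limit near Long.MIN_VALUE, so the active subtask can never satisfy the emit condition, no matter how large the drift is configured within a realistic range.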
How to reproduce?
Run a job in which the number of Kafka partitions is less than the parallelism of the Kafka source.
Here is a demo: code link
- The Kafka topic has 1 partition
- The parallelism is 2
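To see why this setup leaves a subtask without a SourceSplit, the following sketch distributes partitions over subtasks. It assumes a plain modulo assignment, which is a simplification and not necessarily how the Kafka source enumerator assigns splits; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAssignmentSketch {

    /** Assigns partition p to subtask p % parallelism (simplified, assumed policy). */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    /** Counts the subtasks that received no partition at all. */
    static long idleSubtasks(List<List<Integer>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // The reproduction case from this ticket: 1 partition, parallelism 2.
        List<List<Integer>> assignment = assign(1, 2);
        System.out.println("assignment per subtask: " + assignment);
        System.out.println("subtasks without a split: " + idleSubtasks(assignment));
    }
}
```

Whenever the partition count is below the parallelism, at least one subtask ends up with an empty list under any assignment policy, which is the precondition for the hang described above.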
Attachments
Issue Links
- is caused by: FLINK-24441 Block SourceReader when watermarks are out of alignment (Closed)
- links to