FLINK-32414 (project Flink; sub-task of FLINK-32548: Make watermark alignment ready for production use)

Watermark alignment will cause Flink jobs to hang forever when any source subtask has no SourceSplit


Details

    Description

      Watermark alignment will cause Flink jobs to hang forever when any source subtask has no SourceSplit.

      Root cause:

      1. SourceOperator#emitLatestWatermark reports the lastEmittedWatermark to the SourceCoordinator.
      2. If a subtask has no SourceSplit, its lastEmittedWatermark stays at Watermark.UNINITIALIZED.getTimestamp(), which is Long.MIN_VALUE, forever.
      3. The SourceCoordinator combines the watermarks of all subtasks and uses the minimum as the aggregated watermark.
      4. Long.MIN_VALUE is necessarily the minimum, so maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and the SourceCoordinator announces this value to all subtasks.
      5. This maxAllowedWatermark is far below the watermark of every subtask that is reading data, so those subtasks pause and wait for alignment. The subtask without a split never advances its watermark, so the aggregated watermark never rises and all source subtasks hang forever.
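      The chain of steps above can be reproduced with a small simulation. This is a simplified model, not Flink's SourceCoordinator or SourceOperator code: the class and method names (WatermarkAlignmentModel, aggregatedWatermark, maxAllowedWatermark, canEmit) are hypothetical, and the 20 s drift and the sample timestamp are illustrative values.

```java
import java.util.List;

/**
 * Simplified model of the aggregation described in the root cause (hypothetical,
 * not Flink's actual code). Each reported value stands for one subtask's
 * lastEmittedWatermark; a subtask with no split reports Long.MIN_VALUE,
 * i.e. Watermark.UNINITIALIZED.getTimestamp().
 */
class WatermarkAlignmentModel {

    /** Step 3: the aggregated watermark is the minimum over all subtasks. */
    static long aggregatedWatermark(List<Long> reportedWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : reportedWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** Step 4: maxAllowedWatermark = aggregated watermark + maxAllowedWatermarkDrift. */
    static long maxAllowedWatermark(List<Long> reportedWatermarks, long maxDriftMillis) {
        return aggregatedWatermark(reportedWatermarks) + maxDriftMillis;
    }

    /** Step 5: in this model a subtask keeps reading only while its watermark is within the bound. */
    static boolean canEmit(long subtaskWatermark, long maxAllowedWatermark) {
        return subtaskWatermark <= maxAllowedWatermark;
    }

    public static void main(String[] args) {
        long drift = 20_000L;                      // hypothetical drift of 20 s
        long activeWatermark = 1_700_000_000_000L; // subtask 0 reads the only partition
        long idleWatermark = Long.MIN_VALUE;       // subtask 1 has no split
        List<Long> reported = List.of(activeWatermark, idleWatermark);

        long bound = maxAllowedWatermark(reported, drift);
        System.out.println("maxAllowedWatermark = " + bound);
        System.out.println("subtask 0 can emit: " + canEmit(activeWatermark, bound));
    }
}
```

      Running it prints maxAllowedWatermark = -9223372036854755808 and subtask 0 can emit: false. The bound sits just above Long.MIN_VALUE, so the only subtask with data is paused, and nothing in the model can ever raise the minimum.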

      How to reproduce?

      It happens when the number of Kafka partitions is less than the parallelism of the Kafka source.

      Here is a demo: code link

      • Kafka partitions: 1
      • Parallelism: 2
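      Why fewer partitions than subtasks guarantees a split-less subtask can be seen with a toy assignment. The round-robin rule (partition p goes to subtask p % parallelism) and the names SplitAssignmentModel, assign and subtasksWithoutSplits are assumptions for illustration. Flink's Kafka enumerator uses its own owner-selection rule, but any scheme that gives each partition to exactly one subtask leaves at least (parallelism minus partitions) subtasks empty.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of distributing Kafka partitions across source subtasks.
 * The modulo rule is an assumption, not Flink's actual assignment logic.
 */
class SplitAssignmentModel {

    /** Returns, for each subtask index, the partitions it owns. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % parallelism).add(p); // hypothetical round-robin owner
        }
        return owned;
    }

    /** Counts subtasks that receive no split and therefore never emit a watermark. */
    static int subtasksWithoutSplits(int numPartitions, int parallelism) {
        int empty = 0;
        for (List<Integer> partitions : assign(numPartitions, parallelism)) {
            if (partitions.isEmpty()) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        // The reproduction setup: 1 Kafka partition, source parallelism 2.
        System.out.println(assign(1, 2));                // [[0], []]
        System.out.println(subtasksWithoutSplits(1, 2)); // 1
    }
}
```

      With 1 partition and parallelism 2, subtask 1 owns nothing, so it keeps reporting Long.MIN_VALUE and triggers the hang described in the root cause.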

       


            People

              fanrui Rui Fan
              fanrui Rui Fan
              Votes: 0
              Watchers: 4
