Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-7721

StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned chhanels

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.2.1, 1.3.2, 1.4.0
    • 1.3.4, 1.4.0
    • API / DataStream
    • None

    Description

      Context:

      long newMinWatermark = Long.MAX_VALUE;
      
      for (InputChannelStatus channelStatus : channelStatuses) {
          if (channelStatus.isWatermarkAligned) {
              newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
          }
      }
      

      In the calculation of the new min watermark in StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(), there is not verification that the calculated new min watermark newMinWatermark really is aggregated from some aligned channel.

      In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation of newMinWatermark is Long.MAX_VALUE and emit that.

      The fix would simply be to only emit the aggregated watermark IFF it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a Long.MAX_VALUE was genuinely aggregated from one of the input channels.

      Attachments

        Issue Links

          Activity

            People

              tzulitai Tzu-Li (Gordon) Tai
              tzulitai Tzu-Li (Gordon) Tai
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: