  1. Flink
  2. FLINK-28033

Find and output new min watermark may be wrong with multiple channels


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Runtime / Task
    • Labels: None

    Description

      File: StatusWatermarkValve.java

      Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels

      long newMinWatermark = Long.MAX_VALUE;
      boolean hasAlignedChannels = false;
      
      // determine new overall watermark by considering only watermark-aligned channels across all
      // channels
      for (InputChannelStatus channelStatus : channelStatuses) {
          if (channelStatus.isWatermarkAligned) {
              hasAlignedChannels = true;
              newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
          }
      }
      
      // we acknowledge and output the new overall watermark if it really is aggregated
      // from some remaining aligned channel, and is also larger than the last output watermark
      if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
          lastOutputWatermark = newMinWatermark;
          output.emitWatermark(new Watermark(lastOutputWatermark));
      } 

      Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one channel's watermark advances but the other channels' watermarks have not changed, newMinWatermark is still Long.MIN_VALUE, which is not larger than lastOutputWatermark, so no watermark is emitted to the output.
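
      The behavior described above can be reproduced outside Flink with a small stand-alone model. This is only a sketch: the class MinWatermarkSketch, its ChannelStatus type, the emitted list, and the markIdle helper are hypothetical names made up for illustration, not Flink's API. The aggregation step mirrors the quoted method, and the sketch assumes that a channel marked idle is simply treated as no longer watermark-aligned.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the quoted aggregation logic; not Flink's actual class. */
class MinWatermarkSketch {

    /** Per-channel state; the watermark starts at Long.MIN_VALUE as described in the report. */
    static final class ChannelStatus {
        long watermark = Long.MIN_VALUE;
        boolean isWatermarkAligned = true;
    }

    final ChannelStatus[] channelStatuses;
    long lastOutputWatermark = Long.MIN_VALUE;
    /** Records every watermark that would have been emitted downstream. */
    final List<Long> emitted = new ArrayList<>();

    MinWatermarkSketch(int numChannels) {
        channelStatuses = new ChannelStatus[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelStatuses[i] = new ChannelStatus();
        }
    }

    /** Feeds a watermark into one channel and re-evaluates the overall minimum. */
    void inputWatermark(long watermark, int channelIndex) {
        ChannelStatus status = channelStatuses[channelIndex];
        if (status.isWatermarkAligned && watermark > status.watermark) {
            status.watermark = watermark;
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }

    /** Hypothetical helper: assumes an idle channel is excluded from the minimum. */
    void markIdle(int channelIndex) {
        channelStatuses[channelIndex].isWatermarkAligned = false;
        findAndOutputNewMinWatermarkAcrossAlignedChannels();
    }

    /** Same logic as the method quoted in the description. */
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (ChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            emitted.add(lastOutputWatermark);
        }
    }

    public static void main(String[] args) {
        // Two channels; only channel 0 receives a watermark.
        MinWatermarkSketch valve = new MinWatermarkSketch(2);
        valve.inputWatermark(100L, 0);
        System.out.println(valve.emitted); // prints [] because channel 1 is still at Long.MIN_VALUE

        // Once channel 1 also reports a watermark, the minimum of both is emitted.
        valve.inputWatermark(50L, 1);
        System.out.println(valve.emitted); // prints [50]

        // Alternative: channel 1 never reports, but is marked idle in this sketch.
        MinWatermarkSketch idleCase = new MinWatermarkSketch(2);
        idleCase.inputWatermark(100L, 0);
        idleCase.markIdle(1);
        System.out.println(idleCase.emitted); // prints [100]
    }
}
```

      In this model nothing is emitted while any aligned channel still holds its initial Long.MIN_VALUE; output starts only after every aligned channel has reported a watermark, or after the silent channel is excluded from the minimum.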

          People

            Assignee: Unassigned
            Reporter: YeAble (ye-able)
            Votes: 0
            Watchers: 2