Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
File: StatusWatermarkValve.java
Method: findAndOutputNewMinWatermarkAcrossAlignedChannels
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned channels across all
// channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one channelStatus's watermark advances but the other channels' watermarks have not changed yet, newMinWatermark is still Long.MIN_VALUE. The check newMinWatermark > lastOutputWatermark therefore fails, and output does not emit a watermark.
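The behavior described above can be reproduced outside Flink with a minimal sketch. Everything here is a simplified stand-in, not the real StatusWatermarkValve: the class MinWatermarkSketch, its nested ChannelStatus, the inputWatermark helper, and the emitted list (used in place of output.emitWatermark) are hypothetical names, and the sketch assumes lastOutputWatermark also starts at Long.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the valve logic quoted in this issue.
class MinWatermarkSketch {

    /** Stand-in for InputChannelStatus; every channel starts at Long.MIN_VALUE. */
    static final class ChannelStatus {
        long watermark = Long.MIN_VALUE;
        boolean isWatermarkAligned = true;
    }

    final ChannelStatus[] channelStatuses;
    // Assumption: the last output watermark also starts at Long.MIN_VALUE.
    long lastOutputWatermark = Long.MIN_VALUE;
    // Records emitted watermarks instead of calling output.emitWatermark(...).
    final List<Long> emitted = new ArrayList<>();

    MinWatermarkSketch(int numChannels) {
        channelStatuses = new ChannelStatus[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelStatuses[i] = new ChannelStatus();
        }
    }

    /** Advances one channel's watermark, then recomputes the overall minimum. */
    void inputWatermark(long watermark, int channelIndex) {
        ChannelStatus status = channelStatuses[channelIndex];
        if (watermark > status.watermark) {
            status.watermark = watermark;
        }
        findAndOutputNewMinWatermarkAcrossAlignedChannels();
    }

    /** Same logic as the method quoted in the description. */
    void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (ChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            emitted.add(lastOutputWatermark);
        }
    }

    public static void main(String[] args) {
        MinWatermarkSketch valve = new MinWatermarkSketch(2);

        // Only channel 0 advances; channel 1 is still at Long.MIN_VALUE,
        // so the minimum stays Long.MIN_VALUE and nothing is emitted.
        valve.inputWatermark(100L, 0);
        System.out.println(valve.emitted); // prints []

        // Once channel 1 reports a watermark, the minimum (50) is emitted.
        valve.inputWatermark(50L, 1);
        System.out.println(valve.emitted); // prints [50]
    }
}
```

In this sketch, a watermark is emitted only after every aligned channel has reported at least one watermark, which matches the symptom reported in this issue.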