Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
- 1.14.0
- None
Description
FLIP-27 sources currently start in the StreamStatus.IDLE state and switch to ACTIVE only after emitting their first Watermark. Until this happens, downstream operators exclude the IDLE inputs when calculating the combined (min) input watermark.
An extreme example of the problem this leads to: completely bogus results if, for example, one FLIP-27 source subtask is slower than the others for some reason:
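To make the rule above concrete, here is a minimal sketch of a "minimum over non-idle inputs" computation. It is a simplified model written for this report, not Flink's actual implementation; the class name `CombinedWatermarkSketch` and the method `combinedWatermark` are hypothetical.

```java
import java.util.OptionalLong;

/** Hypothetical model of combining per-channel watermarks; not Flink code. */
final class CombinedWatermarkSketch {

    /**
     * Returns the minimum watermark across all channels that are not idle,
     * or an empty result if every channel is idle.
     */
    static OptionalLong combinedWatermark(boolean[] idle, long[] watermarks) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < idle.length; i++) {
            if (idle[i]) {
                continue; // idle channels do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Channel 0 is still IDLE, channel 1 is ACTIVE at watermark 5000:
        // the idle channel is skipped, so the combined watermark jumps to 5000.
        System.out.println(combinedWatermark(
                new boolean[] {true, false}, new long[] {Long.MIN_VALUE, 5_000L}));
        // prints OptionalLong[5000]

        // Both channels ACTIVE: the slower channel holds the watermark at 10.
        System.out.println(combinedWatermark(
                new boolean[] {false, false}, new long[] {10L, 5_000L}));
        // prints OptionalLong[10]
    }
}
```

The first call shows why an input that is merely "not yet ACTIVE" is risky: its low watermark is not taken into account at all.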
env.getConfig().setAutoWatermarkInterval(2000);
env.setParallelism(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
DataStream<Long> eventStream =
        env.fromSource(
                        new NumberSequenceSource(0, Long.MAX_VALUE),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(new LongTimestampAssigner()),
                        "NumberSequenceSource")
                .map(
                        new RichMapFunction<Long, Long>() {
                            @Override
                            public Long map(Long value) throws Exception {
                                if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                    Thread.sleep(1);
                                }
                                return 1L;
                            }
                        });

eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();

(...)

private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
    private long counter = 0;

    @Override
    public long extractTimestamp(Long record, long recordTimeStamp) {
        return counter++;
    }
}
In such a case, after 2 seconds (setAutoWatermarkInterval), the non-throttled subtask (subTaskId == 1) generates very high watermarks, while the throttled source subtask (subTaskId == 0) emits very low watermarks. If the non-throttled watermark reaches the downstream WindowOperator first, while the other input channel is still idle, the operator takes those high watermarks as the combined input watermark for the whole WindowOperator. By the time the input channel from the throttled source subtask finally receives its ACTIVE status and a much lower watermark, it is already too late: the combined watermark cannot move backwards, so the throttled subtask's records arrive late for windows that have already fired.
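The race described above can be illustrated with a small, self-contained simulation. This is a simplified sketch under stated assumptions, not Flink's WindowOperator: the class `IdleStartRaceSketch` and all of its members are hypothetical, records are only counted, allowed lateness is zero, and the fast channel delivers three full windows before the slow channel delivers anything. It does not reproduce the exact numbers from the example program, only the mechanism. The second run, in which channels start ACTIVE, is included purely for contrast and is not a statement about how the issue was fixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified model of a two-input event-time window operator; not Flink code. */
final class IdleStartRaceSketch {
    static final long WINDOW_SIZE_MS = 1000L;

    private final boolean[] idle;
    private final long[] channelWatermark;
    private long combinedWatermark = Long.MIN_VALUE;
    // Window start timestamp -> number of records counted so far.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();

    final List<Integer> fired = new ArrayList<>();
    int droppedLate = 0;

    IdleStartRaceSketch(int channels, boolean startIdle) {
        idle = new boolean[channels];
        channelWatermark = new long[channels];
        Arrays.fill(idle, startIdle);
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
    }

    /** Counts a record in its tumbling window, or drops it if that window is already late. */
    void onRecord(long timestamp) {
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
        long maxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        if (maxTimestamp <= combinedWatermark) {
            droppedLate++;
            return;
        }
        openWindows.merge(windowStart, 1, Integer::sum);
    }

    /** A channel's first watermark flips it to ACTIVE; idle channels are ignored in the min. */
    void onWatermark(int channel, long watermark) {
        idle[channel] = false;
        channelWatermark[channel] = Math.max(channelWatermark[channel], watermark);
        long min = Long.MAX_VALUE;
        for (int i = 0; i < idle.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (min > combinedWatermark) { // the combined watermark never moves backwards
            combinedWatermark = min;
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= combinedWatermark) {
                fired.add(openWindows.pollFirstEntry().getValue());
            }
        }
    }

    /** Fast channel 1 delivers everything first; throttled channel 0 follows afterwards. */
    static IdleStartRaceSketch run(boolean startIdle) {
        IdleStartRaceSketch op = new IdleStartRaceSketch(2, startIdle);
        for (long ts = 0; ts < 3000; ts++) op.onRecord(ts); // records from channel 1
        op.onWatermark(1, 2999);
        for (long ts = 0; ts < 3000; ts++) op.onRecord(ts); // records from channel 0
        op.onWatermark(0, 2999);
        return op;
    }

    public static void main(String[] args) {
        IdleStartRaceSketch idleStart = run(true);
        System.out.println(idleStart.fired + " dropped=" + idleStart.droppedLate);
        // prints [1000, 1000, 1000] dropped=3000

        IdleStartRaceSketch activeStart = run(false);
        System.out.println(activeStart.fired + " dropped=" + activeStart.droppedLate);
        // prints [2000, 2000, 2000] dropped=0
    }
}
```

With idle-start channels, the fast channel's watermark alone fires every window with only its own 1000 records, and all records from the throttled channel are then dropped as late. When both channels count towards the minimum from the beginning, each window waits for the slower channel and contains 2000 records.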
Actual output of the example program:
1596 2000 1000 1000 1000 1000 1000 1000 (...)
whereas the expected output is always "2000" (2000 records fit in every 1-second global window):
2000 2000 2000 2000 (...)
Issue Links
- is related to
  - FLINK-22926 IDLE FLIP-27 source should go ACTIVE when registering a new split (Open)
- relates to
  - FLINK-22890 Few tests fail in HiveTableSinkITCase (Closed)