Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
1.17.2, 1.19.0, 1.18.1
Description
The current watermark alignment logic can easily get stuck if some source subtasks finish while others are still running.
The reason is that a finished source subtask is not excluded from alignment: its last reported watermark keeps holding back the minimum of the alignment group. The remaining subtasks therefore cannot advance beyond the finished subtask's last watermark plus the maximum allowed watermark drift, which blocks the rest of the job from making progress.
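To make the failure mode concrete, here is a minimal, simplified model of the alignment check. It is not Flink's actual SourceCoordinator code; the class `AlignmentModel` and its methods are hypothetical. It assumes each subtask reports its latest watermark, the coordinator announces a maximum allowed watermark equal to the smallest reported watermark plus the maximum drift, and a subtask pauses once its own watermark exceeds that bound.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of watermark alignment (not Flink code).
 * Finished subtasks stay in the map, mirroring the behavior described above.
 */
class AlignmentModel {
    private final long maxDriftMillis;
    // Latest watermark reported by each subtask, keyed by subtask index.
    private final Map<Integer, Long> reported = new HashMap<>();

    AlignmentModel(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Minimum over all reported watermarks (finished subtasks included) plus the drift. */
    long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        // Saturate instead of overflowing when nothing (or only huge values) was reported.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    /** A subtask pauses reading once its watermark is ahead of the allowed bound. */
    boolean shouldPause(int subtask) {
        return reported.get(subtask) > maxAllowedWatermark();
    }
}
```

For example, with a 10 ms drift, if subtask 0 finishes after reporting 50 and subtask 1 has reached 61, the bound stays at 60 forever, so subtask 1 remains paused even though subtask 0 will never report again.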
This can be easily reproduced by the following simple pipeline:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Two source subtasks in the same alignment group.
env.setParallelism(2);

DataStream<Long> s = env
        .fromSource(
                new NumberSequenceSource(0, 100),
                WatermarkStrategy.<Long>forMonotonousTimestamps()
                        // Use each number as its own event timestamp.
                        .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
                        // Alignment group "g1": max allowed drift 10 ms, update interval 2 s.
                        .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
                "Sequence Source")
        // Slow down processing by 200 ms per record.
        .filter((FilterFunction<Long>) aLong -> {
            Thread.sleep(200);
            return true;
        });

s.print();
env.execute();
```
A possible solution is to emit a max watermark once a source subtask finishes, or to exclude finished subtasks from alignment in the source coordinator.
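Both proposed fixes can be sketched against the same simplified model. This is an illustration under stated assumptions, not the change that was actually merged: the class `FixedAlignmentModel`, its `Strategy` enum and the `finish` method are hypothetical. `EMIT_MAX_WATERMARK` records `Long.MAX_VALUE` for a finished subtask (the timestamp carried by Flink's `Watermark.MAX_WATERMARK`), while `EXCLUDE_FINISHED` drops the subtask from the minimum entirely.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical alignment model (not Flink code) that handles finished subtasks. */
class FixedAlignmentModel {
    enum Strategy { EMIT_MAX_WATERMARK, EXCLUDE_FINISHED }

    private final long maxDriftMillis;
    private final Strategy strategy;
    // Latest watermark reported by each subtask that still participates in alignment.
    private final Map<Integer, Long> reported = new HashMap<>();

    FixedAlignmentModel(long maxDriftMillis, Strategy strategy) {
        this.maxDriftMillis = maxDriftMillis;
        this.strategy = strategy;
    }

    void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Called when a source subtask finishes. */
    void finish(int subtask) {
        if (strategy == Strategy.EMIT_MAX_WATERMARK) {
            // Option 1: the finished subtask reports the maximum possible watermark.
            reported.put(subtask, Long.MAX_VALUE);
        } else {
            // Option 2: the finished subtask no longer takes part in the minimum.
            reported.remove(subtask);
        }
    }

    long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        // Saturate so that "everything finished" yields an unbounded limit.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }
}
```

With either strategy, once subtask 0 finishes the bound is driven by the subtasks that are still running, so in the example above it moves from 60 to 71 and subtask 1 can continue. The max-watermark variant keeps the set of participants fixed, whereas exclusion shrinks it; both remove the finished subtask's frozen watermark from the minimum.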