Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
Affects Version/s: 1.13.6, 1.17.2
Environment: Flink 1.13.6 with the Blink planner, or Flink 1.17.2
Description
We found an unexpected case: a processing-time (proctime) window aggregation over a table that also defines an event-time watermark emits abnormal rows with a count of 0.
The following SQL reproduces it. The outer filter keeps only the unexpected zero-count rows:
CREATE TABLE s1 (
  id INT,
  event_time TIMESTAMP(3),
  name STRING,
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'my-source'
);

SELECT *
FROM (
  SELECT name, COUNT(id) AS total_count, window_start, window_end
  FROM TABLE(
    TUMBLE(TABLE s1, DESCRIPTOR(proc_time), INTERVAL '30' SECONDS)
  )
  GROUP BY window_start, window_end, name
)
WHERE total_count = 0;
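The query above uses 30-second tumbling windows. The sketch below is a quick reference for the boundaries involved. It computes the tumbling window that a timestamp falls into, and it lists the window ends that a sudden jump in progress from one timestamp to another would skip past.

`TumbleMath` and its methods are hypothetical illustrations, not Flink APIs. The sketch makes three simplifying assumptions:
- It ignores window offsets and time-zone shifts.
- It treats a window as due once progress reaches its end.
- It does not use Flink's exact end-minus-one firing rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: tumbling-window arithmetic for a fixed size in milliseconds.
// Simplifications (assumptions): no offset, no time-zone shift, a window is due once progress reaches its end.
public final class TumbleMath {
    private TumbleMath() {}

    // Start of the tumbling window containing ts.
    public static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Exclusive end of the tumbling window containing ts.
    public static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }

    // All window ends e with from < e <= to, i.e. the boundaries passed when progress jumps from 'from' to 'to'.
    public static List<Long> crossedWindowEnds(long from, long to, long sizeMs) {
        List<Long> ends = new ArrayList<>();
        for (long end = windowEnd(from, sizeMs); end <= to; end += sizeMs) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        long size = 30_000L; // INTERVAL '30' SECONDS
        // Progress jumps from 10s to 75s: the windows ending at 30s and 60s are both passed in one step.
        System.out.println(crossedWindowEnds(10_000L, 75_000L, size)); // [30000, 60000]
    }
}
```

A single watermark that jumps far ahead can therefore pass several window ends at once. This is the situation the root-cause analysis describes.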
For detailed test code, please refer to https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java
The root cause is in SlicingWindowOperator (https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229). Whenever it receives a newer watermark, it advances the window progress. It does this regardless of whether the windows are defined on event time or processing time. When the watermark suddenly jumps past the end timestamp of the next window, a result with a count of 0 is emitted:
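public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() > currentWatermark) {
        // A newer watermark always advances window progress, even for proctime windows.
        windowProcessor.advanceProgress(mark.getTimestamp());
        super.processWatermark(mark);
    } else {
        // A stale watermark is not used as progress; the current watermark is re-emitted instead.
        super.processWatermark(new Watermark(currentWatermark));
    }
}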
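The toy model below sketches one possible guard against this behavior. It is not necessarily the change that resolved this issue. Only event-time windows treat a newer watermark as window progress, and the watermark itself is still forwarded downstream in both modes, mirroring the two branches above.

`WatermarkGuardModel`, `isEventTime` and `windowProgress` are hypothetical names, not Flink classes or fields. The model also omits how processing-time windows would advance, which would presumably come from processing-time timers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Flink code) of a window operator's watermark handling with an event-time guard.
public final class WatermarkGuardModel {
    private final boolean isEventTime;           // assumption: the operator knows its window time attribute
    private long currentWatermark = Long.MIN_VALUE;
    private long windowProgress = Long.MIN_VALUE;
    private final List<Long> forwarded = new ArrayList<>();

    public WatermarkGuardModel(boolean isEventTime) {
        this.isEventTime = isEventTime;
    }

    public void processWatermark(long timestamp) {
        if (timestamp > currentWatermark) {
            currentWatermark = timestamp;
            // Guard: only event-time windows use the watermark as window progress.
            if (isEventTime) {
                windowProgress = timestamp;
            }
        }
        // Newer watermarks are forwarded as-is; stale ones re-emit the current watermark.
        forwarded.add(currentWatermark);
    }

    public long windowProgress() {
        return windowProgress;
    }

    public List<Long> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        WatermarkGuardModel proc = new WatermarkGuardModel(false);
        proc.processWatermark(75_000L);
        // The proctime model forwards the watermark but does not treat it as window progress.
        System.out.println(proc.windowProgress() == Long.MIN_VALUE); // true
        System.out.println(proc.forwarded());                        // [75000]
    }
}
```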