FLINK-35885

proctime aggregate window triggered by watermark


Details

    Description

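      We found an unexpected case: when a proctime window aggregation runs on a table that also defines a watermark, the result contains abnormal rows with a count of 0.

      The following SQL reproduces the problem. The outer query filters for total_count = 0, so it should never return any rows: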

      CREATE TABLE s1 (
          id INT,
          event_time TIMESTAMP(3),
          name STRING,
          proc_time AS PROCTIME(),
          WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
      )
      WITH
          ('connector' = 'my-source')
      ;
      
      SELECT
          *
      FROM
          (
              SELECT
                  name,
                  COUNT(id) AS total_count,
                  window_start,
                  window_end
              FROM
                  TABLE (
                      TUMBLE (
                          TABLE s1,
                          DESCRIPTOR (proc_time),
                          INTERVAL '30' SECONDS
                      )
                  )
              GROUP BY
                  window_start,
                  window_end,
                  name
          )
      WHERE
          total_count = 0;
      

      For detailed test code, please refer to https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java


      The root cause is that processWatermark in SlicingWindowOperator (https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229) advances window progress whenever a larger watermark arrives, and it does so for proctime windows as well. When the watermark suddenly jumps past the end timestamp of the next window, a result row with a count of 0 is emitted.

      public void processWatermark(Watermark mark) throws Exception {
          if (mark.getTimestamp() > currentWatermark) {
              windowProcessor.advanceProgress(mark.getTimestamp());
              super.processWatermark(mark);
          } else {
              super.processWatermark(new Watermark(currentWatermark));
          }
      }
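
      The interaction described above can be illustrated with a small toy model. This is a sketch only and does not use or reproduce Flink classes: ToyOperator, its buffer and state maps, and the run helper are hypothetical names. It also rests on an assumption that is not confirmed by this ticket, namely that buffered records are flushed to window state only when progress actually moves forward. Under that assumption, a watermark that has already pushed progress past the next window end turns the later processing-time timer into a no-op flush, and the window fires with an empty accumulator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only; none of these classes exist in Flink. */
public class ProctimeWatermarkSketch {

    static final long WINDOW_MS = 30_000L;

    /** One emitted result row: window end and COUNT(id). */
    static final class Row {
        final long windowEnd;
        final long count;

        Row(long windowEnd, long count) {
            this.windowEnd = windowEnd;
            this.count = count;
        }

        @Override
        public String toString() {
            return "Row(" + windowEnd + ", " + count + ")";
        }
    }

    /** Hypothetical stand-in for a proctime tumbling window operator. */
    static final class ToyOperator {
        private final boolean watermarkAdvancesProgress;
        private long currentProgress = Long.MIN_VALUE;
        private final Map<Long, Long> buffer = new HashMap<>(); // windowEnd -> buffered count
        private final Map<Long, Long> state = new HashMap<>();  // windowEnd -> flushed count
        final List<Row> output = new ArrayList<>();

        ToyOperator(boolean watermarkAdvancesProgress) {
            this.watermarkAdvancesProgress = watermarkAdvancesProgress;
        }

        /** Assigns the element to its proctime window and buffers it. */
        void processElement(long procTime) {
            long windowEnd = (procTime / WINDOW_MS + 1) * WINDOW_MS;
            buffer.merge(windowEnd, 1L, Long::sum);
        }

        /** Mirrors the reported behavior when the flag is true. */
        void processWatermark(long watermark) {
            if (watermarkAdvancesProgress) {
                advanceProgress(watermark);
            }
        }

        /** Processing-time timer for a window end: flush, then fire. */
        void onProcessingTimer(long windowEnd) {
            advanceProgress(windowEnd);
            Long flushed = state.remove(windowEnd);
            output.add(new Row(windowEnd, flushed == null ? 0L : flushed));
        }

        /** Assumption: the buffer is flushed only if progress moves forward. */
        private void advanceProgress(long progress) {
            if (progress > currentProgress) {
                currentProgress = progress;
                buffer.forEach((end, c) -> state.merge(end, c, Long::sum));
                buffer.clear();
            }
        }
    }

    /** Watermark jumps to 100 s, then two elements arrive, then the 30 s timer fires. */
    static List<Row> run(boolean watermarkAdvancesProgress) {
        ToyOperator op = new ToyOperator(watermarkAdvancesProgress);
        op.processWatermark(100_000L); // already past the next window end (30 s)
        op.processElement(5_000L);
        op.processElement(10_000L);
        op.onProcessingTimer(30_000L);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [Row(30000, 0)]
        System.out.println(run(false)); // prints [Row(30000, 2)]
    }
}
```

      In this model the count-0 row disappears once the watermark no longer advances progress for a proctime window. That guard is shown only as one possible direction, not necessarily the fix adopted for this issue.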
      

          People

            xuyangzhong xuyang
            zbz Baozhu Zhao