Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-26334

When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window

    XMLWordPrintableJSON

Details

    • Important

    Description

      issue

              Hello!

              When we were studying the flink source code, we found that there was a problem with its algorithm for calculating the window start time. When timestamp - offset + windowSize < 0 , the element will be incorrectly allocated to a window with a WindowSize larger than its own timestamp.

              The problem is in org.apache.flink.streaming.api.windowing.windows.TimeWindow

      public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
          return timestamp - (timestamp - offset + windowSize) % windowSize;
      } 

              We believe that this violates the constraints between time and window. That is, an element should fall within a window whose start time is less than its own timestamp and whose end time is greater than its own timestamp. However, the current situation is when timestamp - offset + windowSize < 0, the element falls into a future time window.

             You can reproduce the bug with the code at the end of the post.

      Solution       

              In fact, the original algorithm is no problem in python, the key to this problem is the processing of the remainder operation by the programming language.

              We finally think that it should be modified to the following algorithm.

      public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
          return timestamp
                  - (timestamp - offset) % windowSize
                  - (windowSize & (timestamp - offset) >> 63);
      } 

              windowSize & (timestamp - offset) >> 63 The function of this formula is to subtract windowSize from the overall operation result when timestamp - offset<0, otherwise do nothing. This way we can handle both positive and negative timestamps.

              Finally, the element can be assigned to the correct window.

              This code can pass current unit tests.

      getWindowStartWithOffset methods in other packages

              I think that there should be many places in getWindowStartWithOffset. We searched for this method in the project and found that the problem of negative timestamps is handled in flink.table.

              Below is their source code.

              org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping

      private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
          long remainder = (timestamp - offset) % windowSize;
          // handle both positive and negative cases
          if (remainder < 0) {
              return timestamp - (remainder + windowSize);
          } else {
              return timestamp - remainder;
          }
      } 

       further

      When we wrote the test case, we found that the algorithm we wrote would violate the convention that the window is closed on the left and open on the right. In addition, considering the readability of the code, we decided to use the same code as in org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping.

       

      private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
          long remainder = (timestamp - offset) % windowSize;
          // handle both positive and negative cases
          if (remainder < 0){
               return timestamp - (remainder + windowSize);
           }else {
               return timestamp - remainder;
           }
      } 


      In addition, in the process of modification, we found that the algorithm of getWindowStartWithOffset in org.apache.flink.table.runtime.operators.window.TimeWindow is the same as that in org.apache.flink.streaming.api.windowing.windows.TimeWindow. So it should cause the same problem. I think it should also be modified to support negative timestamps

      [FLINK-26334[datastream] Fix getWindowStartWithOffset in TimeWindow.java by realdengziqi · Pull Request #18982 · apache/flink (github.com)|https://github.com/apache/flink/pull/18982]

       

      Can we make a pull request?

              If the community deems it necessary to revise it, hopefully this task can be handed over to us. Our members are all students who have just graduated from school, and it is a great encouragement for us to contribute code to flink.

              Thank you so much!

              From Deng Ziqi & Lin Wanni & Guo Yuanfang

       



      reproduce

      /* output
      WindowStart: -15000    ExactSize:1    (a,-17000)
      WindowStart: -10000    ExactSize:1    (b,-12000)
      WindowStart: -5000 ExactSize:2    (c,-7000)
      WindowStart: -5000 ExactSize:2    (d,-2000)
      WindowStart: 0 ExactSize:1    (e,3000)
      WindowStart: 5000  ExactSize:1    (f,8000)
      WindowStart: 10000 ExactSize:1    (g,13000)
      WindowStart: 15000 ExactSize:1    (h,18000)
       */
      public class Example {
          public static void main(String[] args) throws Exception {
      
              final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
              TimeZone.setDefault(timeZone);
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env
                      .setParallelism(1)
                      .fromElements(
                              Tuple2.of("a",-17*1000L),
                              Tuple2.of("b",-12*1000L),
                              Tuple2.of("c",-7*1000L),
                              Tuple2.of("d",-2*1000L),
                              Tuple2.of("e",3*1000L),
                              Tuple2.of("f",8*1000L),
                              Tuple2.of("g",13*1000L),
                              Tuple2.of("h",18*1000L)
                      )
                      .assignTimestampsAndWatermarks(
                              WatermarkStrategy.<Tuple2<String,Long>>forMonotonousTimestamps()
                                      .withTimestampAssigner(
                                              new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                                  @Override
                                                  public long extractTimestamp(Tuple2<String, Long> element, long l) {
                                                      return element.f1;
                                                  }
                                              }
                                      )
                      )
                      .keyBy(r->1)
                      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                      .process(
                              new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
                                  @Override
                                  public void process(Integer integer, ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                      for (Tuple2<String, Long> element : elements) {
                                          out.collect("WindowStart: "+context.window().getStart()
                                                  + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown()+"\t"
                                                  + element
                                          );
                                      }
                                  }
                              }
                      )
                      .print();
              env.execute();
          }
      } 

      Attachments

        1. image-2022-03-04-11-28-26-616.png
          60 kB
          realdengziqi
        2. image-2022-03-04-11-37-10-035.png
          59 kB
          realdengziqi

        Activity

          People

            realdengziqi realdengziqi
            realdengziqi realdengziqi
            Votes:
            1 Vote for this issue
            Watchers:
            10 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - 3h
                3h
                Remaining:
                Remaining Estimate - 3h
                3h
                Logged:
                Time Spent - Not Specified
                Not Specified