Flink / FLINK-9201

Same merged window is fired twice if the watermark has already passed the merged window


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • 1.3.3
    • 1.5.0
    • None
    • None

    Description

      Consider a sum over a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds.

      • w1, TimeWindow[1,9], contains the elements 1, 2, 3 and 6, and will be fired when the watermark reaches 9.
      • A late element arrives (w2, TimeWindow[7,10]) while the watermark is already at 11.
      • w1 and w2 are merged into a new window w3, TimeWindow[1,10], and triggerContext.onMerge(mergedWindows) registers a new timer for w3. w3 is fired the first time by triggerContext.onElement(element), because the watermark has already passed w3. It is fired a second time when that timer fires, because the timer is earlier than the current watermark.
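      The merge in the bullets above can be checked with a small stand-alone sketch. It assumes, as Flink's session windows do, that each element at timestamp t opens a window [t, t + gap) and that windows which overlap or touch are merged into their span; the inclusive comparison is assumed to mirror TimeWindow.intersects. The class SessionMergeSketch and its helpers are hypothetical and do not use Flink's API; timestamps are in the abstract units of the description.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Flink-free sketch of session-window merging.
final class SessionMergeSketch {

    // A session window [start, end); end is exclusive, as in Flink's TimeWindow.
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Assumed to mirror TimeWindow.intersects: overlapping or touching windows merge.
        boolean intersects(Win other) {
            return start <= other.end && end >= other.start;
        }

        // The smallest window covering both inputs.
        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "TimeWindow[" + start + "," + end + "]";
        }
    }

    // Assigns [t, t + gap) to every element timestamp, then merges intersecting windows.
    static List<Win> sessions(long[] timestamps, long gap) {
        List<Win> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Win(t, t + gap));
        }
        windows.sort(Comparator.comparingLong((Win w) -> w.start));

        List<Win> merged = new ArrayList<>();
        for (Win w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w)); // extend the current session
            } else {
                merged.add(w);                                // start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long gap = 3;
        // prints "w1: [TimeWindow[1,9]]"
        System.out.println("w1: " + sessions(new long[] {1, 2, 3, 6}, gap));
        // the late element at 7 (w2 = [7,10]) joins the session; prints "w3: [TimeWindow[1,10]]"
        System.out.println("w3: " + sessions(new long[] {1, 2, 3, 6, 7}, gap));
    }
}
```

      Because the comparison is inclusive, the window of the element at 6, [6,9], merges with [3,6], which touches it at 6; with a strict comparison this sketch would split w1 into [1,6] and [6,9].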

      w3 is therefore fired twice: the watermark had already passed the newly merged window w3 when the late element arrived.
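      The double firing can be reproduced without Flink by a simplified model of the event-time trigger path. In this hypothetical sketch (one key, one merged window, a sorted set standing in for the timer service), onElement fires at once when the window's max timestamp is not after the watermark, onMerge registers a timer for the merged window, and advancing the watermark fires every timer at or before it. The guardOnMerge flag models one possible fix, an assumption here rather than the committed patch: onMerge registers the timer only while the merged window's max timestamp is still ahead of the watermark. w3 = TimeWindow[1,10] is taken to have max timestamp 9 (end minus one, as in Flink's TimeWindow).

```java
import java.util.TreeSet;

// Hypothetical, simplified model of the event-time trigger path for a single merged window.
final class DoubleFireSketch {
    private final boolean guardOnMerge;                   // assumed fix: skip the merge timer if already due
    private final TreeSet<Long> timers = new TreeSet<>(); // stands in for the timer service
    private long watermark;
    private int fireCount = 0;

    DoubleFireSketch(boolean guardOnMerge, long initialWatermark) {
        this.guardOnMerge = guardOnMerge;
        this.watermark = initialWatermark;
    }

    // Like EventTimeTrigger.onElement: fire now if the watermark already passed the window,
    // otherwise register a timer for the window's max timestamp.
    void onElement(long maxTimestamp) {
        if (maxTimestamp <= watermark) {
            fireCount++;
        } else {
            timers.add(maxTimestamp);
        }
    }

    // onMerge registers a timer for the merged window; with the guard, only if it is not yet due.
    void onMerge(long maxTimestamp) {
        if (!guardOnMerge || maxTimestamp > watermark) {
            timers.add(maxTimestamp);
        }
    }

    // Advancing the watermark fires every registered timer at or before it.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fireCount++;
        }
    }

    // Replays the scenario: watermark at 11, w1 and w2 merge into w3 = [1,10) with max timestamp 9.
    static int run(boolean guardOnMerge) {
        long w3MaxTimestamp = 9;
        DoubleFireSketch sim = new DoubleFireSketch(guardOnMerge, 11);
        sim.onMerge(w3MaxTimestamp);   // merge step registers (or skips) the timer
        sim.onElement(w3MaxTimestamp); // the late element fires w3 immediately
        sim.advanceWatermark(12);      // the next watermark fires any pending timer
        return sim.fireCount;
    }

    public static void main(String[] args) {
        System.out.println("firings without guard: " + run(false)); // 2
        System.out.println("firings with guard: " + run(true));     // 1
    }
}
```

      With the guard, onMerge and onElement agree that a window whose max timestamp is at or behind the watermark is already due, so onElement remains the only path that fires it.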

      Examples

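      // Assumes the helpers of Flink's WindowOperatorTest (createTestHarness, TupleKeySelector,
      // SessionWindowFunction, Tuple3ResultSortComparator, closeCalled) are in scope.
      @Test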
      @SuppressWarnings("unchecked")
      public void testSessionWindowsFiredTwice() throws Exception {
       closeCalled.set(0);
      
       final int sessionSize = 3;
      
       TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
      
       ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
       inputType.createSerializer(new ExecutionConfig()));
      
       WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
       EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
       new TimeWindow.Serializer(),
       new TupleKeySelector(),
       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
       stateDesc,
       new InternalIterableWindowFunction<>(new SessionWindowFunction()),
       EventTimeTrigger.create(),
       60000,
       null /* late data output tag */);
      
       OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
       createTestHarness(operator);
      
       ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
      
       testHarness.open();
      
       // add elements out-of-order
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
      
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
       testHarness.processWatermark(new Watermark(5500));
       expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
       expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
       expectedOutput.add(new Watermark(5500));
       // do a snapshot, close and restore again
       OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
      
       TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
       testHarness.close();
      
       testHarness = createTestHarness(operator);
       testHarness.setup();
       testHarness.initializeState(snapshot);
       testHarness.open();
       expectedOutput.clear();
       // suppose the watermark has already advanced to 10000
       testHarness.processWatermark(new Watermark(10000));
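       // a late element with timestamp 4500 arrives; the merged session window [0, 7500] is still valid because its
       // cleanup time (max timestamp + allowed lateness) is after the watermark, and it fires immediately because
       // the watermark has already passed its max timestamp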
       testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
       expectedOutput.add(new Watermark(10000));
       expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
       // when the next watermark arrives, the same TimeWindow[0, 7500] fires again because onMerge registered a new timer for it
       testHarness.processWatermark(new Watermark(11000));
       expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
       expectedOutput.add(new Watermark(11000));
       TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
       testHarness.close();
      }
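      The comment about the late element at 4500 rests on two separate checks, which this arithmetic sketch replays with the test's numbers (gap 3000 ms, allowed lateness 60000 ms, watermark 10000). It assumes that a window counts as late only once the watermark reaches its cleanup time, taken here as max timestamp plus allowed lateness, and that the event-time trigger fires at once when the watermark is at or past the max timestamp. The class and method names are hypothetical.

```java
// Hypothetical arithmetic check of the late element in the test above (values in milliseconds).
final class LatenessCheckSketch {

    // For a window [start, end), the max timestamp is end - 1.
    static long maxTimestamp(long end) {
        return end - 1;
    }

    // Assumed cleanup time: max timestamp plus allowed lateness.
    static long cleanupTime(long end, long allowedLateness) {
        return maxTimestamp(end) + allowedLateness;
    }

    // Assumed rule: a window is late (its element dropped) once the watermark reaches its cleanup time.
    static boolean isLate(long end, long allowedLateness, long watermark) {
        return cleanupTime(end, allowedLateness) <= watermark;
    }

    // The event-time trigger fires at once when the watermark is at or past the max timestamp.
    static boolean firesImmediately(long end, long watermark) {
        return maxTimestamp(end) <= watermark;
    }

    public static void main(String[] args) {
        long gap = 3000;
        long allowedLateness = 60000;
        long watermark = 10000;
        // key2's session [0, 5500) absorbs the late element's window [4500, 7500)
        long mergedEnd = Math.max(5500, 4500 + gap);

        System.out.println("merged end: " + mergedEnd);                                     // 7500
        System.out.println("cleanup time: " + cleanupTime(mergedEnd, allowedLateness));     // 67499
        System.out.println("late: " + isLate(mergedEnd, allowedLateness, watermark));       // false
        System.out.println("fires immediately: " + firesImmediately(mergedEnd, watermark)); // true
    }
}
```

      The timestamp 7499 on the expected "key2-12" record is this max timestamp, and a cleanup time of 67499 is why the element at 4500 is merged rather than dropped even though the watermark is already at 10000.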


    People

      yuemeng