Details
- Bug
- Status: Closed
- Blocker
- Resolution: Fixed
- 1.3.3
- None
- None
Description
Consider a sum over a session window, with a session gap of 3 seconds and an allowedLateness of 60 seconds.
- w1, TimeWindow[1,9], contains the elements 1, 2, 3, 6 and fires once the watermark reaches 9.
- A late element then arrives and creates w2, TimeWindow[7,10], while the watermark is already at 11.
- w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered by the call to triggerContext.onMerge(mergedWindows). w3 fires a first time through the call to triggerContext.onElement(element), because the watermark has already passed w3. w3 then fires a second time, because the newly registered timer is earlier than the current watermark.
In other words, w3 fires twice because the watermark has already passed the newly merged window w3.
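The sequence above can be reproduced in a small stand-alone model. This is only a sketch and not Flink code: the class `SessionMergeSketch`, the method `countFirings` and the `guardOnMerge` flag are hypothetical names, the window end (10) stands in for the window's maximum timestamp, and the follow-up watermark of 12 is an assumed value. With the timer registered unconditionally on merge, the merged window fires twice. Skipping the timer when the watermark is already past the window is one possible guard, shown here for comparison.

```java
import java.util.TreeSet;

/** Toy model of an event-time trigger on a merged session window; not Flink code. */
public class SessionMergeSketch {

    /**
     * Counts how often the merged window w3 fires.
     * guardOnMerge = false: the merge step always registers a timer (behaviour described above).
     * guardOnMerge = true:  the merge step skips the timer if the watermark already passed w3
     *                       (hypothetical guard, for comparison only).
     */
    static int countFirings(boolean guardOnMerge) {
        TreeSet<Long> timers = new TreeSet<>(); // pending event-time timers
        long watermark = 11;                    // watermark when the late element arrives
        long w3End = 10;                        // end of merged window w3 = [1,10]
        int firings = 0;

        // Merge step: register a timer for the merged window.
        if (!guardOnMerge || w3End > watermark) {
            timers.add(w3End);
        }

        // Element step: the watermark has already passed w3, so fire immediately.
        if (w3End <= watermark) {
            firings++;
        } else {
            timers.add(w3End);
        }

        // Next watermark (assumed value 12): every timer at or before it fires.
        watermark = 12;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            firings++;
        }
        return firings;
    }

    public static void main(String[] args) {
        System.out.println("unguarded merge timer: " + countFirings(false) + " firings");
        System.out.println("guarded merge timer:   " + countFirings(true) + " firings");
    }
}
```

In this model the unguarded variant reports two firings and the guarded variant reports one, which matches the double firing described in the steps above.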
Examples
@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
    closeCalled.set(0);

    final int sessionSize = 3;

    TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

    ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
        inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
        EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
        new TimeWindow.Serializer(),
        new TupleKeySelector(),
        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
        stateDesc,
        new InternalIterableWindowFunction<>(new SessionWindowFunction()),
        EventTimeTrigger.create(),
        60000,
        null /* late data output tag */);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
        createTestHarness(operator);

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    testHarness.open();

    // add elements out-of-order
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));

    testHarness.processWatermark(new Watermark(5500));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
    expectedOutput.add(new Watermark(5500));

    // do a snapshot, close and restore again
    OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput,
        testHarness.getOutput(), new Tuple3ResultSortComparator());

    testHarness.close();

    testHarness = createTestHarness(operator);
    testHarness.setup();
    testHarness.initializeState(snapshot);
    testHarness.open();

    expectedOutput.clear();

    // suppose the watermark has already reached 10000
    testHarness.processWatermark(new Watermark(10000));

    // a late element with timestamp 4500 arrives; the new session window [0, 7500] is still
    // valid because maxTimestamp < cleanup time, and it fires immediately
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));

    expectedOutput.add(new Watermark(10000));
    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));

    // when a new watermark arrives, the same TimeWindow[0, 7500] fires again because
    // a new timer was registered by the call to triggerOnMerge
    testHarness.processWatermark(new Watermark(11000));

    expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
    expectedOutput.add(new Watermark(11000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput,
        testHarness.getOutput(), new Tuple3ResultSortComparator());

    testHarness.close();
}