FLINK-4552

Refactor WindowOperator/Trigger Tests

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: DataStream API
    • Labels: None

      Description

      Right now, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction are all conflated in WindowOperatorTest. All of these test that a certain combination of a Trigger, WindowAssigner and WindowFunction produces the expected output.

      We should modularize these tests and spread them out across multiple files, possibly one per trigger, for the triggers. Also, we should extend/change the tests in some key ways:

      • WindowOperatorTest should just verify that the interaction between WindowOperator and the various other parts works as expected, that the correct methods on Trigger and WindowFunction are called at the expected time and that snapshotting, timers, cleanup etc. work correctly. These tests should also verify that the different state types and WindowFunctions work correctly.
      • Trigger tests should present elements to triggers and verify that they fire at the correct times. The actual output of the WindowFunction is not important for these tests. We should also test that triggers correctly clean up state and timers.
      • WindowAssigner tests should test each window assigner and also verify that, for example, the offset parameter of time-based windows works correctly.
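As a rough illustration of what an offset test for time-based windows has to pin down, here is a minimal plain-Java sketch. It assumes tumbling windows whose boundaries lie at offset + k * size; `WindowMath` and `windowStart` are hypothetical names used only for this example and are not part of Flink.

```java
// Sketch only: WindowMath and windowStart are hypothetical names, not Flink API.
final class WindowMath {
    private WindowMath() {}

    /** Start of the tumbling window of length size, shifted by offset, that contains timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        // Math.floorMod keeps the remainder non-negative, also for timestamps before the offset.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        // Without an offset, windows of size 5 start at multiples of 5.
        System.out.println(windowStart(7, 0, 5)); // 5
        // With an offset of 2, the boundaries shift to ..., -3, 2, 7, 12, ...
        System.out.println(windowStart(7, 2, 5)); // 7
        System.out.println(windowStart(6, 2, 5)); // 2
        // A timestamp before the first non-negative boundary lands in the window starting at -3.
        System.out.println(windowStart(1, 2, 5)); // -3
    }
}
```

An assigner test in this spirit would feed a few timestamps on either side of a shifted boundary and assert the window each one is assigned to.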

      There is already a WindowingTestHarness, but it is not used by any tests. I think we can expand on it and provide more thorough test coverage while also making the tests more maintainable (WindowOperatorTest.java is nearing 3000 lines of code).

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/2547

          FLINK-4552 Refactor WindowOperator/Trigger Tests

          Before, `WindowOperatorTest` was more like an integration test: we would throw elements at `WindowOperator` and observe whether we got the correct output.

          The new `WindowOperatorTest` verifies the implicit contract between the windowing components. I also added explicit tests for `WindowAssigners` and `Triggers`. For triggers I added a new `TriggerTestHarness`. Also, I modified `HeapKeyedStateBackend` to allow querying how many entries there are for a given namespace, which allows testing whether triggers correctly clean up their state. I also added the new interfaces `TimerService` and `InternalTimerService`, which should be used for everything concerning time and timers. For now, these are only used in the tests, but they should be used in more components.
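To make the "entries per namespace" idea concrete, the following is a minimal plain-Java sketch of a state table that can report how many entries exist overall and for one namespace (for window state, the namespace is the window). `NamespacedState` and its methods are hypothetical stand-ins and do not reflect the actual `HeapKeyedStateBackend` code.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: NamespacedState is a hypothetical stand-in, not HeapKeyedStateBackend.
final class NamespacedState<N, K, V> {
    // namespace -> (key -> value)
    private final Map<N, Map<K, V>> entries = new HashMap<>();

    void put(N namespace, K key, V value) {
        entries.computeIfAbsent(namespace, n -> new HashMap<>()).put(key, value);
    }

    /** Removes the entry and drops the namespace once it holds no more entries. */
    void clear(N namespace, K key) {
        Map<K, V> perKey = entries.get(namespace);
        if (perKey == null) {
            return;
        }
        perKey.remove(key);
        if (perKey.isEmpty()) {
            entries.remove(namespace);
        }
    }

    /** Number of entries stored for one namespace. */
    int numEntries(N namespace) {
        Map<K, V> perKey = entries.get(namespace);
        return perKey == null ? 0 : perKey.size();
    }

    /** Number of entries stored across all namespaces. */
    int numEntries() {
        int total = 0;
        for (Map<K, V> perKey : entries.values()) {
            total += perKey.size();
        }
        return total;
    }
}
```

A cleanup test can then assert that the count for a window's namespace drops to zero after the trigger has cleared its state, while other windows keep theirs.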

          R: @StefanRRichter and @kl0u for review.

          P.S. : I wanted to get this in before we get in the work on triggers/trigger DSL to ensure that we don't break things and to modularize the tests.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink window-test-refactor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2547.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2547


          commit 90f6785d1038ebef8cd92965cd29a2d1da6e0370
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-05T10:01:11Z

          FLINK-4552 Refactor WindowOperator/Trigger Tests

          commit 140d85ffff3364f98e70539861a1c1bb51f39a1b
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-24T13:41:10Z

          Add WindowAssignerTests

          commit 56f33789e22f0bdc056be34e92c523d7965bbad4
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-25T16:39:47Z

          Add TriggerTests/TimerService


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2547

          There is a failing test: `TimestampsAndPeriodicWatermarksOperatorTest.testTimestampsAndPeriodicWatermarksOperator(TimestampsAndPeriodicWatermarksOperatorTest.java:74)`

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2547

          @zentol jip thanks, working on that

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2547

          I'm closing this one in favor of a new version that is rebased on top of some stuff. I addressed the comments in that new version as well.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/2547

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/2572

          FLINK-4552 Refactor WindowOperator/Trigger Tests

          This builds on #2570

          Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
          were all conflated in WindowOperatorTest. All of these tested that a
          certain combination of a Trigger, WindowAssigner and WindowFunction produced
          the expected output.

          This change modularizes these tests and spreads them out across multiple
          files, for example one per trigger or window assigner.

          The new WindowOperatorTest tests verify that the interaction between
          WindowOperator and the various other parts works as expected, that the
          correct methods on Trigger and WindowFunction are called at the expected
          time and that snapshotting, timers, cleanup etc. work correctly. These tests
          also verify that the different state types and WindowFunctions work correctly.

          For trigger tests this introduces TriggerTestHarness. This can be used
          to inject elements into Triggers and verify that they fire at the correct times. The
          actual output of the WindowFunction is not important for these tests.
          The new tests also make sure that triggers correctly clean up state and timers.

          WindowAssigner tests verify the behaviour of window assigners in isolation.
          They also test, for example, whether the offset parameter of time-based windows
          works correctly.

          R: @StephanEwen @StefanRRichter @kl0u for review

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink window-test-refactor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2572.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2572


          commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-25T18:58:16Z

          Rename TimeServiceProvider to ProcessingTimeService

          The name is clashing with the soon-to-be-added
          TimerService/InternalTimerService which is meant as an interface for
          dealing with both processing time and event time.

          TimeServiceProvider is renamed to ProcessingTimeService to reflect the
          fact that it is a low-level utility that only deals with "physical"
          processing-time trigger tasks.

          commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-28T13:10:35Z

          Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

          commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-28T14:35:33Z

          Use Processing-Time Service of TestHarness in WindowOperatorTest

          Before, this was manually creating a TestProcessingTimeService. Now we
          use the one that is there by default in
          OneInputStreamOperatorTestHarness.

          commit 65389d66c5586e6707b7a6bf48df512354fac085
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-28T14:43:40Z

          Refactor OperatorTestHarness to always use TestProcessingTimeService

          Before, this would allow handing in a custom ProcessingTimeService but
          this was in reality always TestProcessingTimeService.

          commit 1d013bcacc040552e5783c64d094ec309014457b
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-28T13:12:26Z

          Use TestHarness Processing-time Facility in BucketingSinkTest

          Before, this was manually creating a TestProcessingTimeService. Now we
          use the one that is there by default in
          OneInputStreamOperatorTestHarness.

          commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-28T13:32:24Z

          Use OperatorTestHarness in AlignedWindowOperator Tests

          commit b597d2ef50c27554b83fddaff8873107265340d4
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-29T14:04:29Z

          Refactor Operator TestHarnesses to use Common Base Class

          This also introduces KeyedTwoInputStreamOperatorTestHarness which
          is similar to KeyedOneInputStreamOperatorTestHarness

          commit 58b16b26e07b6100f89e9deec63f0decb751f0e6
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-26T14:21:51Z

          FLINK-3674 Add an interface for Time aware User Functions

          This moves the event-time/processing-time trigger code from
          WindowOperator behind a well defined interface that can be used by
          operators (and user functions).

          InternalTimerService is the new interface that has the same
          functionality that WindowOperator used to have. TimerService is the user
          facing interface that does not allow dealing with namespaces/payloads
          and also does not allow deleting timers. There is a default
          implementation in HeapInternalTimerService that can checkpoint timers to
          a stream and also restore from a stream. Right now, this is managed in
          AbstractStreamOperator and operators can ask for an
          InternalTimerService.

          This also adds tests for HeapInternalTimerService.

          This adds two new user functions:

          • TimelyFlatMapFunction: an extension of FlatMapFunction that also
            allows querying time and setting timers
          • TimelyCoFlatMapFunction: the same, but for CoFlatMapFunction

          There are two new StreamOperator implementations for these that use the
          InternalTimerService interface.

          This also adds tests for the two new operators.

          This also adds the new interface KeyContext that is used for
          setting/querying the current key context for state and timers. Timers
          are always scoped to a key, for now.

          Also, this moves the handling of watermarks for both one-input and
          two-input operators to AbstractStreamOperator so that we have a central
          ground truth.
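The commit message above describes an internal timer service that scopes timers to namespaces and allows deleting them. The following plain-Java sketch models only that idea for event-time timers: timers are kept sorted and fire once the watermark reaches their timestamp. All names (`SimpleTimerService`, `Timer`, `advanceWatermark`) are hypothetical and simplified; they are not the signatures of `InternalTimerService` or `HeapInternalTimerService`, and keys, processing time and checkpointing are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Sketch only: hypothetical, simplified names; not Flink's timer interfaces.
final class SimpleTimerService {

    /** A timer is identified by its timestamp and a namespace (for example a window). */
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String namespace;

        Timer(long timestamp, String namespace) {
            this.timestamp = timestamp;
            this.namespace = namespace;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : namespace.compareTo(other.namespace);
        }
    }

    // Sorted by timestamp, then namespace; duplicates of the same timer collapse into one.
    private final TreeSet<Timer> eventTimeTimers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String namespace, long timestamp) {
        eventTimeTimers.add(new Timer(timestamp, namespace));
    }

    void deleteEventTimeTimer(String namespace, long timestamp) {
        eventTimeTimers.remove(new Timer(timestamp, namespace));
    }

    int numEventTimeTimers() {
        return eventTimeTimers.size();
    }

    long currentWatermark() {
        return currentWatermark;
    }

    /** Advances the watermark and returns, in timestamp order, the timers that fire. */
    List<Timer> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Timer> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first().timestamp <= watermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
        return fired;
    }
}
```

A user-facing variant in the spirit of the description would expose only registration and time queries, without namespaces or deletion.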

          commit e351b4a409e50c53645ebf7bdbec263148fa956b
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-09-05T10:01:11Z

          FLINK-4552 Refactor WindowOperator/Trigger Tests

          Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction
          were all conflated in WindowOperatorTest. All of these tested that a
          certain combination of a Trigger, WindowAssigner and WindowFunction produced
          the expected output.

          This change modularizes these tests and spreads them out across multiple
          files, for example one per trigger or window assigner.

          The new WindowOperatorTest tests verify that the interaction between
          WindowOperator and the various other parts works as expected, that the
          correct methods on Trigger and WindowFunction are called at the expected
          time and that snapshotting, timers, cleanup etc. work correctly. These tests
          also verify that the different state types and WindowFunctions work correctly.

          For trigger tests this introduces TriggerTestHarness. This can be used
          to inject elements into Triggers and verify that they fire at the correct times. The
          actual output of the WindowFunction is not important for these tests.
          The new tests also make sure that triggers correctly clean up state and timers.

          WindowAssigner tests verify the behaviour of window assigners in isolation.
          They also test, for example, whether the offset parameter of time-based windows
          works correctly.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          R: @StefanRRichter @kl0u, this sits on top of #2570, so you can review that one first and then the one additional commit here.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85359406

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java —
          @@ -0,0 +1,123 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +/**
          + * Tests for {@link CountTrigger}.
          + */
          +public class CountTriggerTest {
          +
          +    /**
          +     * Verify that state of separate windows does not leak into other windows.
          +     */
          +    @Test
          +    public void testWindowSeparationAndFiring() throws Exception {
          +        TriggerTestHarness<Object, TimeWindow> testHarness =
          +                new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          +        assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          +        assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          +        // shouldn't have any timers
          +        assertEquals(0, testHarness.numProcessingTimeTimers());
          +        assertEquals(0, testHarness.numEventTimeTimers());
          +
          +        assertEquals(2, testHarness.numStateEntries());
          +        assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          +        assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          +        assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          +        assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          +        assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          +        // right now, CountTrigger will clear it's state in onElement when firing
          +        // ideally, this should be moved to onFire()
          +        assertEquals(1, testHarness.numStateEntries());
          +        assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          +        assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +    }

          — End diff –

          You could also let W(2,4) fire for completeness, to check that its count was not reset by the previous firing of W(0,2).
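The suggested extra check can be illustrated with a plain-Java model of a count trigger that keeps one counter per window. This is a sketch under the assumption that the trigger fires and clears its count once `maxCount` elements have arrived for a window; `CountTriggerModel` and its methods are hypothetical and are neither Flink's `CountTrigger` nor the `TriggerTestHarness`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a plain-Java model with hypothetical names, not Flink's CountTrigger.
final class CountTriggerModel {
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    // One counter per window; windows are identified by a plain string here.
    private final Map<String, Long> countPerWindow = new HashMap<>();

    CountTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Counts one element for the window; fires and clears the count once maxCount is reached. */
    Result onElement(String window) {
        long count = countPerWindow.merge(window, 1L, Long::sum);
        if (count >= maxCount) {
            countPerWindow.remove(window);
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    int numStateEntries() {
        return countPerWindow.size();
    }

    public static void main(String[] args) {
        CountTriggerModel trigger = new CountTriggerModel(3);
        trigger.onElement("W(0,2)");
        trigger.onElement("W(2,4)");
        trigger.onElement("W(0,2)");
        System.out.println(trigger.onElement("W(0,2)")); // FIRE: third element of W(0,2)
        System.out.println(trigger.onElement("W(2,4)")); // CONTINUE: second element of W(2,4)
        // W(2,4) fires on its own third element, so its count survived the firing of W(0,2).
        System.out.println(trigger.onElement("W(2,4)")); // FIRE
        System.out.println(trigger.numStateEntries());   // 0
    }
}
```

If the firing of W(0,2) had wrongly reset the counter of W(2,4), the last `onElement` call would return CONTINUE instead of FIRE.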

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85360968

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java —
          @@ -0,0 +1,149 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +import org.mockito.Matchers;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +
          +import java.lang.reflect.Method;
          +import java.util.Collections;
          +
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Matchers.anyLong;
          +import static org.mockito.Mockito.*;
          +
          +/**
          + * Tests for {@link PurgingTrigger}.
          + */
          +public class PurgingTriggerTest {
          +
          + /**
          + * Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity
          + * check.
          + */
          + @Test
          + public void testAllMethodsImplemented() throws NoSuchMethodException {
          — End diff –

          What is the purpose of this and how is it different from checking that PurgingTrigger is not abstract?
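
          One possible distinction, sketched below with hypothetical stand-in classes (BaseTrigger, ForwardingTrigger and OverrideCheck are illustrative names, not the Flink classes or the code in this pull request): a class can be non-abstract while still inheriting a default implementation instead of forwarding the call to the wrapped object. A reflection-based check can catch that case, whereas a "not abstract" check cannot.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for a base class that mixes abstract
// methods with methods that have a default implementation.
abstract class BaseTrigger {
    abstract String onElement();

    // Has a default body, so subclasses compile without overriding it.
    boolean canMerge() {
        return false;
    }
}

// Hypothetical forwarding wrapper that forgets to forward canMerge().
class ForwardingTrigger extends BaseTrigger {
    private final BaseTrigger nested;

    ForwardingTrigger(BaseTrigger nested) {
        this.nested = nested;
    }

    @Override
    String onElement() {
        return nested.onElement();
    }
}

class OverrideCheck {
    // True if the class can be instantiated as far as abstractness goes.
    static boolean isConcrete(Class<?> clazz) {
        return !Modifier.isAbstract(clazz.getModifiers());
    }

    // True only if impl itself declares every non-private, non-static
    // method that base declares (i.e. nothing is silently inherited).
    static boolean declaresAll(Class<?> base, Class<?> impl) {
        for (Method m : base.getDeclaredMethods()) {
            int mods = m.getModifiers();
            if (m.isSynthetic() || Modifier.isPrivate(mods) || Modifier.isStatic(mods)) {
                continue;
            }
            try {
                impl.getDeclaredMethod(m.getName(), m.getParameterTypes());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("concrete: " + isConcrete(ForwardingTrigger.class));
        System.out.println("declares all: "
            + declaresAll(BaseTrigger.class, ForwardingTrigger.class));
    }
}
```

          In this sketch the wrapper passes the abstractness check but fails the reflection check, because canMerge() is inherited rather than forwarded. Whether that is the intent of the test under review is for the author to confirm.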

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85360067

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java —
          @@ -0,0 +1,175 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + * <p>
          + * http://www.apache.org/licenses/LICENSE-2.0
          + * <p>
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
          +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
          +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Test;
          +import org.mockito.Matchers;
          +
          +import java.util.Collection;
          +
          +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
          +import static org.hamcrest.CoreMatchers.containsString;
          +import static org.hamcrest.Matchers.contains;
          +import static org.hamcrest.Matchers.containsInAnyOrder;
          +import static org.hamcrest.Matchers.instanceOf;
          +import static org.junit.Assert.*;
          +import static org.mockito.Matchers.anyCollection;
          +import static org.mockito.Mockito.*;
          +
          +/**
          + * Tests for {@link EventTimeSessionWindows}
          + */
          +public class EventTimeSessionWindowsWindowsTest extends TestLogger {
          +
          + @Test
          + public void testWindowAssignment() {
          + WindowAssigner.WindowAssignerContext mockContext =
          + mock(WindowAssigner.WindowAssignerContext.class);
          +
          + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
          — End diff –

          Very minor, but I think a named constant for the value 5000 might make the contracts even clearer (e.g. int gap = 5000) and then do (+ gap) to window start time to obtain end time.
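
          A minimal sketch of that suggestion, using a hypothetical stand-in (SessionGapSketch and assignSessionWindow are illustrative names, not Flink API) and assuming a session window for a single element spans from its timestamp to timestamp plus gap:

```java
// Hypothetical stand-in, not the Flink assigner: it only illustrates
// deriving the expected window end from a named gap constant.
class SessionGapSketch {
    // Named constant instead of a bare 5000 scattered through the test.
    static final long GAP_MS = 5000L;

    // Assumed contract: a single element at `timestamp` yields the
    // window [timestamp, timestamp + gapMs), returned as {start, end}.
    static long[] assignSessionWindow(long timestamp, long gapMs) {
        return new long[] {timestamp, timestamp + gapMs};
    }

    public static void main(String[] args) {
        long start = 4999L;
        long[] window = assignSessionWindow(start, GAP_MS);
        // The expected end is written as start + GAP_MS, not as a literal.
        System.out.println(window[1] == start + GAP_MS);
    }
}
```

          Writing the expectation as start + GAP_MS keeps the relationship between the gap and the window end visible in each assertion.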

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85359630

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java —
          @@ -0,0 +1,123 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +/**
          + * Tests for {@link CountTrigger}.
          + */
          +public class CountTriggerTest {
          +
          + /**
          + * Verify that state of separate windows does not leak into other windows.
          + */
          + @Test
          + public void testWindowSeparationAndFiring() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // right now, CountTrigger will clear it's state in onElement when firing
          + // ideally, this should be moved to onFire()
          + assertEquals(1, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          + }
          +
          + /**
          + * Verify that clear() does not leak across windows.
          + */
          + @Test
          + public void testClear() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + testHarness.clearTriggerState(new TimeWindow(2, 4));
          +
          + assertEquals(1, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + testHarness.clearTriggerState(new TimeWindow(0, 2));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
          + }
          +
          + @Test
          + public void testMergingWindows() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          — End diff –

          You could test this a little bit more, e.g. with a wider window that also subsumes both, or 3 windows and just merging two, etc. to catch corner cases
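
          The two corner cases mentioned above can be sketched with a self-contained stand-in. MergeCountSketch is a hypothetical model of per-window counters, not the TriggerTestHarness or CountTrigger from this pull request, and it assumes that merging sums the counts of the source windows into the target window:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-window element counts with a merge step.
class MergeCountSketch {
    private final Map<List<Long>, Integer> counts = new HashMap<>();

    // A window is modelled as the pair [start, end].
    static List<Long> window(long start, long end) {
        return Arrays.asList(start, end);
    }

    void add(List<Long> window) {
        counts.merge(window, 1, Integer::sum);
    }

    // Moves the counts of all source windows into the target window.
    void merge(List<Long> target, List<List<Long>> sources) {
        int total = 0;
        for (List<Long> source : sources) {
            Integer count = counts.remove(source);
            if (count != null) {
                total += count;
            }
        }
        if (total > 0) {
            counts.merge(target, total, Integer::sum);
        }
    }

    int count(List<Long> window) {
        return counts.getOrDefault(window, 0);
    }

    int numWindows() {
        return counts.size();
    }

    // Case 1: the merge target is wider than the union of both sources.
    static MergeCountSketch subsumingCase() {
        MergeCountSketch state = new MergeCountSketch();
        state.add(window(0, 2));
        state.add(window(2, 4));
        state.merge(window(0, 6), Arrays.asList(window(0, 2), window(2, 4)));
        return state;
    }

    // Case 2: three windows exist, only two of them are merged.
    static MergeCountSketch partialCase() {
        MergeCountSketch state = new MergeCountSketch();
        state.add(window(0, 2));
        state.add(window(2, 4));
        state.add(window(10, 12));
        state.merge(window(0, 4), Arrays.asList(window(0, 2), window(2, 4)));
        return state;
    }

    public static void main(String[] args) {
        System.out.println(subsumingCase().count(window(0, 6)));
        System.out.println(partialCase().count(window(10, 12)));
    }
}
```

          In both cases the interesting assertions are that the source windows no longer hold state after the merge and that an unrelated window keeps its own count.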

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85361208

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java —
          @@ -0,0 +1,369 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.api.common.state.MergingState;
          +import org.apache.flink.api.common.state.State;
          +import org.apache.flink.api.common.state.StateDescriptor;
          +import org.apache.flink.api.common.state.ValueState;
          +import org.apache.flink.api.common.state.ValueStateDescriptor;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.metrics.MetricGroup;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
          +import org.apache.flink.runtime.query.KvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyedStateBackend;
          +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
          +import org.apache.flink.streaming.api.operators.KeyContext;
          +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
          +import org.apache.flink.streaming.api.operators.InternalTimerService;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.Window;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +
          +import java.io.Serializable;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +
          +/**
          + * Utility for testing {@link Trigger} behaviour.
          + */
          +public class TriggerTestHarness<T, W extends Window> {
          +
          + private static final Integer KEY = 1;
          +
          + private final Trigger<T, W> trigger;
          + private final TypeSerializer<W> windowSerializer;
          +
          + private final HeapKeyedStateBackend<Integer> stateBackend;
          + private final TestInternalTimerService<Integer, W> internalTimerService;
          +
          + public TriggerTestHarness(
          + Trigger<T, W> trigger,
          + TypeSerializer<W> windowSerializer) throws Exception {
          + this.trigger = trigger;
          + this.windowSerializer = windowSerializer;
          +
          + // we only ever use one key, other tests make sure that windows work across different
          + // keys
          + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
          + MemoryStateBackend backend = new MemoryStateBackend();
          +
          + @SuppressWarnings("unchecked")
          + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
          + new JobID(),
          + "test_op",
          + IntSerializer.INSTANCE,
          + 1,
          + new KeyGroupRange(0, 0),
          + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
          + this.stateBackend = stateBackend;
          +
          + this.stateBackend.setCurrentKey(0);
          +
          + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
          + @Override
          + public void setCurrentKey(Object key) {
          + // ignore
          + }
          +
          + @Override
          + public Object getCurrentKey() {
          + return KEY;
          + }
          + });
          + }
          +
          + public int numProcessingTimeTimers() {
          + return internalTimerService.numProcessingTimeTimers();
          + }
          +
          + public int numProcessingTimeTimers(W window) {
          + return internalTimerService.numProcessingTimeTimers(window);
          + }
          +
          + public int numEventTimeTimers() {
          + return internalTimerService.numEventTimeTimers();
          + }
          +
          + public int numEventTimeTimers(W window) {
          + return internalTimerService.numEventTimeTimers(window);
          + }
          +
          + public int numStateEntries() {
          + return stateBackend.numStateEntries();
          + }
          +
          + public int numStateEntries(W window) {
          + return stateBackend.numStateEntries(window);
          + }
          +
          + /**
          + * Injects one element into the trigger for the given window and returns the result of
          + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
          + */
          + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
          + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
          + KEY,
          + window,
          + internalTimerService,
          + stateBackend,
          + windowSerializer);
          + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
          + }
          +
          + /**
          + * Advanced processing time and checks whether we have exactly one firing for the given
          + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
          + * is returned for that firing.
          + */
          + public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
          + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
          +
          + if (firings.size() != 1) {
          + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
          + }
          +
          + Tuple2<W, TriggerResult> firing = firings.iterator().next();
          — End diff –

          I don't think this checks EXACTLY one, but only AT LEAST one.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85360454

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java —
          @@ -0,0 +1,153 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Tests for {@link EventTimeTrigger}.
          + */
          +public class EventTimeTriggerTest {
          +
          + /**
          + * Verify that state of separate windows does not leak into other windows.
          + */
          + @Test
          + public void testWindowSeparationAndFiring() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
          +
          + // inject several elements
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(2, testHarness.numEventTimeTimers());
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(1, testHarness.numEventTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
          +
          — End diff –

          We could also check that multiple triggers fire when a watermark surpasses them all at once (and maybe that not all of them fire, if we use 3 triggers for this), to catch corner cases.
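The corner case suggested here can be sketched without Flink at all. The class below is a hypothetical, simplified model of an event-time timer queue, not the `TriggerTestHarness` from the PR: `WatermarkFiringSketch`, `registerWindow` and the string window labels are all invented for illustration. It assumes one timer per window, registered at `end - 1`, and shows a single watermark firing two of three windows while the third stays pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of event-time timers; not Flink's TriggerTestHarness. */
public class WatermarkFiringSketch {

    // Timer timestamp -> label of the window that registered it (one timer per window).
    private final TreeMap<Long, String> eventTimeTimers = new TreeMap<>();

    /** Registers a timer at end - 1 (assumed here to be the window's max timestamp). */
    public void registerWindow(long start, long end) {
        eventTimeTimers.put(end - 1, "[" + start + "," + end + ")");
    }

    /** Fires and removes every timer whose timestamp is <= the watermark, in timestamp order. */
    public List<String> advanceWatermark(long watermark) {
        NavigableMap<Long, String> due = eventTimeTimers.headMap(watermark, true);
        List<String> fired = new ArrayList<>(due.values());
        due.clear(); // the view is backed by the map, so this removes the fired timers
        return fired;
    }

    public int numEventTimeTimers() {
        return eventTimeTimers.size();
    }

    public static void main(String[] args) {
        WatermarkFiringSketch timers = new WatermarkFiringSketch();
        timers.registerWindow(0, 2);
        timers.registerWindow(2, 4);
        timers.registerWindow(4, 6);

        // One watermark passes the first two windows at once; the third must stay pending.
        System.out.println(timers.advanceWatermark(4));
        System.out.println(timers.numEventTimeTimers());
    }
}
```

A real test along these lines would call the harness's collection-returning `advanceWatermark(time)` overload, since the single-window variant throws when more than one timer fires.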

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85361303

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java —
          @@ -0,0 +1,369 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.api.common.state.MergingState;
          +import org.apache.flink.api.common.state.State;
          +import org.apache.flink.api.common.state.StateDescriptor;
          +import org.apache.flink.api.common.state.ValueState;
          +import org.apache.flink.api.common.state.ValueStateDescriptor;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.metrics.MetricGroup;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
          +import org.apache.flink.runtime.query.KvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyedStateBackend;
          +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
          +import org.apache.flink.streaming.api.operators.KeyContext;
          +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
          +import org.apache.flink.streaming.api.operators.InternalTimerService;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.Window;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +
          +import java.io.Serializable;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +
          +/**
          + * Utility for testing {@link Trigger} behaviour.
          + */
          +public class TriggerTestHarness<T, W extends Window> {
          +
          + private static final Integer KEY = 1;
          +
          + private final Trigger<T, W> trigger;
          + private final TypeSerializer<W> windowSerializer;
          +
          + private final HeapKeyedStateBackend<Integer> stateBackend;
          + private final TestInternalTimerService<Integer, W> internalTimerService;
          +
          + public TriggerTestHarness(
          + Trigger<T, W> trigger,
          + TypeSerializer<W> windowSerializer) throws Exception {
          + this.trigger = trigger;
          + this.windowSerializer = windowSerializer;
          +
          + // we only ever use one key, other tests make sure that windows work across different
          + // keys
          + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
          + MemoryStateBackend backend = new MemoryStateBackend();
          +
          + @SuppressWarnings("unchecked")
          + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
          + new JobID(),
          + "test_op",
          + IntSerializer.INSTANCE,
          + 1,
          + new KeyGroupRange(0, 0),
          + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
          + this.stateBackend = stateBackend;
          +
          + this.stateBackend.setCurrentKey(0);
          +
          + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
          + @Override
          + public void setCurrentKey(Object key) {
          + // ignore
          + }
          +
          + @Override
          + public Object getCurrentKey() {
          + return KEY;
          + }
          + });
          + }
          +
          + public int numProcessingTimeTimers() {
          + return internalTimerService.numProcessingTimeTimers();
          + }
          +
          + public int numProcessingTimeTimers(W window) {
          + return internalTimerService.numProcessingTimeTimers(window);
          + }
          +
          + public int numEventTimeTimers() {
          + return internalTimerService.numEventTimeTimers();
          + }
          +
          + public int numEventTimeTimers(W window) {
          + return internalTimerService.numEventTimeTimers(window);
          + }
          +
          + public int numStateEntries() {
          + return stateBackend.numStateEntries();
          + }
          +
          + public int numStateEntries(W window) {
          + return stateBackend.numStateEntries(window);
          + }
          +
          + /**
          + * Injects one element into the trigger for the given window and returns the result of
          + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
          + */
          + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
          + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
          + KEY,
          + window,
          + internalTimerService,
          + stateBackend,
          + windowSerializer);
          + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
          + }
          +
          + /**
          + * Advanced processing time and checks whether we have exactly one firing for the given
          + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
          + * is returned for that firing.
          + */
          + public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
          + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
          +
          + if (firings.size() != 1) {
          + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
          + }
          +
          + Tuple2<W, TriggerResult> firing = firings.iterator().next();
          +
          + if (!firing.f0.equals(window)) {
          + throw new IllegalStateException("Trigger fired for another window.");
          + }
          +
          + return firing.f1;
          + }
          +
          + /**
          + * Advanced the watermark and checks whether we have exactly one firing for the given
          + * window. The result of {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}
          + * is returned for that firing.
          + */
          + public TriggerResult advanceWatermark(long time, W window) throws Exception {
          + Collection<Tuple2<W, TriggerResult>> firings = advanceWatermark(time);
          +
          + if (firings.size() != 1) {
          + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
          + }
          +
          + Tuple2<W, TriggerResult> firing = firings.iterator().next();
          — End diff –

          Same here.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85497817

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java —
          @@ -0,0 +1,123 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +/**
          + * Tests for {@link CountTrigger}.
          + */
          +public class CountTriggerTest {
          +
          + /**
          + * Verify that state of separate windows does not leak into other windows.
          + */
          + @Test
          + public void testWindowSeparationAndFiring() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // right now, CountTrigger will clear it's state in onElement when firing
          + // ideally, this should be moved to onFire()
          + assertEquals(1, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          + }
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85498801

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java —
          @@ -0,0 +1,123 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +
          +/**
          + * Tests for {@link CountTrigger}.
          + */
          +public class CountTriggerTest {
          +
          + /**
          + * Verify that state of separate windows does not leak into other windows.
          + */
          + @Test
          + public void testWindowSeparationAndFiring() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // right now, CountTrigger will clear it's state in onElement when firing
          + // ideally, this should be moved to onFire()
          + assertEquals(1, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          + }
          +
          + /**
          + * Verify that clear() does not leak across windows.
          + */
          + @Test
          + public void testClear() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + testHarness.clearTriggerState(new TimeWindow(2, 4));
          +
          + assertEquals(1, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          + testHarness.clearTriggerState(new TimeWindow(0, 2));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
          + }
          +
          + @Test
          + public void testMergingWindows() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer());
          +
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + // shouldn't have any timers
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers());
          +
          + assertEquals(2, testHarness.numStateEntries());
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
          +
          — End diff –

          Done. There are also more tests specifically about merging in `MergingWindowSetTest` and `WindowOperatorTest`. The core of the merging "algorithm" is exercised in `MergingWindowSetTest`.
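
          For readers unfamiliar with what "merging" means here, the following is a minimal, self-contained sketch of combining overlapping time windows into one covering window. It is not code from the PR and not how `MergingWindowSet` is implemented; the class name `SessionMergeSketch`, the `long[]{start, end}` representation, and the choice to treat windows that merely touch as overlapping are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative only: a simplified model of merging overlapping windows.
// Hypothetical names; this is not Flink's MergingWindowSet.
class SessionMergeSketch {

    /**
     * Merges windows given as {start, end} pairs. Assumption of this sketch:
     * two windows are merged when the later one starts at or before the end
     * of the earlier one, so windows that only touch are merged as well.
     */
    static List<long[]> merge(List<long[]> windows) {
        // Sort a copy by start so that only the most recent merged window
        // needs to be compared against each new candidate.
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] <= last[1]) {
                // Overlap: extend the covering window instead of adding a new one.
                last[1] = Math.max(last[1], w[1]);
            } else {
                // No overlap: start a new window (copied, so inputs stay untouched).
                merged.add(new long[] {w[0], w[1]});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<long[]> merged = merge(Arrays.asList(
                new long[] {2, 4}, new long[] {10, 12}, new long[] {0, 2}));
        for (long[] w : merged) {
            System.out.println(Arrays.toString(w));
        }
    }
}
```

          Under these assumptions the windows (0, 2) and (2, 4) collapse into (0, 4), while (10, 12) stays separate. Per-window trigger state would then have to be moved to the merged window, which is the part a trigger's `onMerge()` is responsible for.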

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85499215

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java —
          @@ -0,0 +1,175 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + * <p>
          + * http://www.apache.org/licenses/LICENSE-2.0
          + * <p>
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.api.common.ExecutionConfig;
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
          +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
          +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Test;
          +import org.mockito.Matchers;
          +
          +import java.util.Collection;
          +
          +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
          +import static org.hamcrest.CoreMatchers.containsString;
          +import static org.hamcrest.Matchers.contains;
          +import static org.hamcrest.Matchers.containsInAnyOrder;
          +import static org.hamcrest.Matchers.instanceOf;
          +import static org.junit.Assert.*;
          +import static org.mockito.Matchers.anyCollection;
          +import static org.mockito.Mockito.*;
          +
          +/**
          + * Tests for {@link EventTimeSessionWindows}
          + */
          +public class EventTimeSessionWindowsWindowsTest extends TestLogger {
          +
          + @Test
          + public void testWindowAssignment() {
          + WindowAssigner.WindowAssignerContext mockContext =
          + mock(WindowAssigner.WindowAssignerContext.class);
          +
          + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85500442

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java —
          @@ -0,0 +1,153 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import com.google.common.collect.Lists;
          +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Tests for {@link EventTimeTrigger}.
          + */
          +public class EventTimeTriggerTest {
          +
          + /**
          + * Verify that state of separate windows does not leak into other windows.
          + */
          + @Test
          + public void testWindowSeparationAndFiring() throws Exception {
          + TriggerTestHarness<Object, TimeWindow> testHarness =
          + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
          +
          + // inject several elements
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(2, testHarness.numEventTimeTimers());
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
          +
          + assertEquals(0, testHarness.numStateEntries());
          + assertEquals(0, testHarness.numProcessingTimeTimers());
          + assertEquals(1, testHarness.numEventTimeTimers());
          + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
          + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
          +
          + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
          +
          — End diff –

          The trigger tests only exercise the isolated behaviour of Triggers; there are no multiple keys here, so that wouldn't work. You're right about the multiple timers, though, so I'll add a test for this to `WindowOperatorTest`, where the timer firing behaviour is tested.
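
          To make the per-window timer bookkeeping in the quoted test easier to follow, here is a small self-contained model of an event-time trigger with one timer per window. It is a sketch, not the PR's `TriggerTestHarness` and not Flink's `EventTimeTrigger`; the class `EventTimeTimerSketch` and its methods are hypothetical, and it assumes a timer is set at `end - 1` and that registering the same timer twice has no additional effect.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a toy model of per-window event-time timers.
// Hypothetical names; not Flink's EventTimeTrigger or TriggerTestHarness.
class EventTimeTimerSketch {

    enum Result { CONTINUE, FIRE }

    // One timer per window, keyed by "start-end". Putting the same key again
    // overwrites the entry, so repeated elements do not add extra timers.
    private final Map<String, Long> timers = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registers a timer for the window, or fires at once if the window is already late. */
    Result onElement(long start, long end) {
        long maxTimestamp = end - 1; // assumption: last timestamp that belongs to the window
        if (maxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        timers.put(key(start, end), maxTimestamp);
        return Result.CONTINUE;
    }

    /** Advances the watermark and reports whether the given window's timer fired. */
    Result advanceWatermark(long watermark, long start, long end) {
        currentWatermark = Math.max(currentWatermark, watermark);
        Long timer = timers.get(key(start, end));
        if (timer != null && timer <= currentWatermark) {
            timers.remove(key(start, end));
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    int numTimers() {
        return timers.size();
    }

    private static String key(long start, long end) {
        return start + "-" + end;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch trigger = new EventTimeTimerSketch();
        trigger.onElement(0, 2);
        trigger.onElement(0, 2);
        trigger.onElement(2, 4);
        System.out.println(trigger.numTimers());
        System.out.println(trigger.advanceWatermark(2, 0, 2));
        System.out.println(trigger.numTimers());
    }
}
```

          The model has no notion of keys, which mirrors the point of the comment: anything involving several keys or the order in which multiple timers fire belongs in an operator-level test rather than in an isolated trigger test.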

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85500578

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java —
          @@ -0,0 +1,149 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.junit.Test;
          +import org.mockito.Matchers;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +
          +import java.lang.reflect.Method;
          +import java.util.Collections;
          +
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
          +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
          +import static org.junit.Assert.assertEquals;
          +import static org.mockito.Matchers.anyLong;
          +import static org.mockito.Mockito.*;
          +
          +/**
          + * Tests for {@link PurgingTrigger}.
          + */
          +public class PurgingTriggerTest {
          +
          + /**
          + * Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity
          + * check.
          + */
          + */
          + @Test
          + public void testAllMethodsImplemented() throws NoSuchMethodException {
          — End diff –

          `Trigger` has `canMerge()` and `onMerge()`, which are not abstract in `Trigger` (`clear()` used to be non-abstract but is abstract now, after we recently changed that on master). `PurgingTrigger` can therefore be non-abstract and still not implement these methods, hence the checks.
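
          The idea behind such a check can be sketched with plain reflection: a wrapper compiles fine even when it forgets to forward a non-abstract method, so a test has to look for the overrides explicitly. The snippet below is an illustration under stated assumptions, not the test from the PR; `BaseTrigger`, `ForwardingTrigger`, `IncompleteTrigger` and `missingOverrides` are hypothetical stand-ins.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: hypothetical classes standing in for a base class and its wrappers.
class OverrideCheckSketch {

    /** Base class mixing one abstract method with non-abstract defaults. */
    abstract static class BaseTrigger {
        abstract String onElement(long timestamp);

        boolean canMerge() {
            return false;
        }

        void onMerge() {
            throw new UnsupportedOperationException("merging not supported");
        }
    }

    /** Wrapper that forwards every method to the nested instance. */
    static class ForwardingTrigger extends BaseTrigger {
        private final BaseTrigger nested;

        ForwardingTrigger(BaseTrigger nested) {
            this.nested = nested;
        }

        @Override
        String onElement(long timestamp) {
            return nested.onElement(timestamp);
        }

        @Override
        boolean canMerge() {
            return nested.canMerge();
        }

        @Override
        void onMerge() {
            nested.onMerge();
        }
    }

    /** Wrapper that compiles although it silently inherits canMerge() and onMerge(). */
    static class IncompleteTrigger extends BaseTrigger {
        private final BaseTrigger nested;

        IncompleteTrigger(BaseTrigger nested) {
            this.nested = nested;
        }

        @Override
        String onElement(long timestamp) {
            return nested.onElement(timestamp);
        }
    }

    /** Returns the sorted names of base-class methods the wrapper does not declare itself. */
    static List<String> missingOverrides(Class<?> base, Class<?> wrapper) {
        List<String> missing = new ArrayList<>();
        for (Method m : base.getDeclaredMethods()) {
            int mods = m.getModifiers();
            // Skip methods that cannot or need not be overridden.
            if (m.isSynthetic() || Modifier.isStatic(mods) || Modifier.isPrivate(mods)) {
                continue;
            }
            try {
                wrapper.getDeclaredMethod(m.getName(), m.getParameterTypes());
            } catch (NoSuchMethodException e) {
                missing.add(m.getName());
            }
        }
        Collections.sort(missing); // getDeclaredMethods() has no guaranteed order
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(missingOverrides(BaseTrigger.class, ForwardingTrigger.class));
        System.out.println(missingOverrides(BaseTrigger.class, IncompleteTrigger.class));
    }
}
```

          `getDeclaredMethod` only sees methods declared directly on the wrapper class, which is what makes it suitable here: an inherited default would not count as forwarded.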

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85500759

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java —
          @@ -0,0 +1,369 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.api.common.state.MergingState;
          +import org.apache.flink.api.common.state.State;
          +import org.apache.flink.api.common.state.StateDescriptor;
          +import org.apache.flink.api.common.state.ValueState;
          +import org.apache.flink.api.common.state.ValueStateDescriptor;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.metrics.MetricGroup;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
          +import org.apache.flink.runtime.query.KvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyedStateBackend;
          +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
          +import org.apache.flink.streaming.api.operators.KeyContext;
          +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
          +import org.apache.flink.streaming.api.operators.InternalTimerService;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.Window;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +
          +import java.io.Serializable;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +
          +/**
          + * Utility for testing {@link Trigger} behaviour.
          + */
          +public class TriggerTestHarness<T, W extends Window> {
          +
          + private static final Integer KEY = 1;
          +
          + private final Trigger<T, W> trigger;
          + private final TypeSerializer<W> windowSerializer;
          +
          + private final HeapKeyedStateBackend<Integer> stateBackend;
          + private final TestInternalTimerService<Integer, W> internalTimerService;
          +
          + public TriggerTestHarness(
          + Trigger<T, W> trigger,
          + TypeSerializer<W> windowSerializer) throws Exception {
          + this.trigger = trigger;
          + this.windowSerializer = windowSerializer;
          +
          + // we only ever use one key, other tests make sure that windows work across different
          + // keys
          + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
          + MemoryStateBackend backend = new MemoryStateBackend();
          +
          + @SuppressWarnings("unchecked")
          + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
          + new JobID(),
          + "test_op",
          + IntSerializer.INSTANCE,
          + 1,
          + new KeyGroupRange(0, 0),
          + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
          + this.stateBackend = stateBackend;
          +
          + this.stateBackend.setCurrentKey(0);
          +
          + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
          + @Override
          + public void setCurrentKey(Object key) {
          + // ignore
          + }
          +
          + @Override
          + public Object getCurrentKey() {
          + return KEY;
          + }
          + });
          + }
          +
          + public int numProcessingTimeTimers() {
          + return internalTimerService.numProcessingTimeTimers();
          + }
          +
          + public int numProcessingTimeTimers(W window) {
          + return internalTimerService.numProcessingTimeTimers(window);
          + }
          +
          + public int numEventTimeTimers() {
          + return internalTimerService.numEventTimeTimers();
          + }
          +
          + public int numEventTimeTimers(W window) {
          + return internalTimerService.numEventTimeTimers(window);
          + }
          +
          + public int numStateEntries() {
          + return stateBackend.numStateEntries();
          + }
          +
          + public int numStateEntries(W window) {
          + return stateBackend.numStateEntries(window);
          + }
          +
          + /**
          + * Injects one element into the trigger for the given window and returns the result of
          + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
          + */
          + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
          + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
          + KEY,
          + window,
          + internalTimerService,
          + stateBackend,
          + windowSerializer);
          + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
          + }
          +
          + /**
          + * Advanced processing time and checks whether we have exactly one firing for the given
          + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
          + * is returned for that firing.
          + */
          + public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
          + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
          +
          + if (firings.size() != 1) {
          + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
          + }
          +
          + Tuple2<W, TriggerResult> firing = firings.iterator().next();
          — End diff –

          With the check in the lines above (which verifies the size of `firings`), it should be correct, right?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          Thanks for your review, @StefanRRichter! You found some areas where this can be improved.

          I'm not yet sure whether parameterisation for processing-time/event-time can be done without obscuring what the test actually does too much, but I'll try and come up with something.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2572#discussion_r85501090

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java —
          @@ -0,0 +1,369 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.runtime.operators.windowing;
          +
          +import org.apache.flink.api.common.JobID;
          +import org.apache.flink.api.common.state.MergingState;
          +import org.apache.flink.api.common.state.State;
          +import org.apache.flink.api.common.state.StateDescriptor;
          +import org.apache.flink.api.common.state.ValueState;
          +import org.apache.flink.api.common.state.ValueStateDescriptor;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.metrics.MetricGroup;
          +import org.apache.flink.runtime.jobgraph.JobVertexID;
          +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
          +import org.apache.flink.runtime.query.KvStateRegistry;
          +import org.apache.flink.runtime.state.KeyGroupRange;
          +import org.apache.flink.runtime.state.KeyedStateBackend;
          +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
          +import org.apache.flink.streaming.api.operators.KeyContext;
          +import org.apache.flink.streaming.api.operators.TestInternalTimerService;
          +import org.apache.flink.streaming.api.operators.InternalTimerService;
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
          +import org.apache.flink.streaming.api.windowing.windows.Window;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +
          +import java.io.Serializable;
          +import java.util.ArrayList;
          +import java.util.Collection;
          +
          +/**
          + * Utility for testing {@link Trigger} behaviour.
          + */
          +public class TriggerTestHarness<T, W extends Window> {
          +
          + private static final Integer KEY = 1;
          +
          + private final Trigger<T, W> trigger;
          + private final TypeSerializer<W> windowSerializer;
          +
          + private final HeapKeyedStateBackend<Integer> stateBackend;
          + private final TestInternalTimerService<Integer, W> internalTimerService;
          +
          + public TriggerTestHarness(
          + Trigger<T, W> trigger,
          + TypeSerializer<W> windowSerializer) throws Exception {
          + this.trigger = trigger;
          + this.windowSerializer = windowSerializer;
          +
          + // we only ever use one key, other tests make sure that windows work across different
          + // keys
          + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
          + MemoryStateBackend backend = new MemoryStateBackend();
          +
          + @SuppressWarnings("unchecked")
          + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
          + new JobID(),
          + "test_op",
          + IntSerializer.INSTANCE,
          + 1,
          + new KeyGroupRange(0, 0),
          + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
          + this.stateBackend = stateBackend;
          +
          + this.stateBackend.setCurrentKey(0);
          +
          + this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
          + @Override
          + public void setCurrentKey(Object key) {
          + // ignore
          + }
          +
          + @Override
          + public Object getCurrentKey() {
          + return KEY;
          + }
          + });
          + }
          +
          + public int numProcessingTimeTimers() {
          + return internalTimerService.numProcessingTimeTimers();
          + }
          +
          + public int numProcessingTimeTimers(W window) {
          + return internalTimerService.numProcessingTimeTimers(window);
          + }
          +
          + public int numEventTimeTimers() {
          + return internalTimerService.numEventTimeTimers();
          + }
          +
          + public int numEventTimeTimers(W window) {
          + return internalTimerService.numEventTimeTimers(window);
          + }
          +
          + public int numStateEntries() {
          + return stateBackend.numStateEntries();
          + }
          +
          + public int numStateEntries(W window) {
          + return stateBackend.numStateEntries(window);
          + }
          +
          + /**
          + * Injects one element into the trigger for the given window and returns the result of
          + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)}
          + */
          + public TriggerResult processElement(StreamRecord<T> element, W window) throws Exception {
          + TestTriggerContext<Integer, W> triggerContext = new TestTriggerContext<>(
          + KEY,
          + window,
          + internalTimerService,
          + stateBackend,
          + windowSerializer);
          + return trigger.onElement(element.getValue(), element.getTimestamp(), window, triggerContext);
          + }
          +
          + /**
          + * Advanced processing time and checks whether we have exactly one firing for the given
          + * window. The result of {@link Trigger#onProcessingTime(long, Window, Trigger.TriggerContext)}
          + * is returned for that firing.
          + */
          + public TriggerResult advanceProcessingTime(long time, W window) throws Exception {
          + Collection<Tuple2<W, TriggerResult>> firings = advanceProcessingTime(time);
          +
          + if (firings.size() != 1) {
          + throw new IllegalStateException("Must have exactly one timer firing. Fired timers: " + firings);
          + }
          +
          + Tuple2<W, TriggerResult> firing = firings.iterator().next();
          — End diff –

          True

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2572

          My idea would be to introduce an adapter that abstracts the advancing of time, with two concrete implementations that forward to advanceWatermark or onProcessingTime. Or am I missing something that is fundamentally different?
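
The adapter idea above could look roughly like the following. This is a minimal sketch under stated assumptions: `FakeHarness`, `TimeAdapter`, `EventTimeAdapter` and `ProcessingTimeAdapter` are hypothetical names invented for illustration, and the fake harness merely records the last timestamp it received instead of driving a real operator.

```java
// Hypothetical stand-in for an operator test harness; it only records the time it was given.
class FakeHarness {
    long watermark = Long.MIN_VALUE;
    long processingTime = Long.MIN_VALUE;

    void advanceWatermark(long timestamp) { watermark = timestamp; }
    void setProcessingTime(long timestamp) { processingTime = timestamp; }
}

// The adapter: a test calls advanceTime() and does not care which notion of time is in use.
interface TimeAdapter {
    void advanceTime(FakeHarness harness, long timestamp);
}

// Forwards to the event-time path.
final class EventTimeAdapter implements TimeAdapter {
    @Override
    public void advanceTime(FakeHarness harness, long timestamp) {
        harness.advanceWatermark(timestamp);
    }
}

// Forwards to the processing-time path.
final class ProcessingTimeAdapter implements TimeAdapter {
    @Override
    public void advanceTime(FakeHarness harness, long timestamp) {
        harness.setProcessingTime(timestamp);
    }
}
```

A shared test body would take a `TimeAdapter` as a parameter and be run once per implementation, so the event-time and processing-time variants stay in one place.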

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          No, it's more or less that, but more needs to go into that adapter. For example, in `testOnProcessingTimeFire()` (I'm highlighting the places that need changing):

          ```
          public void testOnProcessingTimeFire() throws Exception {

          WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
          when(mockAssigner.isEventTime()).thenReturn(false); <-- here
          Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();

          KeyedOneInputStreamOperatorTestHarness<Integer, Integer, WindowedValue<List<Integer>, TimeWindow>> testHarness =
          createListWindowOperator(mockAssigner, mockTrigger, 0L);

          testHarness.open();

          testHarness.setProcessingTime(Long.MIN_VALUE); <-- here

          when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
          .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2)));

          assertEquals(0, testHarness.getOutput().size());
          assertEquals(0, testHarness.numKeyedStateEntries());

          doAnswer(new Answer<TriggerResult>() {
          @Override
          public TriggerResult answer(InvocationOnMock invocation) throws Exception {
          Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
          context.registerProcessingTimeTimer(0L); <-- here
          context.getPartitionedState(valueStateDescriptor).update("hello");
          return TriggerResult.CONTINUE;
          }
          }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());

          shouldFireOnProcessingTime(mockTrigger); <-- here

          testHarness.processElement(new StreamRecord<>(0, 0L));

          assertEquals(4, testHarness.numKeyedStateEntries()); // window-contents and trigger state for two windows
          assertEquals(4, testHarness.numProcessingTimeTimers()); // timers/gc timers for two windows <-- here

          testHarness.setProcessingTime(0L); <-- here

          // clear is only called at cleanup time/GC time
          verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());

          // FIRE should not purge contents
          assertEquals(4, testHarness.numKeyedStateEntries());
          assertEquals(2, testHarness.numProcessingTimeTimers()); // only gc timers left <-- here

          // there should be two elements now
          assertThat(testHarness.extractOutputStreamRecords(),
          containsInAnyOrder(
          isWindowedValue(contains(0), 1L, timeWindow(0, 2)),
          isWindowedValue(contains(0), 3L, timeWindow(2, 4))));
          }
          ```

          (man I can't make text bold in a code block ... 😭)

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2572

          Yes, I was aware that some configuration/mocking also depends on the time domain. However, what those calls do is essentially equivalent, just for a different time domain. So I assume an adapter that just dispatches for the different time domains should not obscure the logic of the tests?
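
          To make the dispatch idea concrete, here is a minimal sketch. It is not the code from the pull request. Every name in it (`TimeDomainSketch`, `FakeHarness`, `TimeDomainAdaptor`, `EventTimeAdaptor`, `ProcessingTimeAdaptor`, `runTimerScenario`) is hypothetical, and `FakeHarness` is only a stand-in that tracks timers, not Flink's test harness. The point is that the calls marked "here" in the test above (is-event-time flag, timer registration, advancing time, counting timers) can sit behind one interface, so a single test body can run against either time domain.

          ```java
          import java.util.ArrayList;
          import java.util.List;
          import java.util.TreeSet;

          public class TimeDomainSketch {

              /** Hypothetical stand-in for a test harness: keeps one timer set per time domain. */
              static class FakeHarness {
                  final TreeSet<Long> eventTimeTimers = new TreeSet<>();
                  final TreeSet<Long> processingTimeTimers = new TreeSet<>();
                  final List<String> fired = new ArrayList<>();

                  void processWatermark(long time) { fire(eventTimeTimers, time, "event"); }

                  void setProcessingTime(long time) { fire(processingTimeTimers, time, "processing"); }

                  // Fires (and removes) every timer whose timestamp is <= the new time.
                  private void fire(TreeSet<Long> timers, long time, String domain) {
                      while (!timers.isEmpty() && timers.first() <= time) {
                          fired.add(domain + "@" + timers.pollFirst());
                      }
                  }
              }

              /** Hypothetical adapter: collects everything that differs between the two time domains. */
              interface TimeDomainAdaptor {
                  boolean isEventTime();
                  void registerTimer(FakeHarness harness, long time);
                  void advanceTime(FakeHarness harness, long time);
                  int numTimers(FakeHarness harness);
              }

              static class EventTimeAdaptor implements TimeDomainAdaptor {
                  public boolean isEventTime() { return true; }
                  public void registerTimer(FakeHarness h, long time) { h.eventTimeTimers.add(time); }
                  public void advanceTime(FakeHarness h, long time) { h.processWatermark(time); }
                  public int numTimers(FakeHarness h) { return h.eventTimeTimers.size(); }
              }

              static class ProcessingTimeAdaptor implements TimeDomainAdaptor {
                  public boolean isEventTime() { return false; }
                  public void registerTimer(FakeHarness h, long time) { h.processingTimeTimers.add(time); }
                  public void advanceTime(FakeHarness h, long time) { h.setProcessingTime(time); }
                  public int numTimers(FakeHarness h) { return h.processingTimeTimers.size(); }
              }

              /** One shared test body; the adaptor decides which time domain it exercises. */
              static List<String> runTimerScenario(TimeDomainAdaptor timeAdaptor) {
                  FakeHarness harness = new FakeHarness();
                  timeAdaptor.advanceTime(harness, Long.MIN_VALUE);
                  timeAdaptor.registerTimer(harness, 0L);  // stands in for the trigger's timer
                  timeAdaptor.registerTimer(harness, 10L); // stands in for a cleanup (GC) timer
                  if (timeAdaptor.numTimers(harness) != 2) {
                      throw new AssertionError("expected two timers before advancing time");
                  }
                  timeAdaptor.advanceTime(harness, 0L);
                  if (timeAdaptor.numTimers(harness) != 1) {
                      throw new AssertionError("expected only the cleanup timer to remain");
                  }
                  return harness.fired;
              }

              public static void main(String[] args) {
                  System.out.println(runTimerScenario(new EventTimeAdaptor()));
                  System.out.println(runTimerScenario(new ProcessingTimeAdaptor()));
              }
          }
          ```

          In this sketch the scenario logic is written once, and each concrete test would only choose which adaptor to pass in. Whether the real tests can be folded this neatly depends on how much of the mocking differs per time domain.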

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          Yeah, alright, I'm just lazy. 😅 I'll whip something up.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2572

          Yeah, I know the pain, but the next time we need to fix only half the number of tests on a change, it will quickly pay off (hopefully).

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          @StefanRRichter I added more tests (see the commit) and deduplicated processing-time/event-time tests. PTAL (please take another look) 😃

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2572

          Had another look at the changes, I think it is a very well written test! +1 for merge.

          aljoscha Aljoscha Krettek added a comment -

          Implemented on release-1.2 in:
          bb8586e50be9e7d0b1fba5569579def595c53217

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/2572

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2572

          manually merged

          aljoscha Aljoscha Krettek added a comment -

          Implemented on master in:
          d1475ee86fb58ab70a6dc785d08f190189ede43d


            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              aljoscha Aljoscha Krettek
            • Votes:
              0
              Watchers:
              2
