Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.1.0
    • Component/s: Streaming
    • Labels: None

      Description

      We should add the possibility for WindowAssigners to merge windows. This will enable session windowing support, similar to what Google Cloud Dataflow offers.

      For session windows, each element would initially be assigned to its own window. When triggering we check the windows and see if any can be merged. This way, elements with overlapping session windows merge into one session.
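The merging idea described above can be sketched in plain Java, independent of Flink. This is only an illustration under stated assumptions: `SessionInterval`, `SessionMergeSketch`, and the `gap` parameter are hypothetical names, each element gets a window `[t, t + gap)`, and windows that merely touch are treated as overlapping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical time interval standing in for a session window. */
final class SessionInterval {
    final long start;
    final long end;

    SessionInterval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Assumption: windows that overlap or touch count as intersecting. */
    boolean intersects(SessionInterval other) {
        return this.start <= other.end && other.start <= this.end;
    }

    /** Smallest interval that covers both this window and the other one. */
    SessionInterval cover(SessionInterval other) {
        return new SessionInterval(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

final class SessionMergeSketch {
    /** Gives every timestamp its own window, then merges intersecting windows. */
    static List<SessionInterval> sessions(List<Long> timestamps, long gap) {
        List<SessionInterval> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new SessionInterval(t, t + gap));
        }
        windows.sort(Comparator.comparingLong((SessionInterval w) -> w.start));

        List<SessionInterval> merged = new ArrayList<>();
        for (SessionInterval w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                // Overlap with the current session: extend it.
                merged.set(last, merged.get(last).cover(w));
            } else {
                // No overlap: this window starts a new session.
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // prints [[1, 9), [20, 25)]
        System.out.println(sessions(Arrays.asList(1L, 3L, 20L, 4L), 5L));
    }
}
```

With a gap of 5, the elements at 1, 3 and 4 end up in one session and the element at 20 in another.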

        Activity

        aljoscha Aljoscha Krettek added a comment - Done in https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha closed the pull request at:

        https://github.com/apache/flink/pull/1802

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the pull request:

        https://github.com/apache/flink/pull/1802#issuecomment-205394604

        No, we introduced the `InternalWindowFunction` before 1.0.0 to decouple the user `WindowFunction` from the window operator implementation. There are now special `InternalWindowFunction`s that don't forward the key to the user function. The user-facing API does not change: an `AllWindowFunction` is still used there, as before, and that function does not get a key.
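The decoupling described in this comment can be pictured roughly as an adapter layer. The sketch below is not Flink's actual API: every interface and class name is a simplified, hypothetical stand-in, a `List` replaces the output collector, and the `Byte` dummy key is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a keyed user function: it sees the key. */
interface KeyedUserFunction<K, IN, OUT> {
    void apply(K key, Iterable<IN> elements, List<OUT> out);
}

/** Hypothetical stand-in for a non-keyed user function: no key parameter. */
interface AllUserFunction<IN, OUT> {
    void apply(Iterable<IN> elements, List<OUT> out);
}

/** Hypothetical operator-facing function: the operator always passes a key. */
interface InternalFunction<K, IN, OUT> {
    void apply(K key, Iterable<IN> elements, List<OUT> out);
}

/** Adapter that forwards the key to a keyed user function. */
final class InternalKeyedAdapter<K, IN, OUT> implements InternalFunction<K, IN, OUT> {
    private final KeyedUserFunction<K, IN, OUT> wrapped;

    InternalKeyedAdapter(KeyedUserFunction<K, IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void apply(K key, Iterable<IN> elements, List<OUT> out) {
        wrapped.apply(key, elements, out);
    }
}

/** Adapter that swallows the dummy key so the user function never sees it. */
final class InternalAllAdapter<IN, OUT> implements InternalFunction<Byte, IN, OUT> {
    private final AllUserFunction<IN, OUT> wrapped;

    InternalAllAdapter(AllUserFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void apply(Byte dummyKey, Iterable<IN> elements, List<OUT> out) {
        // The dummy key is dropped here; only the elements are forwarded.
        wrapped.apply(elements, out);
    }
}

final class InternalFunctionSketch {
    public static void main(String[] args) {
        InternalFunction<Byte, Integer, Integer> sumAll =
                new InternalAllAdapter<Integer, Integer>((elements, sink) -> {
                    int sum = 0;
                    for (int e : elements) {
                        sum += e;
                    }
                    sink.add(sum);
                });
        List<Integer> results = new ArrayList<>();
        sumAll.apply((byte) 0, Arrays.asList(1, 2, 3), results);
        System.out.println(results); // prints [6]
    }
}
```

The operator only ever talks to `InternalFunction`, so a single keyed operator can serve both the keyed and the non-keyed case.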

        githubbot ASF GitHub Bot added a comment -

        Github user StephanEwen commented on the pull request:

        https://github.com/apache/flink/pull/1802#issuecomment-205384005

        Concerning the removal of the non-keyed window operator: Does that mean that the user functions now see the dummy key?

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the pull request:

        https://github.com/apache/flink/pull/1802#issuecomment-205302445

        I made the changes: I introduced `canMerge()` and added a default `onMerge()` in `Trigger` that throws a `RuntimeException`. This way we don't break the API for existing user triggers.

        If there are no objections I will also merge the removal of the non-keyed window operator that this PR is based on.
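A minimal sketch of how such defaults keep existing triggers source-compatible. `SketchTrigger` is a simplified, hypothetical stand-in and not the real `Trigger` signature. The up-front check in `TriggerMergeSketch` is an assumption about how an operator might use `canMerge()`.

```java
/** Hypothetical stand-in for a trigger; only the merging-related methods are sketched. */
abstract class SketchTrigger<W> {
    /** Triggers opt in to merging by overriding this to return true. */
    boolean canMerge() {
        return false;
    }

    /** Default for triggers that predate merging: fail loudly if ever called. */
    void onMerge(W mergedWindow) {
        throw new RuntimeException("This trigger does not support merging.");
    }
}

/** A pre-existing user trigger: it still compiles without knowing about merging. */
final class LegacyTrigger extends SketchTrigger<String> {
}

/** A trigger that supports merging and reacts to the merged window. */
final class MergeAwareTrigger extends SketchTrigger<String> {
    String lastMerged;

    @Override
    boolean canMerge() {
        return true;
    }

    @Override
    void onMerge(String mergedWindow) {
        // A real trigger could register a timer for the merged window here.
        lastMerged = mergedWindow;
    }
}

final class TriggerMergeSketch {
    /** Assumed usage: reject unsuitable triggers before any element is processed. */
    static void requireMergeable(SketchTrigger<?> trigger) {
        if (!trigger.canMerge()) {
            throw new UnsupportedOperationException(
                    "A merging window assigner needs a trigger that supports merging.");
        }
    }

    public static void main(String[] args) {
        MergeAwareTrigger trigger = new MergeAwareTrigger();
        requireMergeable(trigger);
        trigger.onMerge("[1, 9)");
        System.out.println(trigger.lastMerged); // prints [1, 9)
    }
}
```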

        githubbot ASF GitHub Bot added a comment -

        Github user StephanEwen commented on the pull request:

        https://github.com/apache/flink/pull/1802#issuecomment-205294872

        Very nice work.

        After an offline chat with @aljoscha, we decided to slightly adjust the `Trigger` interface to make it simpler for people to develop triggers.

        Other than that, this is good to merge...

        githubbot ASF GitHub Bot added a comment -

        GitHub user aljoscha opened a pull request:

        https://github.com/apache/flink/pull/1802

        FLINK-3174 Add MergingWindowAssigner and SessionWindows

        This introduces MergingWindowAssigner, an extension of WindowAssigner
        that can merge windows. When using a MergingWindowAssigner the
        WindowOperator eagerly merges windows when processing elements.

        For keeping track of in-flight windows and for merging windows this adds
        MergingWindowSet, which keeps track of windows per key.

        Only when using a MergingWindowAssigner is the more costly merging logic used
        in the WindowOperator.

        For triggers there is a new method Trigger.onMerge() that notifies the
        trigger of the new merged window. This allows the trigger to set a timer
        for the newly merged window.

        This also adds AbstractStateBackend.mergePartitionedStates for merging
        state of several source namespaces into a target namespace. This is only
        possible for the newly introduced MergingState which is an extension of
        AppendingState. Only ReducingState and ListState are MergingState while
        FoldingState is now an AppendingState.

        This enables proper support for session windows.

        This also adds the SessionWindows window assigner and adapts an existing
        session example and adds test cases.
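The eager, per-key merging and the merging of state from several source windows into a target window can be sketched in plain Java. Nothing below is Flink code: `SummedWindow` and `EagerSessionSums` are hypothetical names, a running sum stands in for reducing state, and windows that touch are assumed to intersect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-flight window with a reduced (summed) state attached. */
final class SummedWindow {
    final long start;
    final long end;
    final long sum;

    SummedWindow(long start, long end, long sum) {
        this.start = start;
        this.end = end;
        this.sum = sum;
    }

    boolean intersects(SummedWindow other) {
        return start <= other.end && other.start <= end;
    }
}

/** Hypothetical per-key window set that merges as soon as an element arrives. */
final class EagerSessionSums {
    private final long gap;
    private final Map<String, List<SummedWindow>> windowsPerKey = new HashMap<>();

    EagerSessionSums(long gap) {
        this.gap = gap;
    }

    void add(String key, long timestamp, long value) {
        List<SummedWindow> existing = windowsPerKey.getOrDefault(key, new ArrayList<>());
        // The new element starts out in its own window.
        SummedWindow target = new SummedWindow(timestamp, timestamp + gap, value);
        List<SummedWindow> kept = new ArrayList<>();
        for (SummedWindow source : existing) {
            if (source.intersects(target)) {
                // Merge the source into the target: widen bounds, combine state.
                target = new SummedWindow(
                        Math.min(source.start, target.start),
                        Math.max(source.end, target.end),
                        source.sum + target.sum);
            } else {
                kept.add(source);
            }
        }
        kept.add(target);
        windowsPerKey.put(key, kept);
    }

    List<SummedWindow> windows(String key) {
        return windowsPerKey.getOrDefault(key, new ArrayList<>());
    }
}
```

Summing works here because two partial sums can always be combined into one. A fold has no such combine step: its accumulator type may differ from the element type, so two accumulators cannot in general be merged, which fits the statement that FoldingState is only an AppendingState.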

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/aljoscha/flink window-sessions-eager

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/1802.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #1802


        commit bda4e019cc464f411a2b62eaeab41a87f3961a21
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2015-12-15T16:37:48Z

        FLINK-3174 Add MergingWindowAssigner and SessionWindows



        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha closed the pull request at:

        https://github.com/apache/flink/pull/1460

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the pull request:

        https://github.com/apache/flink/pull/1460#issuecomment-187187459

        This will have to be reworked now that windows use the keyed state abstraction.

        githubbot ASF GitHub Bot added a comment -

        Github user rmetzger commented on the pull request:

        https://github.com/apache/flink/pull/1460#issuecomment-169672733

        Thank you for working on this! Session windows are something many people asked for!

        githubbot ASF GitHub Bot added a comment -

        Github user rmetzger commented on a diff in the pull request:

        https://github.com/apache/flink/pull/1460#discussion_r49074918

        --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
        @@ -438,7 +617,7 @@ public void close() throws Exception {
             }

             @SuppressWarnings("unchecked")
        -    private static class ResultSortComparator implements Comparator<Object> {
        +    private static class Tuple2ResultSortComparator implements Comparator<Object> {
        --- End diff --

        This also duplicates code in `NonKeyedWindowOperatorTest`, according to IntelliJ.

        githubbot ASF GitHub Bot added a comment -

        Github user rmetzger commented on a diff in the pull request:

        https://github.com/apache/flink/pull/1460#discussion_r49074871

        --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
        @@ -459,12 +638,29 @@ public int compare(Object o1, Object o2) {
                 }
             }

        -    private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        -        private static final long serialVersionUID = 1L;
        -
        +    @SuppressWarnings("unchecked")
        +    private static class Tuple3ResultSortComparator implements Comparator<Object> {
        --- End diff --

        Is this duplicate code with `NonKeyedWindowOperatorTest`?

        githubbot ASF GitHub Bot added a comment -

        GitHub user aljoscha opened a pull request:

        https://github.com/apache/flink/pull/1460

        FLINK-3174 Add merging WindowAssigner

        After triggering and before emitting the window contents the window
        assigner is given the chance to merge existing windows. The trigger is
        then given a chance to react in the new Trigger.onMerge() call.

        This adds a new method WindowAssigner.isMerging() that allows window
        assigners to specify whether they can merge windows. All existing
        assigners are now derived from NonMergingWindowAssigner that returns
        false for isMerging(). Only if a WindowAssigner announces that it can
        merge is the more costly merging logic used in the WindowOperator.

        For triggers there is a new method Trigger.onMerge() that notifies the
        trigger of the new merged window as well as the old windows and old
        trigger contexts. This allows the trigger to set a timer for the newly
        merged window.

        This enables proper support for session windows.

        This also adds the SessionWindows window assigner and adapts an existing
        session example and adds test cases.
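The `isMerging()` design from this first pull request can be sketched as follows. All class names are hypothetical stand-ins rather than the classes in the patch, windows are modelled as `long[]{start, end}` pairs, and timestamps are assumed to be non-negative.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical stand-in for a window assigner that announces whether it merges. */
abstract class SketchAssigner {
    abstract Collection<long[]> assignWindows(long timestamp);

    abstract boolean isMerging();
}

/** Base class for assigners that never merge, mirroring the idea in the description. */
abstract class SketchNonMergingAssigner extends SketchAssigner {
    @Override
    final boolean isMerging() {
        return false;
    }
}

/** Fixed-size windows: each timestamp maps to exactly one window, no merging. */
final class SketchTumblingAssigner extends SketchNonMergingAssigner {
    private final long size;

    SketchTumblingAssigner(long size) {
        this.size = size;
    }

    @Override
    Collection<long[]> assignWindows(long timestamp) {
        long start = timestamp - (timestamp % size);
        return Collections.singletonList(new long[] {start, start + size});
    }
}

/** Session windows: each element gets its own window, to be merged later. */
final class SketchSessionAssigner extends SketchAssigner {
    private final long gap;

    SketchSessionAssigner(long gap) {
        this.gap = gap;
    }

    @Override
    Collection<long[]> assignWindows(long timestamp) {
        return Collections.singletonList(new long[] {timestamp, timestamp + gap});
    }

    @Override
    boolean isMerging() {
        return true;
    }
}

final class OperatorPathSketch {
    /** The operator pays for the merging logic only when the assigner asks for it. */
    static String choosePath(SketchAssigner assigner) {
        return assigner.isMerging() ? "merging" : "plain";
    }
}
```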

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/aljoscha/flink window-sessions

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/1460.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #1460


        commit dd87f6bc6ecc70d5ee2850ba0a954efdfda74db2
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2015-12-15T16:37:48Z

        FLINK-3174 Add merging WindowAssigner




          People

          • Assignee:
            aljoscha Aljoscha Krettek
            Reporter:
            aljoscha Aljoscha Krettek
          • Votes:
            0
            Watchers:
            3
