  1. Flink
  2. FLINK-5157

Extending AllWindow Function Metadata

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API, Streaming
    • Labels:
      None

      Description

      Following the logic behind [1,2], a ProcessAllWindowFunction can be introduced in Flink, and AllWindowedStream can be extended to support it.

      [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata

      [2] https://issues.apache.org/jira/browse/FLINK-4997
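
To make the idea concrete, here is a minimal, Flink-free sketch of what a process-style all-window function could look like. None of the types below are Flink's real classes: `TimeWindow`, `WindowContext`, `ProcessAllWindowFunction` and `evaluate` are hypothetical stand-ins written for this illustration, and a plain `List` stands in for Flink's `Collector`. The point is only that the function receives a context object carrying window metadata, rather than just the window's elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of a process-style all-window function (not Flink's real API). */
final class ProcessAllWindowSketch {

    /** Hypothetical stand-in for a time window covering [start, end). */
    static final class TimeWindow {
        final long start;
        final long end;

        TimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical context that exposes window metadata to the function. */
    interface WindowContext<W> {
        W window();
    }

    /** Hypothetical stand-in for ProcessAllWindowFunction. */
    abstract static class ProcessAllWindowFunction<IN, OUT, W> {
        abstract void process(WindowContext<W> context, Iterable<IN> elements, List<OUT> out);
    }

    /** Evaluates one (non-keyed) window by handing all of its elements to the function. */
    static <IN, OUT> List<OUT> evaluate(
            final TimeWindow window,
            List<IN> elements,
            ProcessAllWindowFunction<IN, OUT, TimeWindow> function) {
        List<OUT> out = new ArrayList<>();
        WindowContext<TimeWindow> context = () -> window;
        function.process(context, elements, out);
        return out;
    }

    /** Sums the elements of a window and tags the result with the window bounds. */
    static List<String> demo() {
        ProcessAllWindowFunction<Integer, String, TimeWindow> sumWithBounds =
                new ProcessAllWindowFunction<Integer, String, TimeWindow>() {
                    @Override
                    void process(
                            WindowContext<TimeWindow> context,
                            Iterable<Integer> elements,
                            List<String> out) {
                        int sum = 0;
                        for (int value : elements) {
                            sum += value;
                        }
                        TimeWindow w = context.window();
                        out.add("[" + w.start + ", " + w.end + ") sum=" + sum);
                    }
                };
        return evaluate(new TimeWindow(0L, 5000L), Arrays.asList(1, 2, 3), sumWithBounds);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the metadata travels in a context object, further fields could be added to the context later without changing the signature of `process`, which is presumably the motivation behind the referenced FLIP-2 design.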

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user VenturaDelMonte opened a pull request:

          https://github.com/apache/flink/pull/2946

          FLINK-5157 [streaming] Extend AllWindow function metadata

          This PR extends the AllWindow function metadata, similarly to [FLINK-4997](https://github.com/apache/flink/pull/2756).
          In brief, it introduces ProcessAllWindowFunction, which supports window context metadata, and overloads the AllWindowedStream apply/fold/reduce methods to accept this new function in both Scala and Java.
          It also adds new InternalWindowFunction subclasses that handle ProcessAllWindowFunction instances internally.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/VenturaDelMonte/flink flink-5157

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2946.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2946


          commit 34c54dbbeb4da546ba003ffdc8e37f4e7bb63d21
          Author: Ventura Del Monte <venturadelmonte@gmail.com>
          Date: 2016-12-05T08:07:42Z

          FLINK-5157 [streaming] Extend AllWindow function metadata


          githubbot ASF GitHub Bot added a comment -

          Github user VenturaDelMonte commented on the issue:

          https://github.com/apache/flink/pull/2946

          I updated the PR according to what was developed in #328. Please note that I also took FLINK-5741 (https://issues.apache.org/jira/browse/FLINK-5741) into account.
          CC @aljoscha @manuzhang

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2946

          Thanks, I'll have a look!

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2946#discussion_r103425262

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -642,31 +980,71 @@ public AllWindowedStream(DataStream<T> input,
                * @return The data stream that is the result of applying the window function to the window.
                */
               public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
          +        String callLocation = Utils.getCallLocationName();
          +        function = input.getExecutionEnvironment().clean(function);
                   TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                           function, AllWindowFunction.class, true, true, getInputType(), null, false);
          -
          -        return apply(function, resultType);
          +        return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
               }

               /**
                * Applies the given window function to each window. The window function is called for each
          -     * evaluation of the window for each key individually. The output of the window function is
          +     * evaluation of the window. The output of the window function is
                * interpreted as a regular non-windowed stream.
                *
                * <p>
                * Not that this function requires that all data in the windows is buffered until the window
                * is evaluated, as the function provides no means of incremental aggregation.
                *
                * @param function The window function.
          -     * @param resultType Type information for the result type of the window function
                * @return The data stream that is the result of applying the window function to the window.
                */
               public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
          +        String callLocation = Utils.getCallLocationName();
          +        function = input.getExecutionEnvironment().clean(function);
          +        return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
          +    }
          -        //clean the closure
          +    /**
          +     * Applies the given window function to each window. The window function is called for each
          +     * evaluation of the window. The output of the window function is
          +     * interpreted as a regular non-windowed stream.
          +     *
          +     * <p>
          +     * Not that this function requires that all data in the windows is buffered until the window
          +     * is evaluated, as the function provides no means of incremental aggregation.
          +     *
          +     * @param function The process window function.
          +     * @return The data stream that is the result of applying the window function to the window.
          +     */
          +    public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
          --- End diff --

          This and the following should be `@PublicEvolving`
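
The diff above routes the user function through an internal wrapper (`InternalIterableAllWindowFunction`) before it reaches the operator. The following is a rough, Flink-free sketch of that adapter idea, under the assumption that the operator only ever calls one internal interface. All names here (`AllWindowFunction`, `ProcessAllWindowFunction`, `Context`, `InternalWindowFunction`, `wrapApply`, `wrapProcess`) are hypothetical simplifications for illustration, with a `String` standing in for the window and a `List` for the collector; they are not Flink's real signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of wrapping user-facing window functions (not Flink's real API). */
final class InternalWrapperSketch {

    /** Hypothetical user-facing function that receives the window directly. */
    interface AllWindowFunction<IN, OUT> {
        void apply(String window, Iterable<IN> values, List<OUT> out);
    }

    /** Hypothetical context object handed to process-style functions. */
    interface Context {
        String window();
    }

    /** Hypothetical user-facing function that receives a context instead. */
    interface ProcessAllWindowFunction<IN, OUT> {
        void process(Context context, Iterable<IN> values, List<OUT> out);
    }

    /** Hypothetical single internal interface that the operator would call. */
    interface InternalWindowFunction<IN, OUT> {
        void process(String window, Iterable<IN> values, List<OUT> out);
    }

    /** Adapts an apply-style function to the internal interface. */
    static <IN, OUT> InternalWindowFunction<IN, OUT> wrapApply(AllWindowFunction<IN, OUT> f) {
        return (window, values, out) -> f.apply(window, values, out);
    }

    /** Adapts a process-style function by building a context around the window. */
    static <IN, OUT> InternalWindowFunction<IN, OUT> wrapProcess(ProcessAllWindowFunction<IN, OUT> f) {
        return (window, values, out) -> f.process(() -> window, values, out);
    }

    /** Runs both kinds of user function through the same internal call path. */
    static List<String> demo() {
        AllWindowFunction<Integer, String> count = (window, values, out) -> {
            int n = 0;
            for (Integer ignored : values) {
                n++;
            }
            out.add(window + " count=" + n);
        };
        ProcessAllWindowFunction<Integer, String> max = (context, values, out) -> {
            int best = Integer.MIN_VALUE;
            for (int v : values) {
                best = Math.max(best, v);
            }
            out.add(context.window() + " max=" + best);
        };

        List<InternalWindowFunction<Integer, String>> internal =
                Arrays.asList(wrapApply(count), wrapProcess(max));
        List<String> out = new ArrayList<>();
        for (InternalWindowFunction<Integer, String> f : internal) {
            f.process("w0", Arrays.asList(3, 9, 4), out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With wrappers like these, the operator code needs no knowledge of which user-facing flavour it was given, which is one plausible reason for introducing separate internal function classes.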

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2946#discussion_r103425179

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -478,6 +581,135 @@ public AllWindowedStream(DataStream<T> input,
          return input.transform(opName, resultType, operator).forceNonParallel();
          }

          + /**
          + * Applies the given window function to each window. The window function is called for each
          + * evaluation of the window for each key individually. The output of the window function is
          + * interpreted as a regular non-windowed stream.
          + *
          + * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
          + * that the window function typically has only a single value to process when called.
          + *
          + * @param aggFunction The aggregate function that is used for incremental aggregation.
          + * @param windowFunction The process window function.
          + *
          + * @return The data stream that is the result of applying the window function to the window.
          + *
          + * @param <ACC> The type of the AggregateFunction's accumulator
          + * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
          + * @param <R> The type of the elements in the resulting stream, equal to the
          + * WindowFunction's result type
          + */
          + public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
          --- End diff --

          These should also be `@PublicEvolving`. (I know that the corresponding methods on `WindowedStream` weren't marked like this, due to an oversight on the implementer's side.) No problem. 😃
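
The Javadoc in the diff above says that arriving data is aggregated incrementally, so the window function typically sees a single value. Here is a minimal, Flink-free sketch of that flow. The `Aggregate` and `WindowFunction` interfaces and the `aggregate` helper are hypothetical stand-ins that only mirror the roles of the `ACC`, `V` and `R` type parameters from the Javadoc; they are not Flink's real `AggregateFunction` or `ProcessAllWindowFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified model of incremental aggregation followed by a window function. */
final class IncrementalAggregationSketch {

    /** Hypothetical aggregate function with accumulator type ACC and result type V. */
    interface Aggregate<IN, ACC, V> {
        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        V getResult(ACC accumulator);
    }

    /** Hypothetical window function that post-processes the aggregated value(s). */
    interface WindowFunction<V, R> {
        void process(Iterable<V> aggregated, List<R> out);
    }

    /** Folds elements into the accumulator as they arrive, then fires the window function once. */
    static <IN, ACC, V, R> List<R> aggregate(
            List<IN> arriving,
            Aggregate<IN, ACC, V> aggFunction,
            WindowFunction<V, R> windowFunction) {
        ACC accumulator = aggFunction.createAccumulator();
        for (IN value : arriving) {
            // Only the accumulator is retained; the individual elements are not buffered.
            accumulator = aggFunction.add(value, accumulator);
        }
        List<R> out = new ArrayList<>();
        // The window function receives exactly one pre-aggregated value.
        windowFunction.process(Collections.singletonList(aggFunction.getResult(accumulator)), out);
        return out;
    }

    /** Computes an average with a (sum, count) accumulator and formats it in the window function. */
    static List<String> demo() {
        Aggregate<Integer, long[], Double> average = new Aggregate<Integer, long[], Double>() {
            @Override
            public long[] createAccumulator() {
                return new long[] {0L, 0L};
            }

            @Override
            public long[] add(Integer value, long[] acc) {
                acc[0] += value;
                acc[1]++;
                return acc;
            }

            @Override
            public Double getResult(long[] acc) {
                return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
            }
        };
        WindowFunction<Double, String> format = (values, out) -> {
            for (Double v : values) {
                out.add("avg=" + v);
            }
        };
        return aggregate(Arrays.asList(2, 4, 6), average, format);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off illustrated here is memory: the plain `process` variant must buffer every element until the window fires, while the aggregating variant keeps only one accumulator per window.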

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2946#discussion_r103427991

          --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---
          @@ -199,6 +198,62 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W])

          { asScalaStream(javaStream.reduce(reducer, applyFunction, returnType)) }

          + /**
          + * Applies the given window function to each window. The window function is called for each
          + * evaluation of the window for each key individually. The output of the window function is
          + * interpreted as a regular non-windowed stream.
          + *
          + * Arriving data is pre-aggregated using the given pre-aggregation reducer.
          + *
          + * @param preAggregator The reduce function that is used for pre-aggregation
          + * @param windowFunction The process window function.
          + * @return The data stream that is the result of applying the window function to the window.
          + */
          + def reduce[R: TypeInformation](
          --- End diff --

          The new methods should be `@PublicEvolving`. I know the existing methods aren't, but they should be and are only like this due to oversights. 😅

          That's not an error on your side.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2946

          Thanks a lot for working on this! I just merged 😃

          Could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Implemented in 788b839213811c6f2407ac6d54fef28dfa3d29a6

          githubbot ASF GitHub Bot added a comment -

          Github user VenturaDelMonte closed the pull request at:

          https://github.com/apache/flink/pull/2946


            People

            • Assignee: Ventura Del Monte (ventura)
            • Reporter: Ventura Del Monte (ventura)
            • Votes: 0
            • Watchers: 4
