Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Streaming
    • Labels:
      None

Description

Various follow-up issues for the aggregation function, such as:

    • Allowing different input/output types for cases where an additional window apply function is specified
    • Adding the aggregate() methods to the Scala API
    • Adding the window translation tests
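A rough Java sketch of the first item: an incremental aggregate reduces a window's elements to a single value, and an additional window function then receives only that value (plus the key and window bounds) and may emit a different type. `AggregateFunctionSketch`, `WindowFunctionSketch`, `AverageAggregate`, and `AggregateThenWindow` are hypothetical local stand-ins, not Flink classes, and their method signatures are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an aggregate function: IN = input values,
// ACC = accumulator (intermediate state), OUT = aggregated result.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Hypothetical stand-in for a window function: it receives the key, the window
// bounds, and the single pre-aggregated value, and may emit any type R.
interface WindowFunctionSketch<V, R, K> {
    R apply(K key, long windowStart, long windowEnd, V aggregate);
}

// Average of Long values; the accumulator is {sum, count}, the result a Double.
final class AverageAggregate implements AggregateFunctionSketch<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }

    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    public Double getResult(long[] acc) { return (double) acc[0] / acc[1]; }
}

final class AggregateThenWindow {
    // Folds the window contents into one accumulator, then passes only the
    // result (not the raw elements) to the window function.
    static <IN, ACC, V, R, K> R evaluateWindow(
            K key, long start, long end, List<IN> elements,
            AggregateFunctionSketch<IN, ACC, V> agg,
            WindowFunctionSketch<V, R, K> windowFn) {
        ACC acc = agg.createAccumulator();
        for (IN element : elements) {
            acc = agg.add(element, acc);
        }
        return windowFn.apply(key, start, end, agg.getResult(acc));
    }

    public static void main(String[] args) {
        String line = evaluateWindow(
                "sensor-1", 0L, 1000L, Arrays.asList(1L, 2L, 3L, 6L),
                new AverageAggregate(),
                (key, start, end, avg) -> key + " [" + start + ", " + end + "): avg=" + avg);
        System.out.println(line); // sensor-1 [0, 1000): avg=3.0
    }
}
```

The point of the split is that the window function sees one pre-aggregated value per window instead of buffered elements, so its output type (here `String`) can be chosen independently of the accumulator and result types.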

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97773952

          --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
          @@ -18,14 +18,37 @@

          package org.apache.flink.api.common.functions;

          +import org.apache.flink.annotation.PublicEvolving;
          +
          import java.io.Serializable;

          /**
          + * The {@code AggregateFunction} is a flexible aggregation function, characterized by the
          + * following features:
          + *
          + * <ul>
          + * <li>The aggregates may use different types for input values, intermediate aggregates,
          + * and result type, to support a wide range of aggregation types.</li>
          + *
          + * <li>Support for distributive aggregations: Different intermediate aggregates can be
          + * merged together, to allow for pre-aggregation/final-aggregation optimizations.</li>
          + * </ul>
          + *
          + * <p>The {@code AggregateFunction}'s intermediate aggregate (in-progress aggregation state)
          + * in called the <i>accumulator</i>. Values are added to the accumulator, and final aggregates are
          --- End diff --

          Typo: "is called"
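The javadoc under review describes separate input, accumulator, and result types, plus mergeable accumulators for pre-aggregation/final-aggregation. A minimal Java sketch of the merge property, assuming a hypothetical local interface `MergeableAggregate` rather than Flink's own: two partial accumulators are built independently, merged, and the result matches a single pass over all values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical local interface: IN = input type, ACC = accumulator, OUT = result type.
interface MergeableAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator tracking min and max; the result type (String) differs from IN and ACC.
final class MinMax {
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
}

final class RangeAggregate implements MergeableAggregate<Integer, MinMax, String> {
    public MinMax createAccumulator() { return new MinMax(); }

    public MinMax add(Integer value, MinMax acc) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    public String getResult(MinMax acc) { return "[" + acc.min + ", " + acc.max + "]"; }

    // Combining two partial accumulators is what enables pre-aggregation.
    public MinMax merge(MinMax a, MinMax b) {
        a.min = Math.min(a.min, b.min);
        a.max = Math.max(a.max, b.max);
        return a;
    }
}

final class DistributiveDemo {
    static <IN, ACC, OUT> ACC aggregateAll(MergeableAggregate<IN, ACC, OUT> f, List<IN> values) {
        ACC acc = f.createAccumulator();
        for (IN v : values) {
            acc = f.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        RangeAggregate f = new RangeAggregate();
        // Pre-aggregation: each partition builds its own partial accumulator.
        MinMax left = aggregateAll(f, Arrays.asList(7, 3, 9));
        MinMax right = aggregateAll(f, Arrays.asList(4, 12, 1));
        // Final aggregation: partial accumulators are merged, then the result is read.
        String merged = f.getResult(f.merge(left, right));
        String single = f.getResult(aggregateAll(f, Arrays.asList(7, 3, 9, 4, 12, 1)));
        System.out.println(merged + " " + single); // [1, 12] [1, 12]
    }
}
```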

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97776428

          --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---
          @@ -101,6 +108,24 @@ public void testReduceWithRichReducerFails() throws Exception {
          }

          /**
          + * .aggregate() does not support RichAggregateFunction, since the reduce function is used internally
          --- End diff --

          Typo: "aggregate function".
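The test being reviewed checks that `.aggregate()` refuses a `RichAggregateFunction`, since the aggregate function is used internally. The Java sketch below shows one way such a guard and its test expectation could look; `RichMarker`, `SimpleAgg`, `CountAgg`, `RichCountAgg`, `RichGuard`, and the choice of `UnsupportedOperationException` are assumptions for illustration, not taken from the pull request.

```java
// Hypothetical marker standing in for a "rich" function (one with lifecycle methods).
interface RichMarker {
    void open();
}

// Hypothetical plain aggregate function type.
interface SimpleAgg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Counts elements; no lifecycle methods.
final class CountAgg implements SimpleAgg<String, Long, Long> {
    public Long createAccumulator() { return 0L; }
    public Long add(String value, Long acc) { return acc + 1; }
    public Long getResult(Long acc) { return acc; }
}

// Same aggregate, but additionally declaring a rich lifecycle method.
final class RichCountAgg implements SimpleAgg<String, Long, Long>, RichMarker {
    public void open() { }
    public Long createAccumulator() { return 0L; }
    public Long add(String value, Long acc) { return acc + 1; }
    public Long getResult(Long acc) { return acc; }
}

final class RichGuard {
    // Rejects rich functions up front, before any operator would be built.
    static void checkNotRich(Object function) {
        if (function instanceof RichMarker) {
            throw new UnsupportedOperationException("This aggregate function cannot be a rich function.");
        }
    }

    // Returns true if the guard threw, mirroring a test that expects a failure.
    static boolean rejects(Object function) {
        try {
            checkNotRich(function);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(new CountAgg()));     // false
        System.out.println(rejects(new RichCountAgg())); // true
    }
}
```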

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97775106

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
          return input.transform(opName, resultType, operator).forceNonParallel();
          }

          + // ------------------------------------------------------------------------
          + // AggregateFunction
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Applies the given fold function to each window. The window function is called for each
          + * evaluation of the window for each key individually. The output of the reduce function is
          --- End diff --

          Typo: "reduce function"

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97775252

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
          return input.transform(opName, resultType, operator).forceNonParallel();
          }

          + // ------------------------------------------------------------------------
          + // AggregateFunction
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Applies the given fold function to each window. The window function is called for each
          + * evaluation of the window for each key individually. The output of the reduce function is
          + * interpreted as a regular non-windowed stream.
          + *
          + * @param function The fold function.
          --- End diff --

          Typo: "fold function"
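The `AllWindowedStream` variant applies the aggregate to non-keyed windows and emits results as a regular non-windowed stream; the surrounding code in this class also calls `forceNonParallel()`. A hypothetical Java sketch of the idea, assuming tumbling windows, non-negative timestamps, and counting as the aggregate: all elements share one accumulator per window, and fired windows are appended to a plain output list. `AllWindowCount` and `advanceTime` are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical non-keyed window aggregate: every element of the stream lands in the
// same per-window accumulator, so a single instance must see all data.
final class AllWindowCount {
    private final long windowSize;
    // One accumulator (a count) per window, ordered by window start.
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    // Emitted results form an ordinary, non-windowed sequence of values.
    final List<String> output = new ArrayList<>();

    AllWindowCount(long windowSize) { this.windowSize = windowSize; }

    // Assigns the element to its tumbling window and updates that accumulator.
    void onElement(long timestamp) {
        long start = timestamp - (timestamp % windowSize); // assumes timestamp >= 0
        countsByWindowStart.merge(start, 1L, Long::sum);
    }

    // Fires and discards every window whose end is <= the given time.
    void advanceTime(long time) {
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + windowSize <= time) {
            Map.Entry<Long, Long> e = countsByWindowStart.pollFirstEntry();
            output.add("window@" + e.getKey() + " count=" + e.getValue());
        }
    }

    public static void main(String[] args) {
        AllWindowCount op = new AllWindowCount(10);
        for (long ts : new long[] {1, 2, 11, 15, 25}) {
            op.onElement(ts);
        }
        op.advanceTime(20);
        System.out.println(op.output); // [window@0 count=2, window@10 count=2]
        op.advanceTime(30);
        System.out.println(op.output); // [window@0 count=2, window@10 count=2, window@20 count=1]
    }
}
```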

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97774041

          --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
          @@ -81,14 +104,55 @@

           * }
           * }
           * }</pre>
          + *
          + * @param <IN> The type of the values that are aggregated (input values)
          + * @param <ACC> The type of the accumulator (intermediate aggregate state).
          + * @param <OUT> The type of the aggregated result
           */
          +@PublicEvolving
          public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

          + /**
          + * Creates a new accumulator, starting a new aggregate.
          + *
          + * <p>The new accumulator iy typically meaningless unless a value is added
          --- End diff --

          Typo: "is typically"
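The javadoc under review notes that a freshly created accumulator is typically meaningless until a value has been added. A small Java illustration with a hypothetical average accumulator (`AvgAcc` and `AverageSketch` are illustrative names, not Flink API): reading the result of an empty accumulator computes 0.0 / 0, which is `NaN`, while after one `add` the result is well defined.

```java
// Hypothetical accumulator for an average: running sum and element count.
final class AvgAcc {
    long sum;
    long count;
}

final class AverageSketch {
    // Starts a new aggregate; on its own this state carries no meaningful result.
    static AvgAcc createAccumulator() { return new AvgAcc(); }

    static AvgAcc add(long value, AvgAcc acc) {
        acc.sum += value;
        acc.count += 1;
        return acc;
    }

    // For an empty accumulator this computes 0.0 / 0, which is NaN in Java.
    static double getResult(AvgAcc acc) {
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgAcc acc = createAccumulator();
        System.out.println(getResult(acc));         // NaN
        System.out.println(getResult(add(5, acc))); // 5.0
    }
}
```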

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97776155

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---
          @@ -515,12 +515,16 @@ public WindowedStream(KeyedStream<T, K> input,
          // ------------------------------------------------------------------------

          /**
          - * Applies the given fold function to each window. The window function is called for each
          - * evaluation of the window for each key individually. The output of the reduce function is
          - * interpreted as a regular non-windowed stream.
          + * Applies the given aggregation function to each window. The aggregation function is called for
          + * each element, aggregating values incrementally and keeping the state to one accumulator
          + * per key and window.
           *
           * @param function The fold function.
          --- End diff --

          Pre-existing typo: "fold function".
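The new `WindowedStream` javadoc says the aggregation function is called for each element and keeps one accumulator per key and window. A hypothetical Java sketch of that bookkeeping, assuming tumbling windows, non-negative timestamps, and a plain sum as the aggregate for brevity; `KeyedWindowSum` is an illustrative name and does not reflect how Flink's window operator actually stores state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical keyed, windowed incremental aggregation: one accumulator per
// (key, window), updated as each element arrives; raw elements are not buffered.
final class KeyedWindowSum {
    private final long windowSize;
    // State: key -> (window start -> running sum).
    private final Map<String, TreeMap<Long, Long>> state = new HashMap<>();

    KeyedWindowSum(long windowSize) { this.windowSize = windowSize; }

    // Called once per element: assign it to a tumbling window, then update that accumulator.
    void onElement(String key, long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize); // assumes timestamp >= 0
        state.computeIfAbsent(key, k -> new TreeMap<>())
             .merge(windowStart, value, Long::sum);
    }

    // Number of accumulators currently held, i.e. distinct (key, window) pairs.
    int accumulatorCount() {
        int n = 0;
        for (TreeMap<Long, Long> perKey : state.values()) {
            n += perKey.size();
        }
        return n;
    }

    long resultFor(String key, long windowStart) {
        return state.getOrDefault(key, new TreeMap<>()).getOrDefault(windowStart, 0L);
    }

    public static void main(String[] args) {
        KeyedWindowSum op = new KeyedWindowSum(10);
        op.onElement("a", 1, 5);
        op.onElement("a", 4, 7);
        op.onElement("b", 3, 1);
        op.onElement("a", 12, 2);
        System.out.println(op.accumulatorCount()); // 3
        System.out.println(op.resultFor("a", 0));  // 12
        System.out.println(op.resultFor("a", 10)); // 2
    }
}
```

State size therefore grows with the number of active (key, window) pairs rather than with the number of elements in each window.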

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3207#discussion_r97774934

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---
          @@ -279,6 +286,202 @@ public AllWindowedStream(DataStream<T> input,
          return input.transform(opName, resultType, operator).forceNonParallel();
          }

          + // ------------------------------------------------------------------------
          + // AggregateFunction
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Applies the given fold function to each window. The window function is called for each
          --- End diff --

          Typo: "fold function"

          StephanEwen Stephan Ewen added a comment -

          Fixed in 1542260d52238e87de4fa040e6079465777e8263

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3207

          Manually merged to `master` in 1542260d52238e87de4fa040e6079465777e8263

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/3207


People

    • Assignee: Stephan Ewen
    • Reporter: Stephan Ewen
    • Votes: 0
    • Watchers: 5
