Project: Flink
Issue: FLINK-6361 (sub-task of FLINK-5564 User Defined Aggregates)

Finalize the AggregateFunction interface and refactoring built-in aggregates

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      We have completed code generation for all aggregate runtime functions, so we can now finalize the AggregateFunction interface. This includes: 1) removing the Accumulator trait; 2) removing the accumulate, retract, merge, resetAccumulator, and getAccumulatorType methods from the interface and allowing them as contracted methods for UDAGGs; 3) refactoring the built-in aggregates accordingly.
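A minimal sketch of a UDAGG under this contract. It assumes a hypothetical base class `SketchAggregateFunction[T, ACC]` (not Flink's actual `AggregateFunction`) that declares only createAccumulator and getValue; accumulate and retract are plain public methods that the base class does not declare, and the accumulator `AvgAcc` is an ordinary class with no Accumulator trait. In Flink the runtime calls these methods from generated code; the sketch simply calls them directly. `LongAvg` and `AvgAcc` are illustrative names.

```scala
// Hypothetical stand-in for the finalized base class: only createAccumulator
// and getValue are declared. This is NOT Flink's actual AggregateFunction.
abstract class SketchAggregateFunction[T, ACC] {
  def createAccumulator(): ACC
  def getValue(acc: ACC): T
}

// Hypothetical accumulator: a plain mutable class, no Accumulator trait needed.
class AvgAcc {
  var sum: Long = 0L
  var count: Long = 0L
}

class LongAvg extends SketchAggregateFunction[java.lang.Double, AvgAcc] {
  override def createAccumulator(): AvgAcc = new AvgAcc

  // Returns null for an empty group, otherwise sum / count.
  override def getValue(acc: AvgAcc): java.lang.Double =
    if (acc.count == 0) null
    else java.lang.Double.valueOf(acc.sum.toDouble / acc.count)

  // Contracted methods: not declared by the base class, so the compiler does
  // not check them; they must be public, non-static, and named exactly.
  def accumulate(acc: AvgAcc, value: Long): Unit = {
    acc.sum += value
    acc.count += 1
  }

  def retract(acc: AvgAcc, value: Long): Unit = {
    acc.sum -= value
    acc.count -= 1
  }
}
```

Because the contracted methods are not declared in the base class, a misspelled name such as `acumulate` still compiles and only surfaces when the runtime looks the method up, which is why the exact-naming rule matters.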


          Activity

          ASF GitHub Bot added a comment:

          GitHub user shaoxuan-wang opened a pull request:

          https://github.com/apache/flink/pull/3762

          FLINK-6361 [table] Refactoring the AggregateFunction interface and built-in aggregates

          This PR includes the following changes:
          1) removes the Accumulator trait;
          2) moves the accumulate, retract, merge, resetAccumulator, and getAccumulatorType methods out of the AggregateFunction interface and allows them to be defined as contracted methods for UDAGGs;
          3) refactors the built-in aggregates accordingly;
          4) fixes a build warning in flink/table/api/Types.scala (unrelated to FLINK-6361).


          • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shaoxuan-wang/flink F6361-submit

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3762.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3762


          commit b5806b1a3975186d69a76fd369ff77cf06e1e67f
          Author: shaoxuan-wang <wshaoxuan@gmail.com>
          Date: 2017-04-24T16:28:37Z

          FLINK-6361 [table] Refactoring the AggregateFunction interface and built-in aggregates


          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113067259

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -17,36 +17,93 @@
          */
          package org.apache.flink.table.functions

          -import java.util.{List => JList}
          -
          -import org.apache.flink.api.common.typeinfo.TypeInformation
          -import org.apache.flink.table.api.TableException
          -
          /**
            * Base class for User-Defined Aggregates.
            *
          - * @tparam T the type of the aggregation result
          + * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
          + * methods. An [[AggregateFunction]] needs at least three methods: createAccumulator, getValue,
          + * and accumulate. There are a few other methods that can be optional to have: retract, merge,
          + * resetAccumulator, and getAccumulatorType.
          + *
          + * All these methods muse be declared publicly, not static and named exactly as the names
          + * mentioned above. The methods createAccumulator and getValue are defined in the
          + * [[AggregateFunction]] functions, while other methods are explained below.
          + *
          + *
          + * {{{
          + * Processes the input values and update the provided accumulator instance. The method
          + * accumulate can be overloaded with different custom types and arguments. This function is
          — End diff —

          "This function is always a MUST to have." -> "An AggregateFunction requires at least one accumulate method"?

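A sketch of the rule the reviewer proposes: at least one accumulate method is required, and it may be overloaded. The names `FlexibleSum`, `SumAcc`, and `AccumulateCheck` are hypothetical, and the overload lookup uses plain Java reflection with exact parameter-type matching; it is an illustration only, not how Flink's code generator resolves overloads.

```scala
import java.lang.reflect.Method

// Hypothetical accumulator.
class SumAcc { var sum: Long = 0L }

// Hypothetical UDAGG with two overloaded accumulate methods.
class FlexibleSum {
  def createAccumulator(): SumAcc = new SumAcc
  def getValue(acc: SumAcc): Long = acc.sum
  def accumulate(acc: SumAcc, value: Long): Unit = acc.sum += value
  // Overload for string input such as " 4 ".
  def accumulate(acc: SumAcc, value: String): Unit = acc.sum += value.trim.toLong
}

object AccumulateCheck {
  // All public methods named "accumulate", including every overload.
  def accumulateMethods(fn: AnyRef): Seq[Method] =
    fn.getClass.getMethods.toSeq.filter(_.getName == "accumulate")

  // Fails fast (IllegalArgumentException) if no accumulate method exists.
  def validate(fn: AnyRef): Unit =
    require(accumulateMethods(fn).nonEmpty,
      s"${fn.getClass.getName} requires at least one accumulate method")

  // Picks the overload whose declared parameter types match exactly.
  def resolve(fn: AnyRef, paramTypes: Class[_]*): Option[Method] =
    accumulateMethods(fn).find(_.getParameterTypes.toSeq == paramTypes)
}
```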
          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113065861

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -471,31 +480,36 @@ class CodeGenerator(
          j"""
          ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
          ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
          accList$i.set(0, aAcc$i);
          accList$i.set(1, bAcc$i);
          a.setField(
          $i,
          ${aggs(i)}.merge(accList$i));
          + | accList$i.set(0, bAcc$i);
          + | ${aggs(i)}.merge(aAcc$i, accList$i);
          — End diff —

          `ArrayList` creates a new `Iterator` object every time `ArrayList.iterator()` is called. We might want to implement our own single-element `Iterable` to avoid that overhead.
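A minimal sketch of such a single-element `Iterable`, assuming a merge contract of the form `merge(acc, its: java.lang.Iterable[ACC])` like the generated `merge(aAcc$i, accList$i)` call in the diff. `SingleElementIterable`, `CountAcc`, and `CountMerge` are hypothetical names; whatever class is finally added to Flink may differ. The idea is to return the same, reset iterator on every `iterator()` call instead of allocating a new one.

```scala
// Hypothetical reusable single-element Iterable: iterator() always returns
// the same iterator object, so no Iterator is allocated per merge call.
final class SingleElementIterable[T] extends java.lang.Iterable[T] {
  private var element: T = _

  final class ReusableIterator extends java.util.Iterator[T] {
    private var consumed = true
    def reset(): Unit = consumed = false
    override def hasNext(): Boolean = !consumed
    override def next(): T = {
      if (consumed) throw new java.util.NoSuchElementException()
      consumed = true
      element
    }
  }

  private val it = new ReusableIterator

  def setElement(e: T): Unit = element = e

  // Resets and hands out the shared iterator.
  override def iterator(): java.util.Iterator[T] = {
    it.reset()
    it
  }
}

// Hypothetical accumulator and merge following an
// "accumulator + Iterable of accumulators" contract.
class CountAcc { var count: Long = 0L }

object CountMerge {
  def merge(acc: CountAcc, its: java.lang.Iterable[CountAcc]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) acc.count += iter.next().count
  }
}
```

The trade-off is that the shared iterator cannot serve two iterations at once (for example, nested loops over the same iterable); a pair-merge that consumes the iterable exactly once per call does not need that.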

          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113066365

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -17,36 +17,93 @@
          */
          package org.apache.flink.table.functions

          -import java.util.{List => JList}
          -
          -import org.apache.flink.api.common.typeinfo.TypeInformation
          -import org.apache.flink.table.api.TableException
          -
          /**
            * Base class for User-Defined Aggregates.
            *
          - * @tparam T the type of the aggregation result
          + * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
          + * methods. An [[AggregateFunction]] needs at least three methods: createAccumulator, getValue,
          — End diff —

          Make this a list. Scaladoc supports Markdown-like list syntax:
          ```
          ... needs at least three methods:
           - createAccumulator,
           - accumulate, and
           - getValue.

          There are a few other methods that can be optional to have:
           - retract,
           - merge,
           - resetAccumulator, and
           - getAccumulatorType.
          ```
          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113072949

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -1281,7 +1323,7 @@ object AggregateUtil {
          }

          private[flink] def createAccumulatorRowType(

          - aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
          + aggregates: Array[TableAggregateFunction[_,_]]): RowTypeInfo = {
          — End diff —

          +space: `TableAggregateFunction[_, _]`

          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113072758

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -1232,12 +1266,20 @@ object AggregateUtil {
          }

          private def createAccumulatorType(

          - aggregates: Array[TableAggregateFunction[_]]): Seq[TypeInformation[_]] = {
          + aggregates: Array[TableAggregateFunction[_,_]]): Seq[TypeInformation[_]] = {

          val aggTypes: Seq[TypeInformation[_]] =
          aggregates.map {
          agg =>
          - val accType = agg.getAccumulatorType
          + var accType: TypeInformation[_] = null
          — End diff —

          change to
          ```
          val accType = try {
            val method: Method = agg.getClass.getMethod("getAccumulatorType")
            method.invoke(agg).asInstanceOf[TypeInformation[_]]
          } catch {
            case _: NoSuchMethodException => null
            case ite: Throwable => throw new TableException("Unexpected exception:", ite)
          }
          ```
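The suggestion above treats getAccumulatorType as an optional contracted method: look it up by name and fall back to null if it is missing. A self-contained variant of the same idea follows. `OptionalContract.invokeOptional` and the three test classes are hypothetical; the sketch returns an `Option` instead of `null` and unwraps `InvocationTargetException`, because `Method.invoke` wraps any exception thrown by the invoked method in it.

```scala
import java.lang.reflect.{InvocationTargetException, Method}

object OptionalContract {
  // Hypothetical helper: invokes a public zero-argument method by name if the
  // class defines it, and returns None if it does not.
  def invokeOptional[R](target: AnyRef, name: String): Option[R] = {
    val method: Option[Method] =
      try Some(target.getClass.getMethod(name))
      catch { case _: NoSuchMethodException => None }
    method.map { m =>
      try m.invoke(target).asInstanceOf[R]
      catch {
        // Surface the exception thrown by the user's method as the cause.
        case e: InvocationTargetException =>
          throw new IllegalStateException(s"$name failed", e.getCause)
      }
    }
  }
}

// Hypothetical test classes: one defines the optional method, one omits it,
// and one defines it but throws.
class WithAccType { def getAccumulatorType: String = "Row(sum: Long)" }
class WithoutAccType
class Failing { def getAccumulatorType: String = throw new RuntimeException("boom") }
```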

          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113072931

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -1259,7 +1301,7 @@ object AggregateUtil {

          private def createDataSetAggregateBufferDataType(
          groupings: Array[Int],

          - aggregates: Array[TableAggregateFunction[_]],
          + aggregates: Array[TableAggregateFunction[_,_]],
          — End diff —

          +space: `TableAggregateFunction[_, _]`

          ASF GitHub Bot added a comment:

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3762#discussion_r113073730

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala —
          @@ -51,7 +51,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {

          val aggregates =
          Array(new LongMinWithRetractAggFunction,

          - new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_]]]
          + new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_,_]]]
          — End diff —

          +space: `AggregateFunction[_, _]`

          ASF GitHub Bot added a comment:

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3762

          Hi @shaoxuan-wang, I made another pass over the PR and think it looks good.
          I can address the changes I suggested and also add a `SingleElementIterable` instead of using `ArrayList` and finally merge the PR.

          What do you think?

          ASF GitHub Bot added a comment:

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3762

          Hi @fhueske, it is good to have a dedicated Iterable for pair-merge. Please go ahead. Thanks.

          ASF GitHub Bot added a comment:

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3762

          Thanks, will merge then

          ASF GitHub Bot added a comment:

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3762

          Fabian Hueske added a comment:

          Implemented with 589e45c5c50c328783f71d219c6606e972f42f34


            People

            • Assignee: Shaoxuan Wang
            • Reporter: Shaoxuan Wang
            • Votes: 0
            • Watchers: 3
