Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user shaoxuan-wang opened a pull request:

        https://github.com/apache/flink/pull/3694

        FLINK-6240 [table] codeGen dataStream aggregates that use AggregateAggFunction

        Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
        If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
        In addition to going through the list, please provide a meaningful description of your changes.

        • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
        • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
        • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/shaoxuan-wang/flink F6240-submit

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3694.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3694


        commit 220487777039ec620eda536b3290453cb7acd177
        Author: shaoxuan-wang <wshaoxuan@gmail.com>
        Date: 2017-04-07T08:18:14Z

        FLINK-6240 [table] codeGen dataStream aggregates that use AggregateAggFunction


        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110342899

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
        @@ -75,4 +75,13 @@ abstract class GeneratedAggregations extends Function {
        */
        def createOutputRow(): Row

        + /**
        + * merge two rows of accumulators into one row
        + *
        + * @param a one input row
        + * @param b the other input row
        + * @return a row of accumulators which contains the merged aggregated results
        + */
        + def mergeTwoRows(a: Row, b: Row): Row
        --- End diff --

        rename to `mergeAccumulators` or `mergeAccumulatorsPair`?
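To make the contract of this method concrete, here is a small Java sketch of merging two rows of accumulators into one. It is an illustration under stated assumptions, not Flink code: `Object[]` stands in for `org.apache.flink.types.Row`, and `SumAcc`, `CountAcc` and `AccumulatorRows.mergeAccumulatorsPair` are hypothetical names. The assumed row layout puts a SUM accumulator in field 0 and a COUNT accumulator in field 1.

```java
// Hypothetical accumulator for SUM: holds a running total.
final class SumAcc {
    final long sum;
    SumAcc(long sum) { this.sum = sum; }
}

// Hypothetical accumulator for COUNT: holds a running count.
final class CountAcc {
    final long count;
    CountAcc(long count) { this.count = count; }
}

final class AccumulatorRows {
    // Merges the accumulators of row b into row a, field by field, and returns a.
    // Assumed layout: field 0 holds a SumAcc, field 1 holds a CountAcc.
    static Object[] mergeAccumulatorsPair(Object[] a, Object[] b) {
        SumAcc aSum = (SumAcc) a[0];
        SumAcc bSum = (SumAcc) b[0];
        a[0] = new SumAcc(aSum.sum + bSum.sum);

        CountAcc aCount = (CountAcc) a[1];
        CountAcc bCount = (CountAcc) b[1];
        a[1] = new CountAcc(aCount.count + bCount.count);

        // The first row now carries the merged accumulators.
        return a;
    }
}
```

Writing the merged values back into `a` and returning it avoids allocating a new row for every merge; the trade-off is that the caller must treat `a` as consumed.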

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110340880

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
        @@ -403,6 +403,41 @@ class CodeGenerator(

        }""".stripMargin
        }

        + def generateMergeTwoRows(
        + accTypes: Array[String],
        + aggs: Array[String]): String = {
        +
        + val sig: String =
        + j"""
        + | public org.apache.flink.types.Row mergeTwoRows(
        --- End diff --

        Rename to `mergeAccumulators`?
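The generator in this diff builds Java source text from string templates, emitting one block per aggregate. A rough Java analogue is sketched below. It follows the shape of the revised `genMergeAccumulatorsPair` quoted later in this thread, but the class name `MergeCodeGen`, the indentation, and the exact emitted text are assumptions for illustration (the real generator is Scala code using `j"""..."""` templates).

```java
final class MergeCodeGen {
    /**
     * Emits Java source for a method that merges accumulator row b into row a.
     * accTypes[i] is the accumulator class name and aggs[i] the name of the field
     * holding aggregate function i (both are hypothetical inputs).
     */
    static String genMergeAccumulatorsPair(String[] accTypes, String[] aggs) {
        StringBuilder sb = new StringBuilder();
        sb.append("public org.apache.flink.types.Row mergeAccumulatorsPair(\n");
        sb.append("    org.apache.flink.types.Row a,\n");
        sb.append("    org.apache.flink.types.Row b) {\n");
        for (int i = 0; i < aggs.length; i++) {
            String t = accTypes[i];
            // Cast the i-th accumulator out of each input row.
            sb.append(String.format("  %s aAcc%d = (%s) a.getField(%d);\n", t, i, t, i));
            sb.append(String.format("  %s bAcc%d = (%s) b.getField(%d);\n", t, i, t, i));
            // Fill a per-aggregate, reusable two-element list.
            sb.append(String.format("  accList%d.set(0, aAcc%d);\n", i, i));
            sb.append(String.format("  accList%d.set(1, bAcc%d);\n", i, i));
            // Store the merged accumulator back into row a.
            sb.append(String.format("  a.setField(%d, %s.merge(accList%d));\n", i, aggs[i], i));
        }
        sb.append("  return a;\n");
        sb.append("}\n");
        return sb.toString();
    }
}
```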

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110341342

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
        @@ -18,69 +18,51 @@

        package org.apache.flink.table.runtime.aggregate

        -import java.util.{ArrayList => JArrayList, List => JList}
        -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.api.common.functions.AggregateFunction
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
        import org.apache.flink.types.Row
        +import org.slf4j.LoggerFactory

        /**
         * Aggregate Function used for the aggregate operator in
         * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
         *
        - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
        - * used for this aggregation
        - * @param aggFields the position (in the input Row) of the input value for each aggregate
        + * @param genAggregations Generated aggregate helper function
         */
        class AggregateAggFunction(
        - private val aggregates: Array[AggregateFunction[_]],
        - private val aggFields: Array[Array[Int]])
        - extends DataStreamAggFunc[Row, Row, Row] {
        + genAggregations: GeneratedAggregationsFunction)
        --- End diff --

        can be moved to line above

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110342700

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
        @@ -792,6 +792,20 @@ object AggregateUtil {
        inputType,
        needRetraction = false)

        + val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
        --- End diff --

        If we called the `setForwardFields()` method, we would overwrite something.
        We should pass an empty array so that the method doesn't do anything.
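To see why an identity mapping would be harmful here, consider a simplified stand-in for a generated `setForwardFields()`: it copies each `(inputIndex, outputIndex)` pair from the input row into the output row. The class `ForwardFields` and its signature are hypothetical, not Flink's generated method, and `Object[]` stands in for `Row`. When the output row holds accumulators, the identity mapping overwrites them with raw input values, while an empty mapping leaves the row untouched.

```java
final class ForwardFields {
    // Copies input[from] into output[to] for every {from, to} pair in the mapping.
    static void setForwardFields(Object[] input, int[][] mapping, Object[] output) {
        for (int[] pair : mapping) {
            output[pair[1]] = input[pair[0]];
        }
    }

    // Builds the identity mapping {0,0}, {1,1}, ..., analogous to
    // (0 until n).map(x => (x, x)) in the Scala diff.
    static int[][] identityMapping(int n) {
        int[][] m = new int[n][];
        for (int i = 0; i < n; i++) {
            m[i] = new int[] {i, i};
        }
        return m;
    }
}
```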

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110340764

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
        @@ -403,6 +403,41 @@ class CodeGenerator(

        }""".stripMargin
        }

        + def generateMergeTwoRows(
        + accTypes: Array[String],
        + aggs: Array[String]): String = {
        +
        + val sig: String =
        + j"""
        + | public org.apache.flink.types.Row mergeTwoRows(
        + | org.apache.flink.types.Row a,
        + | org.apache.flink.types.Row b)
        + | """.stripMargin
        + val merge: String = {
        + for (i <- aggs.indices) yield
        + j"""
        + | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
        + | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
        + | java.util.ArrayList<${accTypes(i)}> accumulators$i
        --- End diff --

        Can we create a reusable list for each aggregation function?
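The suggestion is to allocate the list once per aggregation function rather than on every merge call. A minimal Java sketch of that reuse follows, assuming a list-based `merge` API like the `merge(accList$i)` call in the later revision of this code; `LongSumAcc`, `ReusingMerger` and its private `merge` are hypothetical stand-ins. `ArrayList.clear()` resets the size but keeps the backing array, so repeated calls work on the same list object without allocating a new one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: a running sum.
final class LongSumAcc {
    final long sum;
    LongSumAcc(long sum) { this.sum = sum; }
}

final class ReusingMerger {
    // Allocated once per aggregate and reused by every mergePair call.
    private final List<LongSumAcc> accList = new ArrayList<>(2);

    // Stand-in for a list-based merge API: sums all accumulators in the list.
    private static LongSumAcc merge(List<LongSumAcc> accs) {
        long total = 0;
        for (LongSumAcc acc : accs) {
            total += acc.sum;
        }
        return new LongSumAcc(total);
    }

    LongSumAcc mergePair(LongSumAcc a, LongSumAcc b) {
        // clear() sets the size to 0 but keeps the allocated capacity.
        accList.clear();
        accList.add(a);
        accList.add(b);
        return merge(accList);
    }

    // Exposes the reused list so callers can check that it is the same instance.
    List<LongSumAcc> buffer() {
        return accList;
    }
}
```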

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110341810

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
        @@ -18,69 +18,51 @@

        package org.apache.flink.table.runtime.aggregate

        -import java.util.{ArrayList => JArrayList, List => JList}
        -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.api.common.functions.AggregateFunction
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
        import org.apache.flink.types.Row
        +import org.slf4j.LoggerFactory

        /**
         * Aggregate Function used for the aggregate operator in
         * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
         *
        - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
        - * used for this aggregation
        - * @param aggFields the position (in the input Row) of the input value for each aggregate
        + * @param genAggregations Generated aggregate helper function
         */
        class AggregateAggFunction(
        - private val aggregates: Array[AggregateFunction[_]],
        - private val aggFields: Array[Array[Int]])
        - extends DataStreamAggFunc[Row, Row, Row] {
        + genAggregations: GeneratedAggregationsFunction)
        + extends AggregateFunction[Row, Row, Row]
        --- End diff --

        Rather extend `RichAggregateFunction` and compile the function in the `open()` method.
        This would fit the lifecycle of a function better.

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110344806

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
        @@ -403,6 +403,41 @@ class CodeGenerator(

        }""".stripMargin
        }

        + def generateMergeTwoRows(
        + accTypes: Array[String],
        + aggs: Array[String]): String = {
        +
        + val sig: String =
        + j"""
        + | public org.apache.flink.types.Row mergeTwoRows(
        + | org.apache.flink.types.Row a,
        + | org.apache.flink.types.Row b)
        + | """.stripMargin
        + val merge: String = {
        + for (i <- aggs.indices) yield
        + j"""
        + | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
        + | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
        + | java.util.ArrayList<${accTypes(i)}> accumulators$i
        --- End diff --

        I was planning to remove `trait Accumulator` very soon; we cannot reuse the list after that.

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110345902

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
        @@ -18,69 +18,51 @@

        package org.apache.flink.table.runtime.aggregate

        -import java.util.{ArrayList => JArrayList, List => JList}
        -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.api.common.functions.AggregateFunction
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
        import org.apache.flink.types.Row
        +import org.slf4j.LoggerFactory

        /**
         * Aggregate Function used for the aggregate operator in
         * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
         *
        - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
        - * used for this aggregation
        - * @param aggFields the position (in the input Row) of the input value for each aggregate
        + * @param genAggregations Generated aggregate helper function
         */
        class AggregateAggFunction(
        - private val aggregates: Array[AggregateFunction[_]],
        - private val aggFields: Array[Array[Int]])
        - extends DataStreamAggFunc[Row, Row, Row] {
        + genAggregations: GeneratedAggregationsFunction)
        + extends AggregateFunction[Row, Row, Row]
        --- End diff --

        I have thought about this, but unfortunately the aggregate in WindowedStream does not support a RichFunction:
        ```
        public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        ...
        if (aggregateFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
        }
        ```

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r110358881

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
        @@ -18,69 +18,51 @@

        package org.apache.flink.table.runtime.aggregate

        -import java.util.{ArrayList => JArrayList, List => JList}
        -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.api.common.functions.AggregateFunction
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
        import org.apache.flink.types.Row
        +import org.slf4j.LoggerFactory

        /**
         * Aggregate Function used for the aggregate operator in
         * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
         *
        - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
        - * used for this aggregation
        - * @param aggFields the position (in the input Row) of the input value for each aggregate
        + * @param genAggregations Generated aggregate helper function
         */
        class AggregateAggFunction(
        - private val aggregates: Array[AggregateFunction[_]],
        - private val aggFields: Array[Array[Int]])
        - extends DataStreamAggFunc[Row, Row, Row] {
        + genAggregations: GeneratedAggregationsFunction)
        + extends AggregateFunction[Row, Row, Row]
        --- End diff --

        Hmm, OK...
        I think we have to check that `function` is initialized in all methods, because `createAccumulator()` might not be called in all cases (e.g., on recovery or starting from a savepoint).
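Because `WindowedStream.aggregate()` rejects a `RichFunction`, there is no `open()` hook, so the compiled helper has to be created lazily. Below is a minimal Java sketch of the pattern described here: initialization is checked at the top of every method rather than only in `createAccumulator()`. `LazyAggFunction`, `Helper` and `compile()` are hypothetical stand-ins for `AggregateAggFunction`, the generated aggregation helper, and the compile step; the method signatures are simplified and do not match Flink's `AggregateFunction` interface.

```java
import java.io.Serializable;

// Hypothetical stand-in for the generated aggregation helper.
final class Helper {
    long createAccumulator() { return 0L; }
    long add(long acc, long value) { return acc + value; }
    long merge(long a, long b) { return a + b; }
}

final class LazyAggFunction implements Serializable {
    // Not serialized with the function; rebuilt on first use after deployment or restore.
    private transient Helper function;
    // For illustration only: how many times compile() ran.
    int compileCount = 0;

    private void ensureInitialized() {
        if (function == null) {
            function = compile();
        }
    }

    // Stand-in for compiling the generated code.
    private Helper compile() {
        compileCount++;
        return new Helper();
    }

    long createAccumulator() {
        ensureInitialized();
        return function.createAccumulator();
    }

    // Guarded too: after recovery, add() may be the first call on a restored accumulator.
    long add(long acc, long value) {
        ensureInitialized();
        return function.add(acc, value);
    }

    long merge(long a, long b) {
        ensureInitialized();
        return function.merge(a, b);
    }
}
```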

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r111813245

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
        @@ -792,6 +792,20 @@ object AggregateUtil {
        inputType,
        needRetraction = false)

        + val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
        --- End diff --

        Thanks @fhueske, I have addressed your comments. Please take a look.

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on the issue:

        https://github.com/apache/flink/pull/3694

        Thanks @fhueske, I have addressed your comments. Please take a look.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r111814395

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
        @@ -396,13 +396,68 @@ class CodeGenerator(

        }""".stripMargin
        }
        - def generateCreateOutputRow(outputArity: Int): String = {
        + def genCreateOutputRow(outputArity: Int): String = {
        j"""
        | public org.apache.flink.types.Row createOutputRow() {
        | return new org.apache.flink.types.Row($outputArity);
        | }""".stripMargin
        }

        + def genMergeAccumulatorsPair(
        + accTypes: Array[String],
        + aggs: Array[String]): String = {
        +
        + val sig: String =
        + j"""
        + | public org.apache.flink.types.Row mergeAccumulatorsPair(
        + | org.apache.flink.types.Row a,
        + | org.apache.flink.types.Row b)
        + """.stripMargin
        + val merge: String = {
        + for (i <- aggs.indices) yield
        + j"""
        + | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
        + | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
        + | accList$i.set(0, aAcc$i);
        + | accList$i.set(1, bAcc$i);
        + | a.setField(
        + | $i,
        + | ${aggs(i)}.merge(accList$i));
        + """.stripMargin
        + }.mkString("\n")
        + val ret: String =
        + j"""
        + | return a;
        + """.stripMargin
        +
        + j"""$sig {
        + |$merge
        + |$ret
        + | }
        + """.stripMargin
        + }
        +
        + def genMergeList(accTypes: Array[String]): String = {
        + {
        + for (i <- accTypes.indices) yield
        + j"""
        + | java.util.ArrayList<${accTypes(i)}> accList$i;
        + """.stripMargin
        + }.mkString("\n")
        + }
        +
        + def initMergeList(
        + accTypes: Array[String],
        + aggs: Array[String]): String = {
        + {
        + for (i <- accTypes.indices) yield
        + j"""
        + | accList$i = new java.util.ArrayList<${accTypes(i)}>();
        --- End diff --

        create with initial capacity 2 `new java.util.ArrayList<${accTypes(i)}>(2);`?
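One detail worth checking when applying this suggestion: for `java.util.ArrayList`, the constructor argument is only the initial capacity, and the new list still has size 0, so `set(0, ...)` on it throws `IndexOutOfBoundsException`. Because the generated `mergeAccumulatorsPair` above calls `accList$i.set(0, ...)` and `accList$i.set(1, ...)`, the list needs two elements before the first merge. The plain-Java sketch below shows the difference; the class `CapacityVsSize` is hypothetical, and filling the slots with `null` is an illustrative choice, not necessarily what the final generator emits.

```java
import java.util.ArrayList;
import java.util.List;

final class CapacityVsSize {
    // Returns true if set(0, "x") on the list throws IndexOutOfBoundsException.
    static boolean setFailsOn(List<String> list) {
        try {
            list.set(0, "x");
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    // Creates a reusable two-slot list: capacity 2 and size 2.
    static List<String> twoSlotList() {
        List<String> list = new ArrayList<>(2); // capacity 2, size 0
        list.add(null);                          // size 1
        list.add(null);                          // size 2: set(0, ..) and set(1, ..) now work
        return list;
    }
}
```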

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3694#discussion_r111816123

        --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
        @@ -792,6 +792,20 @@ object AggregateUtil {
        inputType,
        needRetraction = false)

        + val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
        --- End diff --

        This field is no longer used and can be removed

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3694

        fhueske Fabian Hueske added a comment -

        Implemented with 9be5cc42c02c258d3843c373b7350240c9570523


          People

          • Assignee:
            ShaoxuanWang Shaoxuan Wang
            Reporter:
            ShaoxuanWang Shaoxuan Wang
          • Votes:
            0
            Watchers:
            4

            Dates

            • Created:
              Updated:
              Resolved:
