Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user shaoxuan-wang opened a pull request:

        https://github.com/apache/flink/pull/3735

        FLINK-6242 [table] Add code generation for DataSet Aggregates

        Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
        If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
        In addition to going through the list, please provide a meaningful description of your changes.

        • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
        • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
        • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/shaoxuan-wang/flink F6242-submit

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3735.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3735


        commit 16e8aa7400f1b9a9f490522427f269fd01a0f640
        Author: shaoxuan-wang <wshaoxuan@gmail.com>
        Date: 2017-04-18T13:45:49Z

        FLINK-6242 [table] Add code generation for DataSet Aggregates


        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111978489

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala —
        @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
        def setAggregationResults(accumulators: Row, output: Row)

        /**
        + * Calculates the results from accumulators, and set the results to the output (with key offset)
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + */
        + def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
        — End diff –

        I don't think we need to extend the `GeneratedAggregations` interface (except for `resetAccumulators()`).
        I would rather implement another code generation function that implements the existing methods differently. This would mean adding another method to `CodeGenerator` that generates a `GeneratedAggregations` implementation suited to the DataSet aggregations.

        • `setAggregationResultsWithKeyOffset` -> `setAggregationResults`
        • `setKeyToOutput` -> `setForwardedFields`
        • `accumulateWithKeyOffset` -> `accumulate`
        • `createAccumulatorsAndSetToOutput` could be replaced by `createAccumulators` (called once to create reusable accumulators), `resetAccumulators`, and `setAggregationResults` (if it sets the accumulators instead of calling `AggFunction.getValue()`, see below)
        • `copyAccumulatorsToBuffer` -> `setAggregationResults` (the accumulators are partial aggregation results). This would mean `setAggregationResults()` has two behaviors: setting the final result (`getValue()`) or the partial result (the accumulator). A simple flag during code generation would select either the final or the partial result.
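        A minimal sketch of the code-gen flag idea, in Scala like the code under review. It assumes a hypothetical `partialResults` flag passed to the generator; `SetAggResultsGen.genSetAggregationResults` is a simplified stand-in, not the actual `CodeGenerator` method. With the flag set, the emitted `setAggregationResults` copies the accumulators themselves; otherwise it calls `getValue()` on each aggregate.

```scala
// Hypothetical, simplified generator: one Scala method emits either the
// "final result" or the "partial result" variant of setAggregationResults,
// selected by a flag that is evaluated at code-generation time.
object SetAggResultsGen {

  def genSetAggregationResults(
      accTypes: Array[String],
      aggs: Array[String],
      aggMapping: Array[Int],
      partialResults: Boolean): String = {

    val body = aggs.indices.map { i =>
      val value =
        if (partialResults) s"accs.getField($i)" // forward the accumulator itself
        else s"${aggs(i)}.getValue((${accTypes(i)}) accs.getField($i))" // final value
      s"    output.setField(${aggMapping(i)}, $value);"
    }.mkString("\n")

    s"""  public void setAggregationResults(
       |    org.apache.flink.types.Row accs,
       |    org.apache.flink.types.Row output) {
       |$body
       |  }""".stripMargin
  }
}
```

        Because the flag is resolved in Scala, each generated class contains only one of the two behaviors and no runtime branch.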
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111982297

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala —
        @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
        def setAggregationResults(accumulators: Row, output: Row)

        /**
        + * Calculates the results from accumulators, and set the results to the output (with key offset)
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + */
        + def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
        — End diff –

        We could reuse all your code, but just put it into a different method of the `CodeGenerator` and make it implement the existing methods. Their interfaces are the same.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111983391

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
        @@ -68,23 +72,16 @@ class DataSetAggFunction(
        override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

        // create accumulators

        - var i = 0
        - while (i < aggregates.length) {
        -   accumulators(i) = aggregates(i).createAccumulator()
        -   i += 1
        - }
        + accumulators = function.createAccumulators()
        — End diff –

        We could create the accumulators once and use `function.resetAccumulators()` to reset and reuse the object.
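        A minimal sketch of the create-once, reset-per-group pattern. `SumAcc`, `SumAggs`, and `ReuseReducer` are hypothetical stand-ins for the generated function and `DataSetAggFunction`; records are modelled as `Array[Long]` instead of Flink `Row`s, and the `created` counter exists only to show that the accumulators are allocated once.

```scala
// Hypothetical accumulator: a mutable running sum.
final class SumAcc { var sum: Long = 0L }

// Hypothetical stand-in for the generated aggregation function.
final class SumAggs(numAggs: Int) {
  var created = 0 // counts allocations, to show reuse

  def createAccumulators(): Array[SumAcc] = {
    created += 1
    Array.fill(numAggs)(new SumAcc)
  }

  def resetAccumulators(accs: Array[SumAcc]): Unit =
    accs.foreach(acc => acc.sum = 0L)

  // aggregate i sums field i of the record
  def accumulate(accs: Array[SumAcc], record: Array[Long]): Unit = {
    var i = 0
    while (i < accs.length) { accs(i).sum += record(i); i += 1 }
  }
}

final class ReuseReducer(function: SumAggs) {
  // created once, then reset for every group instead of re-allocated
  private val accumulators = function.createAccumulators()

  def reduce(records: Iterable[Array[Long]]): Array[Long] = {
    function.resetAccumulators(accumulators)
    val it = records.iterator
    while (it.hasNext) function.accumulate(accumulators, it.next())
    accumulators.map(_.sum) // copy results out before the next reset
  }
}
```

        Results must be copied out (or emitted) before the next group resets the shared accumulators.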

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111983678

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
        @@ -68,23 +72,16 @@ class DataSetAggFunction(
        override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

        // create accumulators

        - var i = 0
        - while (i < aggregates.length) {
        -   accumulators(i) = aggregates(i).createAccumulator()
        -   i += 1
        - }
        + accumulators = function.createAccumulators()

        val iterator = records.iterator()

        while (iterator.hasNext) {
        val record = iterator.next()
        + var i = 0

        // accumulate

        - i = 0
        - while (i < aggregates.length) {
        -   aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0)))
        -   i += 1
        - }
        + function.accumulate(accumulators, record)

        // check if this record is the last record
        if (!iterator.hasNext) {
        — End diff –

        Couldn't we use `function.setForwardedFields()` to forward the grouping keys to the output?
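        A sketch of how a generated `setForwardedFields(input, output)` could copy the grouping keys of a record into the output row. The `(outputIndex, inputIndex)` ordering of the mapping pairs is an assumption made for this example, and `ForwardedFieldsGen` is hypothetical, not the Flink generator.

```scala
// Hypothetical generator: emits one setField/getField pair per forwarded field.
object ForwardedFieldsGen {

  // fwdMapping: assumed (outputIndex, inputIndex) pairs,
  // e.g. grouping keys -> leading output fields
  def genSetForwardedFields(fwdMapping: Array[(Int, Int)]): String = {
    val body = fwdMapping.map { case (out, in) =>
      s"    output.setField($out, input.getField($in));"
    }.mkString("\n")

    s"""  public void setForwardedFields(
       |    org.apache.flink.types.Row input,
       |    org.apache.flink.types.Row output) {
       |$body
       |  }""".stripMargin
  }
}
```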

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111984555

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
        @@ -97,12 +94,7 @@ class DataSetAggFunction(
        }

        // set agg results to output

        - i = 0
        - while (i < aggOutMapping.length) {
        -   val (out, in) = aggOutMapping(i)
        -   output.setField(out, aggregates(in).getValue(accumulators(in)))
        -   i += 1
        - }
        + function.setAggregationResults(accumulators, output)

        // set grouping set flags to output
        if (intermediateGKeys.isDefined) {
        — End diff –

        I think this should eventually be integrated with `setForwardedFields()` as well.
        For now, we might leave it as it is.

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111986331

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala —
        @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
        def setAggregationResults(accumulators: Row, output: Row)

        /**
        + * Calculates the results from accumulators, and set the results to the output (with key offset)
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + */
        + def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
        — End diff –

        This sounds like a very good idea. I had actually thought about merging the *WithKeyOffset functions into the existing functions. That works for most of them, but `accumulate` and `setAggregationResults` are a little tricky: the plain `accumulate` and `setAggregationResults` do not need a key offset, but `merge` does.
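        To illustrate why `merge` needs the offset: assuming the pre-aggregated rows place the grouping keys before the accumulators (`[key_0, ..., key_{k-1}, acc_0, acc_1, ...]`) while the merge buffer holds only accumulators, accumulator `i` of an incoming row is read at index `i + k`. Accumulators are modelled as plain `Long` sums; `KeyOffsetMerge` is hypothetical.

```scala
// Hypothetical layout: the incoming partial row is [keys..., acc_0, acc_1, ...],
// the buffer holds only [acc_0, acc_1, ...].
object KeyOffsetMerge {

  def mergeWithKeyOffset(buffer: Array[Long], partial: Array[Any], keyOffset: Int): Unit = {
    var i = 0
    while (i < buffer.length) {
      // skip the leading key fields of the partial row
      buffer(i) += partial(i + keyOffset).asInstanceOf[Long]
      i += 1
    }
  }
}
```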

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111991399

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -407,32 +513,56 @@ class CodeGenerator(
        accTypes: Array[String],
        aggs: Array[String]): String = {

        - val sig: String =
        + val sigHelper: String =
          j"""
        -   |  public org.apache.flink.types.Row mergeAccumulatorsPair(
        +   |  public final org.apache.flink.types.Row mergeAccumulatorsPairHelper(
            |    org.apache.flink.types.Row a,
        -   |    org.apache.flink.types.Row b)
        +   |    org.apache.flink.types.Row b,
        +   |    java.lang.Integer offset)
            """.stripMargin
        - val merge: String = {
        + val mergeHelper: String = {
          for (i <- aggs.indices) yield
          j"""
            |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
        -   |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
        +   |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i + offset);
        — End diff –

        `b.getField($i + offset)` -> `b.getField(${i + offset})`

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111986866

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -263,33 +263,56 @@ class CodeGenerator(
        aggFields: Array[Array[Int]],
        aggMapping: Array[Int],
        fwdMapping: Array[(Int, Int)],
        - outputArity: Int)
        + outputArity: Int,
        + groupingKeys: Array[Int])
        : GeneratedAggregationsFunction = {

        def genSetAggregationResults(
        accTypes: Array[String],
        aggs: Array[String],
        aggMapping: Array[Int]): String = {

        - val sig: String =
        + val sigHelper: String =
          j"""
        -   |  public void setAggregationResults(
        -   |    org.apache.flink.types.Row accs,
        -   |    org.apache.flink.types.Row output)""".stripMargin
        +   |  private final void setAggregationResultsHelper(
        +   |    org.apache.flink.types.Row accs,
        +   |    org.apache.flink.types.Row output,
        +   |    java.lang.Integer offset)""".stripMargin
        - val setAggs: String = {
        + val setAggsHelper: String = {
          for (i <- aggs.indices) yield
          j"""
            |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
            |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
            |
            |    output.setField(
        -   |      ${aggMapping(i)},
        +   |      ${aggMapping(i)} + offset,
            |      baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
          }.mkString("\n")

        - j"""$sig {
        -   |$setAggs
        + val setAggregationResults: String =
        +   j"""
        +     |  public void setAggregationResults(
        +     |    org.apache.flink.types.Row accs,
        +     |    org.apache.flink.types.Row output) {
        +     |    setAggregationResultsHelper(accs, output, 0);
        — End diff –

        Code-generated methods should be as "flat" as possible. Calling other helper methods adds overhead compared to inlining the code.
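        One way to keep the generated code flat while still reusing code is to move the helper into the Scala generator: it is called at generation time, so the emitted Java contains the inlined body and no helper call. A sketch under that assumption, which also folds the offset into the `setField` index; `FlatCodeGen` and `setAggsBody` are hypothetical names.

```scala
object FlatCodeGen {

  // Scala-side helper: reused at generation time; no Java helper is emitted.
  private def setAggsBody(
      accTypes: Array[String],
      aggs: Array[String],
      aggMapping: Array[Int],
      offset: Int): String =
    aggs.indices.map { i =>
      s"    output.setField(${aggMapping(i) + offset}, " +
        s"${aggs(i)}.getValue((${accTypes(i)}) accs.getField($i)));"
    }.mkString("\n")

  def genSetAggregationResults(
      accTypes: Array[String],
      aggs: Array[String],
      aggMapping: Array[Int],
      offset: Int): String =
    s"""  public void setAggregationResults(
       |    org.apache.flink.types.Row accs,
       |    org.apache.flink.types.Row output) {
       |${setAggsBody(accTypes, aggs, aggMapping, offset)}
       |  }""".stripMargin
}
```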

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111990633

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -263,33 +263,56 @@ class CodeGenerator(
        aggFields: Array[Array[Int]],
        aggMapping: Array[Int],
        fwdMapping: Array[(Int, Int)],
        - outputArity: Int)
        + outputArity: Int,
        + groupingKeys: Array[Int])
        : GeneratedAggregationsFunction = {

        def genSetAggregationResults(
        accTypes: Array[String],
        aggs: Array[String],
        aggMapping: Array[Int]): String = {

        - val sig: String =
        + val sigHelper: String =
          j"""
        -   |  public void setAggregationResults(
        -   |    org.apache.flink.types.Row accs,
        -   |    org.apache.flink.types.Row output)""".stripMargin
        +   |  private final void setAggregationResultsHelper(
        +   |    org.apache.flink.types.Row accs,
        +   |    org.apache.flink.types.Row output,
        +   |    java.lang.Integer offset)""".stripMargin
        - val setAggs: String = {
        + val setAggsHelper: String = {
          for (i <- aggs.indices) yield
          j"""
            |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
            |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
            |
            |    output.setField(
        -   |      ${aggMapping(i)},
        +   |      ${aggMapping(i)} + offset,
        — End diff –

        `${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the constant `offset` to the mapping before generating the code.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111994938

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala —
        @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
        def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {

        // create accumulators

        - var i = 0
        - while (i < aggregates.length) {
        -   accumulators(i) = aggregates(i).createAccumulator()
        -   i += 1
        - }
        + accumulators = function.createAccumulators()

        val iterator = records.iterator()

        while (iterator.hasNext) {
        val record = iterator.next()
        — End diff –

        We can make `record` a `var` and move its definition outside of the loop.
        Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.
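        A minimal sketch of the suggested loop shape, using a hypothetical group of `(key, value)` pairs: the values are summed and the key of the last record is forwarded. It assumes the group is non-empty, which should hold for the groups handed to a group-reduce function; `HoistedRecordLoop` is hypothetical.

```scala
object HoistedRecordLoop {

  // Assumes a non-empty group.
  def reduce(records: java.lang.Iterable[(String, Long)]): (String, Long) = {
    var sum = 0L
    var record: (String, Long) = null // declared outside the loop
    val iterator = records.iterator()
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record._2 // accumulate
    }
    // No `if (!iterator.hasNext)` inside the loop: after it terminates,
    // `record` holds the last record, so the output is set once here.
    (record._1, sum)
  }
}
```

        This removes one branch per record from the hot loop.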

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112002278

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala —
        @@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
        override def combine(records: Iterable[Row]): Row = {

        // reset first accumulator

        - var i = 0
        - while (i < aggregates.length) {
        -   aggregates(i).resetAccumulator(accumulatorList(i).get(0))
        -   i += 1
        - }
        + function.resetAccumulator(accumulators)

        val iterator = records.iterator()
        +
        while (iterator.hasNext) {
        val record = iterator.next()

        - i = 0
        - while (i < aggregates.length) {
        -   // insert received accumulator into acc list
        -   val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
        -   accumulatorList(i).set(1, newAcc)
        -   // merge acc list
        -   val retAcc = aggregates(i).merge(accumulatorList(i))
        -   // insert result into acc list
        -   accumulatorList(i).set(0, retAcc)
        -   i += 1
        - }
        + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

        // check if this record is the last record
        if (!iterator.hasNext) {
        — End diff –

        Move this behind the loop.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111995370

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
        @@ -68,23 +72,16 @@ class DataSetAggFunction(
           override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

             // create accumulators
        -    var i = 0
        -    while (i < aggregates.length) {
        -      accumulators(i) = aggregates(i).createAccumulator()
        -      i += 1
        -    }
        +    accumulators = function.createAccumulators()

             val iterator = records.iterator()

             while (iterator.hasNext) {
               val record = iterator.next()
        — End diff –

        We can make `record` a `var` and move its definition outside of the loop.
        Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.
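        A minimal sketch of the loop shape suggested above, in plain Scala so it runs without Flink. The `Record` case class, the `reduceGroup` name, and the `Long` sum standing in for the accumulators are illustrative assumptions, and the sketch assumes every group contains at least one record. Because the last record is kept in a `var` declared before the loop, the output is assembled once after the loop instead of behind an `if (!iterator.hasNext)` check on every iteration.

```scala
import java.util.Arrays

object LastRecordAfterLoop {

  // Hypothetical stand-in for a Row: one grouping key plus one value to aggregate.
  final case class Record(key: String, value: Long)

  // Reduces one group. Assumes the group is non-empty, so `record` is set
  // at least once before it is read after the loop.
  def reduceGroup(records: java.lang.Iterable[Record]): (String, Long) = {
    val iterator = records.iterator()
    var record: Record = null
    var sum = 0L
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record.value // the only per-record work: no last-record check
    }
    // Build the output once: the key comes from the last record seen.
    (record.key, sum)
  }

  def main(args: Array[String]): Unit = {
    val group = Arrays.asList(Record("a", 1L), Record("a", 2L), Record("a", 4L))
    println(reduceGroup(group)) // prints (a,7)
  }
}
```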

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111993502

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala —
        @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
           def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {

             // create accumulators
        -    var i = 0
        -    while (i < aggregates.length) {
        -      accumulators(i) = aggregates(i).createAccumulator()
        -      i += 1
        -    }
        +    accumulators = function.createAccumulators()
        — End diff –

        Create the accumulators once and reuse them with `function.resetAccumulators()`?
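        The suggestion is the usual object-reuse pattern: allocate the accumulators once per function instance (in Flink, typically in `open()`) and reset them at the start of every group instead of allocating new ones on each call. A sketch with a hypothetical mutable `SumAccumulator` in place of the generated accumulator `Row`; it does not call the generated `GeneratedAggregations` code.

```scala
object ReuseAccumulators {

  // Hypothetical mutable accumulator standing in for the generated accumulator Row.
  final class SumAccumulator {
    var sum: Long = 0L
    var count: Long = 0L
    def reset(): Unit = { sum = 0L; count = 0L }
    def add(v: Long): Unit = { sum += v; count += 1 }
  }

  final class PreAggregator {
    // Allocated once per function instance and reused for every group.
    private val acc = new SumAccumulator

    def preaggregate(values: Iterator[Long]): (Long, Long) = {
      acc.reset() // clear state left over from the previous group
      while (values.hasNext) acc.add(values.next())
      (acc.sum, acc.count)
    }
  }

  def main(args: Array[String]): Unit = {
    val f = new PreAggregator
    println(f.preaggregate(Iterator(1L, 2L, 3L))) // prints (6,3)
    println(f.preaggregate(Iterator(10L)))        // prints (10,1)
  }
}
```

        The second call returns `(10,1)` rather than `(16,4)` only because `reset()` runs first; forgetting the reset is the typical bug with this pattern.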

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112001966

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala —
        @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
           *
           * It is used for sliding on batch for both time and count-windows.
           *
        -  * @param aggregates aggregate functions.
        -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
        -  *                         and output Row.
        -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
        -  *                         index in output Row.
        -  * @param finalRowArity output row field count
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
        +  * @param keysAndAggregatesArity The total arity of keys and aggregates
           * @param finalRowWindowStartPos relative window-start position to last field of output row
           * @param finalRowWindowEndPos relative window-end position to last field of output row
           * @param windowSize size of the window, used to determine window-end for output row
           */
         class DataSetSlideWindowAggReduceCombineFunction(
        -    aggregates: Array[AggregateFunction[_ <: Any]],
        -    groupKeysMapping: Array[(Int, Int)],
        -    aggregateMapping: Array[(Int, Int)],
        -    finalRowArity: Int,
        +    genAggregations: GeneratedAggregationsFunction,
        +    keysAndAggregatesArity: Int,
             finalRowWindowStartPos: Option[Int],
             finalRowWindowEndPos: Option[Int],
             windowSize: Long)
           extends DataSetSlideWindowAggReduceGroupFunction(
        -    aggregates,
        -    groupKeysMapping,
        -    aggregateMapping,
        -    finalRowArity,
        +    genAggregations,
        +    keysAndAggregatesArity,
             finalRowWindowStartPos,
             finalRowWindowEndPos,
             windowSize)
           with CombineFunction[Row, Row] {

        -  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
        -  private val intermediateRow: Row = new Row(intermediateRowArity)
        +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)

           override def combine(records: Iterable[Row]): Row = {

        -    // reset first accumulator
        -    var i = 0
        -    while (i < aggregates.length) {
        -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
        -      i += 1
        -    }
        +    // reset accumulator
        +    function.resetAccumulator(accumulators)

             val iterator = records.iterator()
             while (iterator.hasNext) {
               val record = iterator.next()

        -      // accumulate
        -      i = 0
        -      while (i < aggregates.length) {
        -        // insert received accumulator into acc list
        -        val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
        -        accumulatorList(i).set(1, newAcc)
        -        // merge acc list
        -        val retAcc = aggregates(i).merge(accumulatorList(i))
        -        // insert result into acc list
        -        accumulatorList(i).set(0, retAcc)
        -        i += 1
        -      }
        +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

               // check if this record is the last record
               if (!iterator.hasNext) {
        — End diff –

        Move this behind the loop to avoid evaluating the condition in every iteration of the loop body.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111994200

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala —
        @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
        def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
        — End diff –

        I think we can move the implementation of `preaggregate()` to `combine()` and let `mapPartition()` call `combine()`. `combine()` is called once for each group of records with the same key, whereas `mapPartition()` is called just once for the whole partition. This way we can remove some overhead from `combine()`.
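        A sketch of the proposed structure. To keep it runnable without Flink, two hypothetical traits stand in for Flink's `CombineFunction` (`combine(Iterable)`) and `MapPartitionFunction` (`mapPartition(Iterable, Collector)`), and a `ListBuffer` stands in for the `Collector`. The pre-aggregation (here just a sum and a count) lives only in `combine()`, and `mapPartition()` hands its whole partition to `combine()`.

```scala
import scala.collection.mutable.ListBuffer

object CombineDelegation {

  // Hypothetical stand-ins shaped like Flink's CombineFunction and MapPartitionFunction.
  trait Combiner[IN, OUT] {
    def combine(values: java.lang.Iterable[IN]): OUT
  }
  trait PartitionMapper[IN, OUT] {
    def mapPartition(values: java.lang.Iterable[IN], out: ListBuffer[OUT]): Unit
  }

  // Pre-aggregates (sum, count); the logic exists only in combine().
  final class PreAgg
    extends Combiner[Long, (Long, Long)]
    with PartitionMapper[Long, (Long, Long)] {

    // Called once per group of records that share a key.
    override def combine(values: java.lang.Iterable[Long]): (Long, Long) = {
      val it = values.iterator()
      var sum = 0L
      var count = 0L
      while (it.hasNext) {
        sum += it.next()
        count += 1
      }
      (sum, count)
    }

    // Called once for the whole partition; it just forwards to combine().
    override def mapPartition(
        values: java.lang.Iterable[Long],
        out: ListBuffer[(Long, Long)]): Unit = {
      out += combine(values)
    }
  }

  def main(args: Array[String]): Unit = {
    val out = ListBuffer.empty[(Long, Long)]
    new PreAgg().mapPartition(java.util.Arrays.asList(1L, 2L, 3L), out)
    println(out.toList) // prints List((6,3))
  }
}
```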

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111995303

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala —
        @@ -19,88 +19,71 @@
         package org.apache.flink.table.runtime.aggregate

         import java.lang.Iterable
        -import java.util.{ArrayList => JArrayList}

         import org.apache.flink.api.common.functions.RichGroupReduceFunction
         import org.apache.flink.configuration.Configuration
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
         import org.apache.flink.types.Row
         import org.apache.flink.util.{Collector, Preconditions}
        +import org.slf4j.LoggerFactory

         /**
           * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation
           * for batch (DataSet) queries.
           *
        -  * @param aggregates The aggregate functions.
        -  * @param aggOutFields The positions of the aggregation results in the output
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
           * @param gkeyOutFields The positions of the grouping keys in the output
           * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
        -  * @param finalRowArity The arity of the final resulting row
           */
         class DataSetFinalAggFunction(
        -    private val aggregates: Array[AggregateFunction[_ <: Any]],
        -    private val aggOutFields: Array[Int],
        +    private val genAggregations: GeneratedAggregationsFunction,
             private val gkeyOutFields: Array[Int],
        -    private val groupingSetsMapping: Array[(Int, Int)],
        -    private val finalRowArity: Int)
        -  extends RichGroupReduceFunction[Row, Row] {
        +    private val groupingSetsMapping: Array[(Int, Int)])
        +  extends RichGroupReduceFunction[Row, Row]
        +  with Compiler[GeneratedAggregations] {

        -  Preconditions.checkNotNull(aggregates)
        -  Preconditions.checkNotNull(aggOutFields)
           Preconditions.checkNotNull(gkeyOutFields)
           Preconditions.checkNotNull(groupingSetsMapping)

           private var output: Row = _
        +  private var accumulators: Row = _
        +
        +  val LOG = LoggerFactory.getLogger(this.getClass)
        +  private var function: GeneratedAggregations = _

           private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) {
             Some(gkeyOutFields)
           } else {
             None
           }
        -  private val numAggs = aggregates.length
        -  private val numGKeys = gkeyOutFields.length
        -
        -  private val accumulators: Array[JArrayList[Accumulator]] =
        -    Array.fill(numAggs)(new JArrayList[Accumulator](2))
        -
           override def open(config: Configuration) {
        -    output = new Row(finalRowArity)
        -
        -    // init lists with two empty accumulators
        -    for (i <- aggregates.indices) {
        -      val accumulator = aggregates(i).createAccumulator()
        -      accumulators(i).add(accumulator)
        -      accumulators(i).add(accumulator)
        -    }

        +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
        +      s"Code:\n$genAggregations.code")
        +    val clazz = compile(
        +      getClass.getClassLoader,
        +      genAggregations.name,
        +      genAggregations.code)
        +    LOG.debug("Instantiating AggregateHelper.")
        +    function = clazz.newInstance()
        +
        +    output = function.createOutputRow()
        +    accumulators = function.createAccumulators()
           }

           override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

             val iterator = records.iterator()

             // reset first accumulator
        -    var i = 0
        -    while (i < aggregates.length) {
        -      aggregates(i).resetAccumulator(accumulators(i).get(0))
        -      i += 1
        -    }
        +    function.resetAccumulator(accumulators)

        +    var i = 0
             while (iterator.hasNext) {
               val record = iterator.next()
        — End diff –

        We can make `record` a `var` and move its definition outside of the loop.
        Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111992686

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
        @@ -21,44 +21,48 @@ import java.lang.Iterable

         import org.apache.flink.api.common.functions.RichGroupReduceFunction
         import org.apache.flink.configuration.Configuration
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
         import org.apache.flink.types.Row
         import org.apache.flink.util.{Collector, Preconditions}
        +import org.slf4j.LoggerFactory

         /**
           * [[RichGroupReduceFunction]] to compute aggregates that do not support pre-aggregation for batch
           * (DataSet) queries.
           *
        -  * @param aggregates The aggregate functions.
        -  * @param aggInFields The positions of the aggregation input fields.
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
           * @param gkeyOutMapping The mapping of group keys between input and output positions.
        -  * @param aggOutMapping The mapping of aggregates to output positions.
           * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
        -  * @param finalRowArity The arity of the final resulting row.
           */
         class DataSetAggFunction(
        -    private val aggregates: Array[AggregateFunction[_ <: Any]],
        -    private val aggInFields: Array[Array[Int]],
        -    private val aggOutMapping: Array[(Int, Int)],
        +    private val genAggregations: GeneratedAggregationsFunction,
             private val gkeyOutMapping: Array[(Int, Int)],
        — End diff –

        It would be good if we could parameterize the method that generates the code so that the grouping-key and grouping-set copies can be done with `GeneratedAggregations.setForwardFields()`. This should be possible, as it is actually just setting constant boolean flags at certain positions in the output Row.
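        To illustrate the idea: if the index mappings and flags are known when the code is generated, they can be unrolled into one hard-coded statement per field, so the generated method needs no mapping arrays or loops at runtime. The sketch below only builds the Java source text; the `genSetForwardFields` helper, its parameters, and the emitted method signature are hypothetical and not the generator from the pull request. The emitted code relies only on `Row.getField(int)` and `Row.setField(int, Object)`.

```scala
object ForwardFieldsCodeGen {

  /**
    * Hypothetical generator. Emits Java source for a method that copies input field
    * `in` to output field `out` for each (in, out) pair in `fieldMapping`, and writes
    * a constant Boolean at each position listed in `constantFlags`.
    */
  def genSetForwardFields(
      fieldMapping: Array[(Int, Int)],
      constantFlags: Array[(Int, Boolean)]): String = {
    // One constant copy statement per forwarded field.
    val copies = fieldMapping.map { case (in, out) =>
      s"  output.setField($out, input.getField($in));"
    }
    // One constant assignment per flag position.
    val flags = constantFlags.map { case (pos, flag) =>
      val literal = if (flag) "Boolean.TRUE" else "Boolean.FALSE"
      s"  output.setField($pos, $literal);"
    }
    val header = "public void setForwardFields(Row input, Row output) {"
    val lines = (header +: (copies ++ flags)) :+ "}"
    lines.mkString("\n")
  }

  def main(args: Array[String]): Unit =
    println(genSetForwardFields(Array((0, 1), (2, 0)), Array((3, true))))
}
```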

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111999493

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala —
        @@ -110,12 +110,8 @@ class DataSetSessionWindowAggregatePreProcessor(
        var windowEnd: java.lang.Long = null
        — End diff –

        Move the implementation to `combine()` and forward the `mapPartition()` call to `combine()`.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112002140

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala —
        @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
           *
           * It is used for sliding on batch for both time and count-windows.
           *
        -  * @param aggregates aggregate functions.
        -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
        -  *                         and output Row.
        -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
        -  *                         index in output Row.
        -  * @param finalRowArity output row field count
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
        +  * @param keysAndAggregatesArity The total arity of keys and aggregates
           * @param finalRowWindowStartPos relative window-start position to last field of output row
           * @param finalRowWindowEndPos relative window-end position to last field of output row
           * @param windowSize size of the window, used to determine window-end for output row
           */
         class DataSetSlideWindowAggReduceCombineFunction(
        -    aggregates: Array[AggregateFunction[_ <: Any]],
        -    groupKeysMapping: Array[(Int, Int)],
        -    aggregateMapping: Array[(Int, Int)],
        -    finalRowArity: Int,
        +    genAggregations: GeneratedAggregationsFunction,
        +    keysAndAggregatesArity: Int,
             finalRowWindowStartPos: Option[Int],
             finalRowWindowEndPos: Option[Int],
             windowSize: Long)
           extends DataSetSlideWindowAggReduceGroupFunction(
        -    aggregates,
        -    groupKeysMapping,
        -    aggregateMapping,
        -    finalRowArity,
        +    genAggregations,
        +    keysAndAggregatesArity,
             finalRowWindowStartPos,
             finalRowWindowEndPos,
             windowSize)
           with CombineFunction[Row, Row] {

        -  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
        -  private val intermediateRow: Row = new Row(intermediateRowArity)
        +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)

           override def combine(records: Iterable[Row]): Row = {

        -    // reset first accumulator
        -    var i = 0
        -    while (i < aggregates.length) {
        -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
        -      i += 1
        -    }
        +    // reset accumulator
        +    function.resetAccumulator(accumulators)

             val iterator = records.iterator()
             while (iterator.hasNext) {
               val record = iterator.next()
        — End diff –

        Make `record` a `var` and declare it outside of the loop.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112002408

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala —
        @@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
           * it does no final aggregate evaluation. It also includes the logic of
           * [[DataSetSlideTimeWindowAggFlatMapFunction]].
           *
        -  * @param aggregates aggregate functions
        -  * @param groupingKeysLength number of grouping keys
        -  * @param timeFieldPos position of aligned time field
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
        +  * @param keysAndAggregatesArity The total arity of keys and aggregates
           * @param windowSize window size of the sliding window
           * @param windowSlide window slide of the sliding window
           * @param returnType return type of this function
           */
         class DataSetSlideTimeWindowAggReduceGroupFunction(
        -    private val aggregates: Array[AggregateFunction[_ <: Any]],
        -    private val groupingKeysLength: Int,
        -    private val timeFieldPos: Int,
        +    private val genAggregations: GeneratedAggregationsFunction,
        +    private val keysAndAggregatesArity: Int,
             private val windowSize: Long,
             private val windowSlide: Long,
             @transient private val returnType: TypeInformation[Row])
           extends RichGroupReduceFunction[Row, Row]
           with CombineFunction[Row, Row]
        -  with ResultTypeQueryable[Row] {
        +  with ResultTypeQueryable[Row]
        +  with Compiler[GeneratedAggregations] {

        -  Preconditions.checkNotNull(aggregates)
        +  private val timeFieldPos = returnType.getArity - 1
        +  private val intermediateWindowStartPos = keysAndAggregatesArity

           protected var intermediateRow: Row = _

        -  // add one field to store window start
        -  protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1
        -  protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
        -    new JArrayList[Accumulator](2)
        -  }
        -  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
        +  private var accumulators: Row = _
        +
        +  val LOG = LoggerFactory.getLogger(this.getClass)
        +  private var function: GeneratedAggregations = _

           override def open(config: Configuration) {
        -    intermediateRow = new Row(intermediateRowArity)
        -
        -    // init lists with two empty accumulators
        -    var i = 0
        -    while (i < aggregates.length) {
        -      val accumulator = aggregates(i).createAccumulator()
        -      accumulatorList(i).add(accumulator)
        -      accumulatorList(i).add(accumulator)
        -      i += 1
        -    }
        +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
        +      s"Code:\n$genAggregations.code")
        +    val clazz = compile(
        +      getClass.getClassLoader,
        +      genAggregations.name,
        +      genAggregations.code)
        +    LOG.debug("Instantiating AggregateHelper.")
        +    function = clazz.newInstance()
        +
        +    accumulators = function.createAccumulators()
        +    intermediateRow = function.createOutputRow()
           }

           override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

             // reset first accumulator
        -    var i = 0
        -    while (i < aggregates.length) {
        -      val accumulator = aggregates(i).createAccumulator()
        -      accumulatorList(i).set(0, accumulator)
        -      i += 1
        -    }
        +    function.resetAccumulator(accumulators)

             val iterator = records.iterator()

             while (iterator.hasNext) {
               val record = iterator.next()

               // accumulate
        -      i = 0
        -      while (i < aggregates.length) {
        -        // insert received accumulator into acc list
        -        val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
        -        accumulatorList(i).set(1, newAcc)
        -        // merge acc list
        -        val retAcc = aggregates(i).merge(accumulatorList(i))
        -        // insert result into acc list
        -        accumulatorList(i).set(0, retAcc)
        -        i += 1
        -      }
        +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

               // trigger tumbling evaluation
               if (!iterator.hasNext) {
        — End diff –

        move this behind the loop
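A minimal sketch of the suggested loop shape. It assumes a simplified `Row` stand-in (not `org.apache.flink.types.Row`) and a single SUM accumulator in place of the generated `GeneratedAggregations` calls; the `SlideTimeReduceSketch` name and the `[key, partial sum, window start]` layout are hypothetical. The merge stays inside the loop, `record` is declared once outside it, and the evaluation runs once after the loop instead of testing `!iterator.hasNext` on every record:

```scala
// Simplified stand-in for org.apache.flink.types.Row (assumption: only the
// accessors used below are modeled).
final class Row(val arity: Int) {
  private val fields = new Array[Any](arity)
  def getField(pos: Int): Any = fields(pos)
  def setField(pos: Int, value: Any): Unit = fields(pos) = value
}

object SlideTimeReduceSketch {
  // Hypothetical intermediate layout: [key, partial SUM, window start].
  val keyPos = 0
  val accPos = 1
  val windowStartPos = 2

  def row(values: Any*): Row = {
    val r = new Row(values.length)
    values.zipWithIndex.foreach { case (v, i) => r.setField(i, v) }
    r
  }

  def reduce(records: java.lang.Iterable[Row], out: Row => Unit): Unit = {
    var sum = 0L           // plays the role of function.resetAccumulator(accumulators)
    var record: Row = null // declared once, outside the loop
    val iterator = records.iterator()
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record.getField(accPos).asInstanceOf[Long] // the merge step
    }
    // Evaluated exactly once, behind the loop.
    if (record != null) {
      val result = new Row(3)
      result.setField(keyPos, record.getField(keyPos))
      result.setField(accPos, sum)
      result.setField(windowStartPos, record.getField(windowStartPos))
      out(result)
    }
  }
}
```

Besides removing a branch from the hot loop, this shape makes the empty-group case explicit (`record == null`), which the in-loop `if (!iterator.hasNext)` check handled only implicitly.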

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112007816

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala —
        @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
        def setAggregationResults(accumulators: Row, output: Row)

        /**
        + * Calculates the results from accumulators, and set the results to the output (with key offset)
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + */
        + def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
        — End diff –

        Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method. Right now, the behavior of most generated methods can be exactly defined:

        • `createAccumulators()`: generates a `Row` with the accumulators for each provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called `accs`.
        • `accumulate(accs, row)`: The `aggFields` parameter controls which fields of `row` are accumulated into which accumulator. We should rename this parameter to `accFields` though, IMO.
        • `retract(accs, row)`: same as for `accumulate`. We should add a separate parameter `retractFields: Array[Int]` though.
        • `setForwardedFields(input, output)`: The `fwdMapping` parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the `groupSetMapping` to the output as well.
        • `setAggregationResults(accs, output)`: The `aggMapping` parameter controls to which output fields the aggregation results are copied. If we add another parameter `partialResults: Boolean`, we can control whether to copy final results (`AggregateFunction.getValue()`) or partial results (the accumulator).
        • `createOutputRow()`: the `outputArity` parameter specifies the arity of the output row.
        • `mergeAccumulatorsPair(accs, other)`: *This is the only inflexible method*. We could change the behavior of the method as follows: The method expects as first parameter (`accs`) a Row with the same layout as generated by `createAccumulators`. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter `mergeMapping: Array[Int]` to the code generating function which defines which fields of the `other` parameter are merged with the fields in the `accs` Row. The method returns a Row with the default layout (as generated by `createAccumulators()`).
        • `resetAccumulator(accs)`: resets a Row of accumulators of the known layout.
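To make the proposed parameters concrete, here is a hand-written sketch of what a generator parameterized with `mergeMapping`, `aggMapping` and `partialResults` might emit. It is not the Flink code generator's output: it assumes a single SUM-like accumulator type (`SumAcc`, hypothetical), one entry per aggregate in both mappings, and rows modeled as `Array[Any]`:

```scala
// Hypothetical SUM accumulator, standing in for an AggregateFunction accumulator.
final class SumAcc(var sum: Long)

// Stand-in for code emitted by a generator parameterized with mergeMapping,
// aggMapping and partialResults. Assumes both mappings have one entry per aggregate.
final class ParameterizedAggsSketch(
    mergeMapping: Array[Int],   // mergeMapping(i): position of aggregate i's accumulator in `other`
    aggMapping: Array[Int],     // aggMapping(i): output position of aggregate i's result
    partialResults: Boolean) {  // true: emit accumulators, false: emit final values

  // Default layout: accs(i) holds the accumulator of aggregate i.
  def createAccumulators(): Array[Any] =
    Array.fill[Any](aggMapping.length)(new SumAcc(0L))

  // `other` may hold accumulators at arbitrary positions; the result keeps the default layout.
  def mergeAccumulatorsPair(accs: Array[Any], other: Array[Any]): Array[Any] = {
    var i = 0
    while (i < mergeMapping.length) {
      val target = accs(i).asInstanceOf[SumAcc]
      target.sum += other(mergeMapping(i)).asInstanceOf[SumAcc].sum
      i += 1
    }
    accs
  }

  def setAggregationResults(accs: Array[Any], output: Array[Any]): Unit = {
    var i = 0
    while (i < aggMapping.length) {
      val acc = accs(i).asInstanceOf[SumAcc]
      // Partial result = the accumulator itself; final result = its value (getValue()).
      output(aggMapping(i)) = if (partialResults) acc else acc.sum
      i += 1
    }
  }
}
```

With `partialResults = true` the same class could back a combiner that forwards accumulators, and with `false` the final reduce; only the constructor arguments differ.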

        I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave as the more specialized ones that you added. Since all code gen parameters (`accFields`, `retractFields`, `fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, `mergeMapping`) can be independently set for each type of operator, this should give us the flexibility for all types of operators. We only need to parameterize the code generation method appropriately.

        In addition, we could make all parameters `Option` and generate empty methods if the parameters for a function are not set. (This could also be a follow up issue, IMO)
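A sketch of the `Option` idea, assuming the generator builds Java source as plain strings. When `retractFields` is `None`, the emitted `retract` method simply has an empty body, so batch operators that never retract need not pass a dummy array. `OptionalMethodGen` and the `retractField(...)` call inside the generated source are hypothetical names used only for illustration:

```scala
object OptionalMethodGen {
  // Emits Java source for retract(accs, input). retractFields(i) is the input
  // field retracted from accumulator i; `retractField` is a hypothetical helper.
  def genRetract(retractFields: Option[Array[Int]]): String = {
    val body = retractFields match {
      case Some(fields) =>
        fields.zipWithIndex.map { case (inputPos, i) =>
          s"  retractField(accs, $i, input.getField($inputPos));"
        }.mkString("\n")
      case None => "" // operators that never retract get an empty method
    }
    s"public void retract(Row accs, Row input) {\n$body\n}"
  }
}
```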

        What do you think @shaoxuan-wang ?

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112003763

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala —
        @@ -25,58 +25,56 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
         import org.apache.flink.api.java.typeutils.ResultTypeQueryable
         import org.apache.flink.configuration.Configuration
         import org.apache.flink.streaming.api.windowing.windows.TimeWindow
        -import org.apache.flink.table.functions.AggregateFunction
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
         import org.apache.flink.types.Row
        -import org.apache.flink.util.Preconditions
        -
        +import org.slf4j.LoggerFactory

         /**
           * This map function only works for windows on batch tables.
           * It appends an (aligned) rowtime field to the end of the output row.
        +  *
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
        +  * @param timeFieldPos Time field position in input row
        +  * @param tumbleTimeWindowSize The size of tumble time window
           */
         class DataSetWindowAggMapFunction(
        -    private val aggregates: Array[AggregateFunction[_]],
        -    private val aggFields: Array[Array[Int]],
        -    private val groupingKeys: Array[Int],
        -    private val timeFieldPos: Int, // time field position in input row
        +    private val genAggregations: GeneratedAggregationsFunction,
        +    private val timeFieldPos: Int,
             private val tumbleTimeWindowSize: Option[Long],
             @transient private val returnType: TypeInformation[Row])
        -  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
        -
        -  Preconditions.checkNotNull(aggregates)
        -  Preconditions.checkNotNull(aggFields)
        -  Preconditions.checkArgument(aggregates.length == aggFields.length)
        +  extends RichMapFunction[Row, Row]
        +  with ResultTypeQueryable[Row]
        +  with Compiler[GeneratedAggregations] {

           private var output: Row = _

        -  // add one more arity to store rowtime
        -  private val partialRowLength = groupingKeys.length + aggregates.length + 1
        -  // rowtime index in the buffer output row
        -  private val rowtimeIndex: Int = partialRowLength - 1
        +
        +  val LOG = LoggerFactory.getLogger(this.getClass)
        +  private var function: GeneratedAggregations = _

           override def open(config: Configuration) {
        -    output = new Row(partialRowLength)
        +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
        +      s"Code:\n$genAggregations.code")
        +    val clazz = compile(
        +      getRuntimeContext.getUserCodeClassLoader,
        +      genAggregations.name,
        +      genAggregations.code)
        +    LOG.debug("Instantiating AggregateHelper.")
        +    function = clazz.newInstance()
        +
        +    output = function.createOutputRow()
           }

           override def map(input: Row): Row = {

        -    var i = 0
        -    while (i < aggregates.length) {
        -      val agg = aggregates(i)
        -      val fieldValue = input.getField(aggFields(i)(0))
        -      val accumulator = agg.createAccumulator()
        -      agg.accumulate(accumulator, fieldValue)
        -      output.setField(groupingKeys.length + i, accumulator)
        -      i += 1
        -    }
        +    function.createAccumulatorsAndSetToOutput(output)

        — End diff –

        create the accumulators with `function.createAccumulators()` once in `open()`, reset them here, and copy them to `output` with `function.setAggregationResults()`?
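A sketch of this suggestion. It uses a hypothetical `AggsHelper` trait that mirrors the method names from this thread, a hypothetical COUNT helper, and rows modeled as `Array[Any]`. The accumulators are created once in `open()`, reset for each record, and copied into the reused output row. Like the existing reuse of `output`, this assumes each emitted row is consumed (serialized or copied) before the next `map()` call:

```scala
final class CountCell(var count: Long)

// Stand-in for the generated helper; method names follow this thread.
trait AggsHelper {
  def createAccumulators(): Array[Any]
  def resetAccumulator(accs: Array[Any]): Unit
  def accumulate(accs: Array[Any], input: Array[Any]): Unit
  def setForwardedFields(input: Array[Any], output: Array[Any]): Unit
  def setAggregationResults(accs: Array[Any], output: Array[Any]): Unit
  def createOutputRow(): Array[Any]
}

// Hypothetical COUNT(*) keyed by input(0); output layout: [key, accumulator, rowtime].
final class CountHelper extends AggsHelper {
  def createAccumulators(): Array[Any] = Array[Any](new CountCell(0L))
  def resetAccumulator(accs: Array[Any]): Unit = accs(0).asInstanceOf[CountCell].count = 0L
  def accumulate(accs: Array[Any], input: Array[Any]): Unit = {
    val cell = accs(0).asInstanceOf[CountCell]
    cell.count += 1
  }
  def setForwardedFields(input: Array[Any], output: Array[Any]): Unit = output(0) = input(0)
  // Partial result: the accumulator itself is copied, not its final value.
  def setAggregationResults(accs: Array[Any], output: Array[Any]): Unit = output(1) = accs(0)
  def createOutputRow(): Array[Any] = new Array[Any](3)
}

final class WindowAggMapSketch(helper: AggsHelper, timeFieldPos: Int) {
  private var accs: Array[Any] = _
  private var output: Array[Any] = _

  def open(): Unit = {
    accs = helper.createAccumulators() // created once, not per record
    output = helper.createOutputRow()
  }

  def map(input: Array[Any]): Array[Any] = {
    helper.resetAccumulator(accs)
    helper.accumulate(accs, input)
    helper.setForwardedFields(input, output)
    helper.setAggregationResults(accs, output)
    output(output.length - 1) = input(timeFieldPos) // rowtime goes into the last field
    output
  }
}
```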

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r111991648

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -451,13 +581,34 @@ class CodeGenerator(
        {
        for (i <- accTypes.indices) yield
        j"""
        - accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
        + | accList$i = new java.util.ArrayList<${accTypes(i)}>();

        — End diff –

        Why not create the `ArrayList` with an initial capacity of 2?
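For reference, a sketch of the generator fragment with the capacity restored. It assumes, as in the pre-code-gen functions that presized their lists to 2, that each list only ever holds two accumulators (the running one and the one being merged). Without an explicit capacity, `java.util.ArrayList` grows to its default capacity of 10 on the first `add`. The sketch uses Scala's `s` interpolator in place of the `j` interpolator from the diff, and `AccListInitGen` is a hypothetical name:

```scala
object AccListInitGen {
  // One Java initialization statement per accumulator type, presized to 2.
  def genAccListInit(accTypes: Seq[String]): String =
    accTypes.indices.map { i =>
      s"accList$i = new java.util.ArrayList<${accTypes(i)}>(2);"
    }.mkString("\n")
}
```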

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3735#discussion_r112002707

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala —
        @@ -18,111 +18,77 @@
         package org.apache.flink.table.runtime.aggregate

         import java.lang.Iterable
        -import java.util.{ArrayList => JArrayList}

         import org.apache.flink.api.common.functions.RichGroupReduceFunction
         import org.apache.flink.configuration.Configuration
        -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
        +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
         import org.apache.flink.types.Row
        -import org.apache.flink.util.{Collector, Preconditions}
        +import org.apache.flink.util.Collector
        +import org.slf4j.LoggerFactory

         /**
           * It wraps the aggregate logic inside of
           * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
           *
           * It is used for sliding on batch for both time and count-windows.
           *
        -  * @param aggregates aggregate functions.
        -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
        -  * and output Row.
        -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
        -  * index in output Row.
        -  * @param finalRowArity output row field count
        +  * @param genAggregations Code-generated [[GeneratedAggregations]]
        +  * @param keysAndAggregatesArity The total arity of keys and aggregates
           * @param finalRowWindowStartPos relative window-start position to last field of output row
           * @param finalRowWindowEndPos relative window-end position to last field of output row
           * @param windowSize size of the window, used to determine window-end for output row
           */
         class DataSetSlideWindowAggReduceGroupFunction(
        -    aggregates: Array[AggregateFunction[_ <: Any]],
        -    groupKeysMapping: Array[(Int, Int)],
        -    aggregateMapping: Array[(Int, Int)],
        -    finalRowArity: Int,
        +    genAggregations: GeneratedAggregationsFunction,
        +    keysAndAggregatesArity: Int,
             finalRowWindowStartPos: Option[Int],
             finalRowWindowEndPos: Option[Int],
             windowSize: Long)
        -  extends RichGroupReduceFunction[Row, Row] {
        -
        -  Preconditions.checkNotNull(aggregates)
        -  Preconditions.checkNotNull(groupKeysMapping)
        +  extends RichGroupReduceFunction[Row, Row]
        +  with Compiler[GeneratedAggregations] {

           private var collector: TimeWindowPropertyCollector = _
        +  protected val windowStartPos: Int = keysAndAggregatesArity
        +
           private var output: Row = _

        -  private val accumulatorStartPos: Int = groupKeysMapping.length
        -  protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
        +  protected var accumulators: Row = _
        -  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
        -    new JArrayList[Accumulator](2)
        -  }

        +  val LOG = LoggerFactory.getLogger(this.getClass)
        +  protected var function: GeneratedAggregations = _

           override def open(config: Configuration) {
        -    output = new Row(finalRowArity)
        +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
        +      s"Code:\n$genAggregations.code")
        +    val clazz = compile(
        +      getClass.getClassLoader,
        +      genAggregations.name,
        +      genAggregations.code)
        +    LOG.debug("Instantiating AggregateHelper.")
        +    function = clazz.newInstance()
        +
        +    output = function.createOutputRow()
        +    accumulators = function.createAccumulators()
             collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
        -
        -    // init lists with two empty accumulators
        -    var i = 0
        -    while (i < aggregates.length) {
        -      val accumulator = aggregates(i).createAccumulator()
        -      accumulatorList(i).add(accumulator)
        -      accumulatorList(i).add(accumulator)
        -      i += 1
        -    }
           }

           override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

        -    // reset first accumulator
        -    var i = 0
        -    while (i < aggregates.length) {
        -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
        -      i += 1
        -    }
        +    // reset accumulator
        +    function.resetAccumulator(accumulators)

             val iterator = records.iterator()
             while (iterator.hasNext) {
               val record = iterator.next()

        -      // accumulate
        -      i = 0
        -      while (i < aggregates.length) {
        -        // insert received accumulator into acc list
        -        val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator]
        -        accumulatorList(i).set(1, newAcc)
        -        // merge acc list
        -        val retAcc = aggregates(i).merge(accumulatorList(i))
        -        // insert result into acc list
        -        accumulatorList(i).set(0, retAcc)
        -        i += 1
        -      }
        +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)

               // check if this record is the last record
               if (!iterator.hasNext) {
        — End diff –

        move this behind the loop
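One way to picture the evaluation once it moves behind the loop. The window start is read from the last record (at `windowStartPos = keysAndAggregatesArity`), and the window end is presumably derived from `windowSize`, as the `@param windowSize` scaladoc in the diff describes. This sketch assumes a hypothetical `[key, partial sum, window start]` layout with rows modeled as `Array[Any]`. It also relies on every record of a group carrying the same window start, which the grouping by window start implies:

```scala
object SlideWindowEmitSketch {
  // Hypothetical intermediate layout: [key, partial SUM, window start].
  val keysAndAggregatesArity = 2
  val windowStartPos: Int = keysAndAggregatesArity

  // Returns (key, sum, windowStart, windowEnd) for one group, or None for an empty group.
  def reduce(records: java.lang.Iterable[Array[Any]], windowSize: Long): Option[(Any, Long, Long, Long)] = {
    var sum = 0L
    var record: Array[Any] = null
    val iterator = records.iterator()
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record(1).asInstanceOf[Long] // merge step
    }
    // Runs once, behind the loop.
    if (record == null) None
    else {
      val windowStart = record(windowStartPos).asInstanceOf[Long]
      Some((record(0), sum, windowStart, windowStart + windowSize))
    }
  }
}
```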

        Show
        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on the issue:

        https://github.com/apache/flink/pull/3735

        @fhueske, thanks for your feedback.
        Yes, we could keep the GeneratedAggregations interface very clean, like this:
        ```
        import org.apache.flink.api.common.functions.Function
        import org.apache.flink.types.Row

        abstract class GeneratedAggregations extends Function {
          def setAggregationResults(accumulators: Row, output: Row): Unit
          def setForwardedFields(input: Row, output: Row): Unit
          def accumulate(accumulators: Row, input: Row): Unit
          def retract(accumulators: Row, input: Row): Unit
          def createAccumulators(): Row
          def mergeAccumulatorsPair(a: Row, b: Row): Row
          def resetAccumulator(accumulators: Row): Unit
        }
        ```
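        To make the contract of this interface more concrete, the following is a minimal, hand-written sketch of the kind of class a code generator could emit, together with a group-reduce style loop that drives it. It is an illustration under stated assumptions, not the code generated in this PR: `Row` is modeled as `Array[Any]` so the sketch runs without Flink, and the aggregation (`SUM` and `COUNT` over a `Long` field), `SumCountAggregations`, and `reduceGroup` are hypothetical names.

        ```scala
        object GeneratedAggregationsSketch {
          // Assumption: a Row is modeled as a plain Array[Any] so that this sketch
          // compiles without Flink; Flink's org.apache.flink.types.Row is not used.
          type Row = Array[Any]

          // Mirrors the methods of GeneratedAggregations, without the Flink Function base type.
          abstract class Aggregations {
            def setAggregationResults(accumulators: Row, output: Row): Unit
            def setForwardedFields(input: Row, output: Row): Unit
            def accumulate(accumulators: Row, input: Row): Unit
            def retract(accumulators: Row, input: Row): Unit
            def createAccumulators(): Row
            def mergeAccumulatorsPair(a: Row, b: Row): Row
            def resetAccumulator(accumulators: Row): Unit
          }

          // Hypothetical hand-written "generated" class for a query such as
          //   SELECT key, SUM(v), COUNT(v) FROM t GROUP BY key
          // Input rows: (key: String, v: Long). Accumulators: (sum: Long, count: Long).
          // Output rows: (key, sum, count).
          class SumCountAggregations extends Aggregations {
            def setAggregationResults(acc: Row, output: Row): Unit = {
              output(1) = acc(0)
              output(2) = acc(1)
            }
            def setForwardedFields(input: Row, output: Row): Unit = {
              output(0) = input(0)
            }
            def accumulate(acc: Row, input: Row): Unit = {
              acc(0) = acc(0).asInstanceOf[Long] + input(1).asInstanceOf[Long]
              acc(1) = acc(1).asInstanceOf[Long] + 1L
            }
            def retract(acc: Row, input: Row): Unit = {
              acc(0) = acc(0).asInstanceOf[Long] - input(1).asInstanceOf[Long]
              acc(1) = acc(1).asInstanceOf[Long] - 1L
            }
            def createAccumulators(): Row = Array[Any](0L, 0L)
            // Merges b into a and returns a (one plausible convention for this sketch).
            def mergeAccumulatorsPair(a: Row, b: Row): Row = {
              a(0) = a(0).asInstanceOf[Long] + b(0).asInstanceOf[Long]
              a(1) = a(1).asInstanceOf[Long] + b(1).asInstanceOf[Long]
              a
            }
            def resetAccumulator(acc: Row): Unit = {
              acc(0) = 0L
              acc(1) = 0L
            }
          }

          // Group-reduce style driver. Assumes a non-empty group of records sharing one key.
          def reduceGroup(f: Aggregations, acc: Row, records: Iterator[Row]): Row = {
            f.resetAccumulator(acc)
            val output = new Array[Any](3)
            var last: Row = null
            while (records.hasNext) {
              last = records.next()
              f.accumulate(acc, last)
            }
            // Forwarded fields and results are written once, after the loop.
            f.setForwardedFields(last, output)
            f.setAggregationResults(acc, output)
            output
          }
        }
        ```

        The driver resets a single reused accumulator row per group, accumulates every record, and writes the forwarded key and the aggregation results once after the loop, so the per-record work stays limited to `accumulate`.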
        However, I feel it may not be ideal to add more parameters to the code-generation function, because callers would usually have to construct unnecessary empty parameters. I think we can break the code-generation function into 2-3 functions (these are just interfaces for processing the code-gen parameters; the underlying implementation will be shared by all of them). Let me prototype the changes, and we can continue the discussion from there.
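        The trade-off between one code-generation method with extra (often empty) parameters and several thin entry points can be sketched as follows. This is a hypothetical illustration, not the code in this PR: `AggCodeGenEntryPoints`, `generateCore`, and both entry-point names are invented, and the "generated code" is just a plain string. Each entry point fixes the parameters its callers would otherwise pass as empty values, while all of them share one core implementation.

        ```scala
        // Hypothetical sketch: none of these names are Flink APIs.
        object AggCodeGenEntryPoints {

          // Shared core: every option is an explicit parameter here, and only here.
          private def generateCore(
              className: String,
              aggFields: Seq[Int],
              forwardedFields: Seq[Int],
              withRetraction: Boolean): String = {
            val sb = new StringBuilder
            sb.append(s"class $className extends GeneratedAggregations {\n")
            sb.append(s"  // aggregates input fields: ${aggFields.mkString(", ")}\n")
            if (forwardedFields.nonEmpty) {
              sb.append(s"  // forwards input fields: ${forwardedFields.mkString(", ")}\n")
            }
            if (withRetraction) {
              sb.append("  // retract() has a real implementation\n")
            }
            sb.append("}\n")
            sb.toString
          }

          // Thin entry point for callers that forward (key) fields but never retract.
          def generateWithForwardedFields(className: String, keys: Seq[Int], aggFields: Seq[Int]): String =
            generateCore(className, aggFields, forwardedFields = keys, withRetraction = false)

          // Thin entry point for callers that need retraction but forward nothing.
          def generateWithRetraction(className: String, aggFields: Seq[Int]): String =
            generateCore(className, aggFields, forwardedFields = Seq.empty, withRetraction = true)
        }
        ```

        With this layout, callers only supply the parameters they actually use; adding a new variant means adding one short wrapper rather than another parameter at every call site.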

        Regarding your other comments: I did not look into the logic of the previous implementations and focused only on the code-gen. I will take a look and optimize them.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3735

        Hi @shaoxuan-wang, I'm fine with either approach: a single method with additional parameters, or multiple methods. If you think the multiple-methods approach is better, let's go for it.

        Thanks, Fabian

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3735

        Hi @shaoxuan-wang, thanks for the PR. The changes look good.
        I opened a PR against your PR branch and refactored the `CodeGenerator.generateAggregations()` method a bit. Among other things, I added code-gen for grouping sets.

        Let me know what you think.

        Best, Fabian

        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on the issue:

        https://github.com/apache/flink/pull/3735

        @fhueske, your changes look good to me. I left a few comments.

        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on the issue:

        https://github.com/apache/flink/pull/3735

        Thanks @shaoxuan-wang!
        Merging

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3735

        fhueske Fabian Hueske added a comment -

        Implemented with 3b4542b8f0981f01e42c861bccbc67c8b3a20fdd


          People

          • Assignee:
            ShaoxuanWang Shaoxuan Wang
            Reporter:
            ShaoxuanWang Shaoxuan Wang
          • Votes:
            0
            Watchers:
            3
