Flink › FLINK-5564 User Defined Aggregates › FLINK-5768

Apply new aggregation functions for datastream and dataset tables

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Apply new aggregation functions for datastream and dataset tables

      This includes:
      1. Change the implementation of the DataStream aggregation runtime code to use the new aggregation functions and the aggregate DataStream API (an illustrative sketch of the new interface follows below).
      2. The DataStream aggregation will always run in incremental mode, as explained in FLINK-5564 on 06/Feb/2017.
      3. Change the implementation of the DataSet aggregation runtime code to use the new aggregation functions.
      4. Clean up unused classes and methods.
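
      For orientation, here is a minimal, illustrative sketch of the accumulator-based interface from FLINK-5564/FLINK-5767 that these changes build on. The class names (CountAggFunction, CountAccumulator) are hypothetical examples, not code from this issue; only the createAccumulator / accumulate / getValue shape and the Accumulator/AggregateFunction imports are taken from the discussion and diffs below.

      import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

      // Hypothetical example of an accumulator-based aggregate (a simple count).
      class CountAccumulator extends Accumulator {
        var count: Long = 0L
      }

      class CountAggFunction extends AggregateFunction[Long] {

        // create a fresh, empty accumulator for a new group / window pane
        def createAccumulator(): Accumulator = new CountAccumulator

        // fold one input value into the accumulator (incremental mode)
        def accumulate(accumulator: Accumulator, value: Any): Unit = {
          if (value != null) {
            accumulator.asInstanceOf[CountAccumulator].count += 1
          }
        }

        // turn the accumulator into the final result value
        def getValue(accumulator: Accumulator): Long =
          accumulator.asInstanceOf[CountAccumulator].count
      }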

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user shaoxuan-wang opened a pull request:

          https://github.com/apache/flink/pull/3423

          FLINK-5768 [table] Apply new aggregation functions for datastream and dataset tables

          This PR includes the following changes:
           1. Change the implementation of the DataStream aggregation runtime code to use the new aggregation functions (FLINK-5767) and the aggregate DataStream API (FLINK-5582).
           2. The DataStream aggregation will always run in incremental mode, as explained in FLINK-5564 on 06/Feb/2017.
           3. Change the implementation of the DataSet aggregation runtime code to use the new aggregation functions.
           4. Clean up unused classes and methods.

          "mvn verify" has passed under flink-libraries/flink-table

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [X] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [X] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shaoxuan-wang/flink F5768-submit

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3423.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3423


          commit 04254e484f6cad4fb722ddcd72dbbd70b1406ce3
          Author: shaoxuan-wang <wshaoxuan@gmail.com>
          Date: 2017-02-27T11:09:30Z

          FLINK-5768 [table] Apply new aggregation functions for datastream and dataset tables


          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3423

           I think we could further remove AggregateMapFunction and DataSetWindowAggregateMapFunction for windowedDataSet and DataSet, and let the reduce function directly accumulate the input values. But since this PR is already very large in terms of lines changed, I would prefer to do that optimization in a separate PR.
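
           As a rough illustration of that follow-up, a GroupReduceFunction could accumulate the raw input rows itself, so the preparing MapFunction becomes unnecessary. This is only a hedged sketch: the class name, constructor parameters, and field positions (aggFieldsIndex, finalRowArity) are hypothetical, and it assumes the accumulate()/getValue() interface discussed in this issue.

           import java.lang.Iterable

           import org.apache.flink.api.common.functions.RichGroupReduceFunction
           import org.apache.flink.table.functions.AggregateFunction
           import org.apache.flink.types.Row
           import org.apache.flink.util.Collector

           class DirectAccumulateReduceFunction(
               private val aggregates: Array[AggregateFunction[_]],
               private val aggFieldsIndex: Array[Int],
               private val finalRowArity: Int)
             extends RichGroupReduceFunction[Row, Row] {

             override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
               // fresh accumulators for this group, one per aggregate
               val accumulators = aggregates.map(_.createAccumulator())

               // accumulate the raw input values directly, no prepare-map step
               val it = records.iterator()
               while (it.hasNext) {
                 val record = it.next()
                 for (i <- aggregates.indices) {
                   aggregates(i).accumulate(accumulators(i), record.getField(aggFieldsIndex(i)))
                 }
               }

               // emit the final values
               val output = new Row(finalRowArity)
               for (i <- aggregates.indices) {
                 output.setField(i, aggregates(i).getValue(accumulators(i)))
               }
               out.collect(output)
             }
           }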

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103432399

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala —
           @@ -23,7 +23,8 @@ import org.apache.calcite.rel.`type`.RelDataType
            import org.apache.calcite.rel.core.AggregateCall
            import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
            import org.apache.flink.api.java.tuple.Tuple
           -import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
           +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream,
          — End diff –

          Do not break line here. Imports may exceed line limit.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103435665

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
           @@ -0,0 +1,101 @@
           +/*
           + * Licensed to the Apache Software Foundation (ASF) under one
           + * or more contributor license agreements. See the NOTICE file
           + * distributed with this work for additional information
           + * regarding copyright ownership. The ASF licenses this file
           + * to you under the Apache License, Version 2.0 (the
           + * "License"); you may not use this file except in compliance
           + * with the License. You may obtain a copy of the License at
           + *
           + * http://www.apache.org/licenses/LICENSE-2.0
           + *
           + * Unless required by applicable law or agreed to in writing, software
           + * distributed under the License is distributed on an "AS IS" BASIS,
           + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           + * See the License for the specific language governing permissions and
           + * limitations under the License.
           + */
           +
           +package org.apache.flink.table.runtime.aggregate
           +
           +import java.util.{ArrayList => JArrayList, List => JList}
           +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
           +import org.apache.flink.types.Row
           +
           +/**
           + * Aggregate Function used for the aggregate operator in
           + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
           + *
           + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
           + * used for this aggregation
           + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
           + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
           + * output row => the index of the aggregate) for all the aggregates
           + * @param groupKeysIndex the position (in the input Row) of grouping keys
           + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
           + * output row => the index of grouping key) for all the grouping keys
           + * @param finalRowArity the arity of the final row
           + */
           +class AggregateAggFunction(
           +    private val aggregates: Array[AggregateFunction[_]],
           +    private val aggFieldsIndex: Array[Int],
           +    private val aggregateMapping: Array[(Int, Int)],
           +    private val groupKeysIndex: Array[Int],
           +    private val groupKeysMapping: Array[(Int, Int)],
           +    private val finalRowArity: Int)
          — End diff –

           I think we do not need this parameter. The result row should have `aggregates.length` fields. All other fields (groupKeys, window properties) can be added in the `WindowFunction`.
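
           As a hedged sketch of that suggestion (reusing the fields of the AggregateAggFunction shown in the diff above, not the actual PR code), getResult() would then only be sized by the number of aggregates:

           override def getResult(accumulatorRow: Row): Row = {
             // one field per aggregate; group keys and window properties are
             // appended later by the surrounding WindowFunction
             val output = new Row(aggregates.length)
             for (i <- aggregates.indices) {
               val accumulator =
                 accumulatorRow.getField(groupKeysIndex.length + i).asInstanceOf[Accumulator]
               output.setField(i, aggregates(i).getValue(accumulator))
             }
             output
           }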

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103468984

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
           @@ -18,32 +18,33 @@
            package org.apache.flink.table.runtime.aggregate

            import java.lang.Iterable
           +import java.util.{ArrayList => JArrayList}

            import org.apache.flink.api.common.functions.RichGroupReduceFunction
            import org.apache.flink.configuration.Configuration
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
            import org.apache.flink.types.Row
            import org.apache.flink.util.{Collector, Preconditions}

           -import scala.collection.JavaConversions._
           -
            /**
           - * It wraps the aggregate logic inside of
           - * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
           - *
           - * @param aggregates The aggregate functions.
           - * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
           - * and output Row.
           - * @param aggregateMapping The index mapping between aggregate function list and aggregated value
           - * index in output Row.
           - * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
           - * Row and output Row.
           - */
           + * It wraps the aggregate logic inside of
           + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
           + *
           + * @param aggregates The aggregate functions.
           + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
           + * and output Row.
           + * @param aggregateMapping The index mapping between aggregate function list and aggregated
           + * value
           + * index in output Row.
           + * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
           + * Row and output Row.
           + * @param finalRowArity The arity of the final resulting row
           + */
            class AggregateReduceGroupFunction(
           -    private val aggregates: Array[Aggregate[_ <: Any]],
           +    private val aggregates: Array[AggregateFunction[_ <: Any]],
                private val groupKeysMapping: Array[(Int, Int)],
                private val aggregateMapping: Array[(Int, Int)],
           — End diff –

          The positions of the aggregates in the input are known, right?
          So we can also use an `Array[Int]` instead?
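
           In other words (an illustrative sketch with made-up positions): if the aggregate values always occupy a known, contiguous range of the row, the pair mapping collapses to a flat array of output positions.

           // (output position, aggregate index) pairs as used today; the values are hypothetical
           val aggregateMapping: Array[(Int, Int)] = Array((2, 0), (3, 1))

           // suggested simplification: only the output positions, indexed by aggregate
           val aggOutputPositions: Array[Int] = aggregateMapping.map(_._1)  // Array(2, 3)

           // writing the results would then become
           // output.setField(aggOutputPositions(i), aggregates(i).getValue(accumulators(i)))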

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103445700

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
           @@ -0,0 +1,101 @@
           +/*
           + * Licensed to the Apache Software Foundation (ASF) under one
           + * or more contributor license agreements. See the NOTICE file
           + * distributed with this work for additional information
           + * regarding copyright ownership. The ASF licenses this file
           + * to you under the Apache License, Version 2.0 (the
           + * "License"); you may not use this file except in compliance
           + * with the License. You may obtain a copy of the License at
           + *
           + * http://www.apache.org/licenses/LICENSE-2.0
           + *
           + * Unless required by applicable law or agreed to in writing, software
           + * distributed under the License is distributed on an "AS IS" BASIS,
           + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           + * See the License for the specific language governing permissions and
           + * limitations under the License.
           + */
           +
           +package org.apache.flink.table.runtime.aggregate
           +
           +import java.util.{ArrayList => JArrayList, List => JList}
           +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
           +import org.apache.flink.types.Row
           +
           +/**
           + * Aggregate Function used for the aggregate operator in
           + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
           + *
           + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
           + * used for this aggregation
           + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
           + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
           + * output row => the index of the aggregate) for all the aggregates
           + * @param groupKeysIndex the position (in the input Row) of grouping keys
           + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
           + * output row => the index of grouping key) for all the grouping keys
           + * @param finalRowArity the arity of the final row
           + */
           +class AggregateAggFunction(
           +    private val aggregates: Array[AggregateFunction[_]],
           +    private val aggFieldsIndex: Array[Int],
           +    private val aggregateMapping: Array[(Int, Int)],
           +    private val groupKeysIndex: Array[Int],
           +    private val groupKeysMapping: Array[(Int, Int)],
           +    private val finalRowArity: Int)
           +  extends ApiAggFunction[Row, Row, Row] {
           +
           +  override def createAccumulator(): Row = {
           +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
           +
           +    for (i <- aggregates.indices) {
           +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
           +    }
           +    accumulatorRow
           +  }
           +
           +  override def add(value: Row, accumulatorRow: Row) = {
           +    for (i <- groupKeysIndex.indices) {
           +      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
           +    }
           +
           +    for (i <- aggregates.indices) {
           +      val accumulator =
           +        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
           +      val v = value.getField(aggFieldsIndex(i))
           +      aggregates(i).accumulate(accumulator, v)
           +    }
           +  }
           +
           +  override def getResult(accumulatorRow: Row): Row = {
           +    val output = new Row(finalRowArity)
          — End diff –

          `val output = new Row(aggregates.length)`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103476693

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala —
           @@ -34,13 +35,13 @@ import org.apache.flink.util.Preconditions
            * append an (aligned) rowtime field to the end of the output row.
            */
           class DataSetWindowAggregateMapFunction(
           — End diff –

           We could remove this method if we rework the subsequent combine and groupReduce functions. This would allow us to use `accumulate` in `combine()` or in a non-combined `groupReduce()`.
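
           As a hedged sketch of that rework (a fragment, not PR code: the field names aggFieldsIndex / groupKeysIndex and the intermediate row layout are assumptions), a combine() that accumulates the raw input rows directly would make the preparing MapFunction unnecessary:

           override def combine(records: Iterable[Row]): Row = {
             // fresh accumulators for this sub-group
             val accumulators = aggregates.map(_.createAccumulator())
             val it = records.iterator()
             var last: Row = null
             while (it.hasNext) {
               val record = it.next()
               for (i <- aggregates.indices) {
                 // accumulate the raw input value for aggregate i
                 aggregates(i).accumulate(accumulators(i), record.getField(aggFieldsIndex(i)))
               }
               last = record
             }
             // emit group keys first, then one accumulator per aggregate
             val intermediate = new Row(groupKeysIndex.length + aggregates.length)
             for (i <- groupKeysIndex.indices) {
               intermediate.setField(i, last.getField(groupKeysIndex(i)))
             }
             for (i <- aggregates.indices) {
               intermediate.setField(groupKeysIndex.length + i, accumulators(i))
             }
             intermediate
           }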

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103467528

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
           @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
              override def open(config: Configuration) {
                Preconditions.checkNotNull(aggregates)
                Preconditions.checkNotNull(groupKeysMapping)
           -    aggregateBuffer = new Row(intermediateRowArity)
           +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
                output = new Row(finalRowArity)
                if (!groupingSetsMapping.isEmpty) {
                  intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
                }
              }

              /**
           -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           -   * calculate aggregated values output by aggregate buffer, and set them into output
           -   * Row based on the mapping relation between intermediate aggregate data and output data.
           -   *
           -   * @param records Grouped intermediate aggregate Rows iterator.
           -   * @param out The collector to hand results to.
           -   *
           -   */
           +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           +   * calculate aggregated values output by aggregate buffer, and set them into output
           +   * Row based on the mapping relation between intermediate aggregate data and output data.
           +   *
           +   * @param records Grouped intermediate aggregate Rows iterator.
           +   * @param out The collector to hand results to.
           +   *
           +   */
              override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
           -    // Initiate intermediate aggregate value.
           -    aggregates.foreach(_.initiate(aggregateBuffer))
           -
           -    // Merge intermediate aggregate value to buffer.
           +    // merge intermediate aggregate value to buffer.
                var last: Row = null
           -    records.foreach((record) => {
           -      aggregates.foreach(_.merge(record, aggregateBuffer))
           +    val iterator = records.iterator()
           +    val accumulatorList = Array.fill(aggregates.length) {
           +      new JArrayList[Accumulator]()
           +    }
           +
           +    while (iterator.hasNext) {
           +      val record = iterator.next()
           +      for (i <- aggregates.indices) {
           +        accumulatorList.add(
           — End diff –

          This materializes the whole group in Lists and will fail for large groups (or in case of a non-grouped aggregate).
          We need to change this to pairwise merges. The `List` should be reused.
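
           A hedged sketch of the pairwise-merge variant: it reuses the fields of AggregateReduceGroupFunction shown above, assumes the accumulators sit after the group keys in the intermediate row, and assumes merge() takes a list of accumulators and returns the merged one, as the collected-list code in the diff suggests. The two-slot lists are members and reused across groups:

           // one reusable two-element list per aggregate, created once
           private val mergePair: Array[JArrayList[Accumulator]] =
             Array.fill(aggregates.length)(new JArrayList[Accumulator](2))

           override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
             // running accumulators for this group
             val current = aggregates.map(_.createAccumulator())
             val it = records.iterator()
             var last: Row = null
             while (it.hasNext) {
               val record = it.next()
               for (i <- aggregates.indices) {
                 // merge the incoming accumulator into the running one, pair by pair
                 mergePair(i).clear()
                 mergePair(i).add(current(i))
                 mergePair(i).add(
                   record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator])
                 current(i) = aggregates(i).merge(mergePair(i))
               }
               last = record
             }
             // ... then write group keys and aggregates(i).getValue(current(i)) into the output row
           }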

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103472747

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
           @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
              override def open(config: Configuration) {
                Preconditions.checkNotNull(aggregates)
                Preconditions.checkNotNull(groupKeysMapping)
           -    aggregateBuffer = new Row(intermediateRowArity)
           +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
                output = new Row(finalRowArity)
                if (!groupingSetsMapping.isEmpty) {
                  intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
                }
              }

              /**
           -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           -   * calculate aggregated values output by aggregate buffer, and set them into output
           -   * Row based on the mapping relation between intermediate aggregate data and output data.
           -   *
           -   * @param records Grouped intermediate aggregate Rows iterator.
           -   * @param out The collector to hand results to.
           -   *
           -   */
           +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           +   * calculate aggregated values output by aggregate buffer, and set them into output
           +   * Row based on the mapping relation between intermediate aggregate data and output data.
           +   *
           +   * @param records Grouped intermediate aggregate Rows iterator.
           +   * @param out The collector to hand results to.
           +   *
           +   */
              override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
           -    // Initiate intermediate aggregate value.
           -    aggregates.foreach(_.initiate(aggregateBuffer))
           -
           -    // Merge intermediate aggregate value to buffer.
           +    // merge intermediate aggregate value to buffer.
                var last: Row = null
           -    records.foreach((record) => {
           -      aggregates.foreach(_.merge(record, aggregateBuffer))
           +    val iterator = records.iterator()
           +    val accumulatorList = Array.fill(aggregates.length) {
           +      new JArrayList[Accumulator]()
           +    }
           +
           +    while (iterator.hasNext) {
           +      val record = iterator.next()
           +      for (i <- aggregates.indices) {
           +        accumulatorList.add(
           — End diff –

           I'm wondering whether it would be better to have no preparing mapper and instead have two separate paths, one for combinable and one for non-combinable aggregates.

          • Combinable: `input -> groupCombine() -> groupReduce()` where `groupCombine` uses `accumulate()` and `groupReduce` pairwise merges.
          • Non-Combinable: `input -> groupReduce()` where `groupReduce` uses `accumulate()`

          The combinable `groupCombine` and non-combinable `groupReduce` might even use the same code (except that `groupReduce` needs to call `getValue()` in the end).
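
           Sketched as job wiring (groupBy / combineGroup / reduceGroup are the real DataSet API; allCombinable, groupingKeys, and the function instances are placeholders for this illustration, not PR code):

           // combinable:     input -> groupCombine() -> groupReduce()
           // non-combinable: input -> groupReduce()
           val result =
             if (allCombinable) {
               input
                 .groupBy(groupingKeys: _*)
                 .combineGroup(accumulatingCombiner)   // uses accumulate()
                 .groupBy(groupingKeys: _*)
                 .reduceGroup(mergingReducer)          // pairwise merge(), then getValue()
             } else {
               input
                 .groupBy(groupingKeys: _*)
                 .reduceGroup(accumulatingReducer)     // uses accumulate(), then getValue()
             }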

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103433985

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala —
           @@ -119,110 +119,54 @@ class DataStreamAggregate(
                  s"select: ($aggString)"
                val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"

           -    val mapFunction = AggregateUtil.createPrepareMapFunction(
           -      namedAggregates,
           -      grouping,
           -      inputType)
           -
           -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
           -
           -
           -    // check whether all aggregates support partial aggregate
           -    if (AggregateUtil.doAllSupportPartialAggregation(
           -      namedAggregates.map(_.getKey),
           -      inputType,
           -      grouping.length)) {
           -      // do Incremental Aggregation
           -      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
           -        namedAggregates,
           -        inputType,
           -        getRowType,
           -        grouping)
           -      // grouped / keyed aggregation
           -      if (groupingKeys.length > 0) {
           -        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
           -          window,
           -          namedAggregates,
           -          inputType,
           -          rowRelDataType,
           -          grouping,
           -          namedProperties)
           +    // grouped / keyed aggregation
           +    if (groupingKeys.length > 0) {
           +      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
           — End diff –

          AggregationFunction -> WindowFunction?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103466494

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
           @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
              override def open(config: Configuration) {
                Preconditions.checkNotNull(aggregates)
                Preconditions.checkNotNull(groupKeysMapping)
           -    aggregateBuffer = new Row(intermediateRowArity)
           +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
                output = new Row(finalRowArity)
                if (!groupingSetsMapping.isEmpty) {
                  intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
                }
              }

              /**
           -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           -   * calculate aggregated values output by aggregate buffer, and set them into output
           -   * Row based on the mapping relation between intermediate aggregate data and output data.
           -   *
           -   * @param records Grouped intermediate aggregate Rows iterator.
           -   * @param out The collector to hand results to.
           -   *
           -   */
           +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
           +   * calculate aggregated values output by aggregate buffer, and set them into output
           +   * Row based on the mapping relation between intermediate aggregate data and output data.
           +   *
           +   * @param records Grouped intermediate aggregate Rows iterator.
           +   * @param out The collector to hand results to.
           +   *
           +   */
              override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
           -    // Initiate intermediate aggregate value.
           -    aggregates.foreach(_.initiate(aggregateBuffer))
           -
           -    // Merge intermediate aggregate value to buffer.
           +    // merge intermediate aggregate value to buffer.
                var last: Row = null
           -    records.foreach((record) => {
           -      aggregates.foreach(_.merge(record, aggregateBuffer))
           +    val iterator = records.iterator()
           +    val accumulatorList = Array.fill(aggregates.length) {
           — End diff –

          Make this a member variable of size 2 for pairwise merging.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103475723

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala —
           @@ -28,68 +30,74 @@ import org.apache.flink.types.Row
            * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
            * It is used for tumbling time-window on batch.
            *
           - * @param rowtimePos The rowtime field index in input row
           - * @param windowSize Tumbling time window size
           - * @param windowStartPos The relative window-start field position to the last field of output row
           - * @param windowEndPos The relative window-end field position to the last field of output row
           - * @param aggregates The aggregate functions.
           + * @param windowSize Tumbling time window size
           + * @param windowStartPos The relative window-start field position to the last field of output row
           + * @param windowEndPos The relative window-end field position to the last field of output row
           + * @param aggregates The aggregate functions.
            * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
            * and output Row.
            * @param aggregateMapping The index mapping between aggregate function list and aggregated value
            * index in output Row.
           - * @param intermediateRowArity The intermediate row field count
           - * @param finalRowArity The output row field count
           + * @param finalRowArity The output row field count
            */
           class DataSetTumbleTimeWindowAggReduceCombineFunction(
           -    rowtimePos: Int,
                windowSize: Long,
                windowStartPos: Option[Int],
                windowEndPos: Option[Int],
           -    aggregates: Array[Aggregate[_ <: Any]],
           +    aggregates: Array[AggregateFunction[_ <: Any]],
                groupKeysMapping: Array[(Int, Int)],
                aggregateMapping: Array[(Int, Int)],
           -    intermediateRowArity: Int,
                finalRowArity: Int)
             extends DataSetTumbleTimeWindowAggReduceGroupFunction(
           -    rowtimePos,
                windowSize,
                windowStartPos,
                windowEndPos,
                aggregates,
                groupKeysMapping,
                aggregateMapping,
           -    intermediateRowArity,
                finalRowArity)
           -  with CombineFunction[Row, Row] {
           +  with CombineFunction[Row, Row] {

             /**
              * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
              *
           -  * @param records Sub-grouped intermediate aggregate Rows iterator.
           +  * @param records Sub-grouped intermediate aggregate Rows iterator.
              * @return Combined intermediate aggregate Row.
              *
              */
             override def combine(records: Iterable[Row]): Row = {
           -    // initiate intermediate aggregate value.
           -    aggregates.foreach(_.initiate(aggregateBuffer))
           -
           -    // merge intermediate aggregate value to buffer.
               var last: Row = null
           -
               val iterator = records.iterator()
           +    val accumulatorList = Array.fill(aggregates.length) {
           +      new JArrayList[Accumulator]()
           +    }
           +
           +    // per each aggregator, collect its accumulators to a list
               while (iterator.hasNext) {
                 val record = iterator.next()
           -      aggregates.foreach(_.merge(record, aggregateBuffer))
           +      for (i <- aggregates.indices) {
           +        accumulatorList.add(
           — End diff –

          pairwise merging

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103434035

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala —
           @@ -119,110 +119,54 @@ class DataStreamAggregate(
                  s"select: ($aggString)"
                val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"

           -    val mapFunction = AggregateUtil.createPrepareMapFunction(
           -      namedAggregates,
           -      grouping,
           -      inputType)
           -
           -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
           -
           -
           -    // check whether all aggregates support partial aggregate
           -    if (AggregateUtil.doAllSupportPartialAggregation(
           -      namedAggregates.map(_.getKey),
           -      inputType,
           -      grouping.length)) {
           -      // do Incremental Aggregation
           -      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
           -        namedAggregates,
           -        inputType,
           -        getRowType,
           -        grouping)
           -      // grouped / keyed aggregation
           -      if (groupingKeys.length > 0) {
           -        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
           -          window,
           -          namedAggregates,
           -          inputType,
           -          rowRelDataType,
           -          grouping,
           -          namedProperties)
           +    // grouped / keyed aggregation
           +    if (groupingKeys.length > 0) {
           +      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
           +        window,
           +        rowRelDataType.getFieldCount,
           +        namedProperties)
           -        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
           -        val windowedStream =
           -          createKeyedWindowedStream(window, keyedStream)
           +      val keyedStream = inputDS.keyBy(groupingKeys: _*)
           +      val windowedStream =
           +        createKeyedWindowedStream(window, keyedStream)
                      .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
           -        windowedStream
           -          .reduce(reduceFunction, windowFunction)
           -          .returns(rowTypeInfo)
           -          .name(keyedAggOpName)
           -      }
           -      // global / non-keyed aggregation
           -      else {
           -        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
           -          window,
           +      val (aggFunction, accumulatorRowType) =
           +        AggregateUtil.createDataStreamAggregateFunction(
                      namedAggregates,
                      inputType,
                      rowRelDataType,
           -          grouping,
           -          namedProperties)
           -
           -        val windowedStream =
           -          createNonKeyedWindowedStream(window, mappedInput)
           -            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
           +          grouping)
           -        windowedStream
           -          .reduce(reduceFunction, windowFunction)
           -          .returns(rowTypeInfo)
           -          .name(nonKeyedAggOpName)
           -      }

           +      windowedStream
           +        .aggregate(aggFunction, windowFunction, accumulatorRowType, rowTypeInfo, rowTypeInfo)
           +        .name(keyedAggOpName)
                }
           +    // global / non-keyed aggregation
           +    else {

           -      // do non-Incremental Aggregation
           -      // grouped / keyed aggregation
           -      if (groupingKeys.length > 0) {
           -
           -        val windowFunction = AggregateUtil.createWindowAggregationFunction(
           -          window,
           -          namedAggregates,
           -          inputType,
           -          rowRelDataType,
           -          grouping,
           -          namedProperties)
           +      val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
           — End diff –

          AggregationFunction -> WindowFunction

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103434244

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala —
          @@ -119,110 +119,54 @@ class DataStreamAggregate(
          s"select: ($aggString)"
          val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"

-    val mapFunction = AggregateUtil.createPrepareMapFunction(
-      namedAggregates,
-      grouping,
-      inputType)
-
-    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
-
-
-    // check whether all aggregates support partial aggregate
-    if (AggregateUtil.doAllSupportPartialAggregation(
-      namedAggregates.map(_.getKey),
-      inputType,
-      grouping.length)) {
-      // do Incremental Aggregation
-      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
-        namedAggregates,
-        inputType,
-        getRowType,
-        grouping)
-      // grouped / keyed aggregation
-      if (groupingKeys.length > 0) {
-        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
-          window,
-          namedAggregates,
-          inputType,
-          rowRelDataType,
-          grouping,
-          namedProperties)
+    // grouped / keyed aggregation
+    if (groupingKeys.length > 0) {
+      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+        window,
+        rowRelDataType.getFieldCount,
+        namedProperties)
-        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-        val windowedStream =
-          createKeyedWindowedStream(window, keyedStream)
+      val keyedStream = inputDS.keyBy(groupingKeys: _*)
+      val windowedStream =
+        createKeyedWindowedStream(window, keyedStream)
           .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-        windowedStream
-          .reduce(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-      }
-      // global / non-keyed aggregation
-      else {
-        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
-          window,
+      val (aggFunction, accumulatorRowType) =
+        AggregateUtil.createDataStreamAggregateFunction(
           namedAggregates,
           inputType,
           rowRelDataType,
-          grouping,
-          namedProperties)
-
-        val windowedStream =
-          createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+          grouping)
-        windowedStream
-          .reduce(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-      }

+      windowedStream
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, rowTypeInfo, rowTypeInfo)
+        .name(keyedAggOpName)
     }
+    // global / non-keyed aggregation
     else {

-      // do non-Incremental Aggregation
-      // grouped / keyed aggregation
-      if (groupingKeys.length > 0) {
-
-        val windowFunction = AggregateUtil.createWindowAggregationFunction(
-          window,
-          namedAggregates,
-          inputType,
-          rowRelDataType,
-          grouping,
-          namedProperties)
+      val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+        window,
+        rowRelDataType.getFieldCount,
+        namedProperties)
-        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-        val windowedStream =
-          createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+      val windowedStream =
+        createNonKeyedWindowedStream(window, inputDS)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-        windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-      }
-      // global / non-keyed aggregation
-      else {
-        val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
-          window,
+      val (aggFunction, accumulatorRowType) =
+        AggregateUtil.createDataStreamAggregateFunction(
           namedAggregates,
           inputType,
           rowRelDataType,
-          grouping,
-          namedProperties)
+          grouping)

— End diff –

          Make `grouping` an optional parameter or an Option?
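As a rough sketch of this suggestion (not code from the PR; the object and method names below are made up), passing the grouping keys as an Option would make the global / non-keyed case explicit instead of encoding it as an empty array:

object GroupingOptionSketch {
  // Hypothetical helper only: resolve the optional grouping keys.
  def resolveGroupKeys(grouping: Option[Array[Int]]): Array[Int] =
    grouping.getOrElse(Array.empty[Int])

  def main(args: Array[String]): Unit = {
    println(resolveGroupKeys(None).mkString(","))              // "" -> non-keyed aggregation
    println(resolveGroupKeys(Some(Array(0, 2))).mkString(",")) // "0,2" -> keyed aggregation
  }
}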

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103474983

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -104,21 +110,27 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
       // calculate the current window and open a new window
       if (null != windowEnd) {
         // evaluate and emit the current window's result.
-        doEvaluateAndCollect(out, windowStart, windowEnd)
+        doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
+
+        // clear the accumulator list for all aggregate
+        for (i <- aggregates.indices) {
+          accumulatorList(i).clear()
+        }
       } else {
         // set group keys value to final output.
         groupKeysMapping.foreach {
           case (after, previous) =>
             output.setField(after, record.getField(previous))
         }
       }
-      // initiate intermediate aggregate value.
-      aggregates.foreach(_.initiate(aggregateBuffer))
+
       windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
     }
-    // merge intermediate aggregate value to the buffered value.
-    aggregates.foreach(_.merge(record, aggregateBuffer))
+    // collect the accumulators for each aggregate
+    for (i <- aggregates.indices) {
+      accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])

— End diff –

          Pairwise merging

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103478412

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala —
          @@ -56,26 +46,16 @@ class IncrementalAggregateWindowFunction[W <: Window](

    * Row based on the mapping relation between intermediate aggregate data and output data.
    */
   override def apply(
-    key: Tuple,
-    window: W,
-    records: Iterable[Row],
-    out: Collector[Row]): Unit = {
+      key: Tuple,

— End diff –

          Get group keys from `Tuple`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103469278

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala —
          @@ -19,61 +19,71 @@
          package org.apache.flink.table.runtime.aggregate

          import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}

 import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
 import org.apache.flink.types.Row

-import scala.collection.JavaConversions._
-
 /**
-  * It wraps the aggregate logic inside of
-  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
-  * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
-  *                            Row and output Row.
-  */
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated
+  *                         value
+  *                         index in output Row.
+  * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
+  *                            Row and output Row.
+  * @param finalRowArity the arity of the final resulting row
+  */
 class AggregateReduceCombineFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val aggregates: Array[AggregateFunction[_ <: Any]],
     private val groupKeysMapping: Array[(Int, Int)],
     private val aggregateMapping: Array[(Int, Int)],
     private val groupingSetsMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int,
     private val finalRowArity: Int)
   extends AggregateReduceGroupFunction(
     aggregates,
     groupKeysMapping,
     aggregateMapping,
     groupingSetsMapping,
-    intermediateRowArity,
-    finalRowArity)
-  with CombineFunction[Row, Row] {
+    finalRowArity) with CombineFunction[Row, Row] {

   /**
-    * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
-    *
-    * @param records Sub-grouped intermediate aggregate Rows iterator.
-    * @return Combined intermediate aggregate Row.
-    *
-    */
+    * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+    *
+    * @param records Sub-grouped intermediate aggregate Rows iterator.
+    * @return Combined intermediate aggregate Row.
+    *
+    */
   override def combine(records: Iterable[Row]): Row = {
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
+    // merge intermediate aggregate value to buffer.
     var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
+    val iterator = records.iterator()
+    val accumulatorList = Array.fill(aggregates.length) {
+      new JArrayList[Accumulator]()
+    }
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      for (i <- aggregates.indices) {
+        accumulatorList(i).add(

— End diff –

          Same as for `reduce()`. We cannot materialize the whole group but must merge the accumulators pairwise.
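As a minimal sketch of the pairwise merging asked for here (not the PR's code), only two accumulators need to be alive at any time instead of one list per aggregate for the whole group. The `Acc` and `AggFunc` types below are local stand-ins for the table `Accumulator`/`AggregateFunction` contract; the assumed `merge(accumulators: java.util.List)` signature is an assumption, not confirmed API:

import java.util.{ArrayList => JArrayList, List => JList}

object PairwiseMergeSketch {
  trait Acc
  trait AggFunc {
    // assumed contract: merge a small list of accumulators into one
    def merge(accumulators: JList[Acc]): Acc
  }

  def mergeRunning(agg: AggFunc, accs: Iterator[Acc]): Acc = {
    val pair = new JArrayList[Acc](2) // reusable two-element list
    var running = accs.next()
    while (accs.hasNext) {
      pair.clear()
      pair.add(running)
      pair.add(accs.next())
      running = agg.merge(pair) // only two accumulators are held at once
    }
    running
  }
}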

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103435531

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList => JArrayList, List => JList}
+import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          — End diff –

          `aggFieldsIndex` -> `aggInFields`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103475803

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala —
          @@ -68,29 +68,42 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(

          override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

-    // initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // merge intermediate aggregate value to buffer.
     var last: Row = null
-
     val iterator = records.iterator()
+    val accumulatorList = Array.fill(aggregates.length) {
+      new JArrayList[Accumulator]()
+    }
+
+    // per each aggregator, collect its accumulators to a list
     while (iterator.hasNext) {
       val record = iterator.next()

-      aggregates.foreach(_.merge(record, aggregateBuffer))
+      for (i <- aggregates.indices) {
+        accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])

— End diff –

          pairwise merging

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103435442

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList => JArrayList, List => JList}
+import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}

          — End diff –

          Rename `ApiAggFunction` to `DataStreamAggFunc`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103435235

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList => JArrayList, List => JList}
+import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          — End diff –

          I think we could simplify the interface a bit:

          • aggregateMapping could become an `aggOutFields: Array[Int]` which contains the position of each aggregate in the output (`aggOutFields[2]` would give the output position of the third agg function).
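As a small illustration of this simplification (names follow the reviewer's suggestion, the values are made up, and nothing here is the actual PR code):

object AggOutFieldsSketch {
  def main(args: Array[String]): Unit = {
    // current form: (position of the result in the output row, index of the aggregate)
    val aggregateMapping: Array[(Int, Int)] = Array((3, 0), (4, 1), (5, 2))
    // suggested form: aggOutFields(i) = output position of the i-th aggregate's result
    val aggOutFields: Array[Int] = aggregateMapping.sortBy(_._2).map(_._1)

    for (i <- aggOutFields.indices) {
      println(s"aggregate $i writes to output field ${aggOutFields(i)}")
    }
  }
}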
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103475198

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -91,9 +93,13 @@ class DataSetSessionWindowAggregateReduceGroupFunction(

          var windowStart: java.lang.Long = null
          var windowEnd: java.lang.Long = null

-    var currentRowTime:java.lang.Long = null
+    var currentRowTime: java.lang.Long = null

     val iterator = records.iterator()
+    val accumulatorList = Array.fill(aggregates.length) {
          — End diff –

          Use a single `JArrayList(2)` for pairwise merging and make it a member variable
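A sketch of the member-variable variant (illustrative only, not the actual class): one two-slot list is allocated per function instance and reused for every pairwise merge, so nothing is allocated per record.

import java.util.{ArrayList => JArrayList, List => JList}

class ReusablePairMerger[Acc](mergeFn: JList[Acc] => Acc) {
  // allocated once and reused for every merge call
  private val pair = new JArrayList[Acc](2)

  def merge(running: Acc, incoming: Acc): Acc = {
    pair.clear()
    pair.add(running)
    pair.add(incoming)
    mergeFn(pair)
  }
}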

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103475581

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala —
          @@ -63,27 +65,41 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(

     val iterator = records.iterator()

+    val accumulatorList = Array.fill(aggregates.length) {
+      new JArrayList[Accumulator]()
+    }
+
     while (iterator.hasNext) {
       val record = iterator.next()
+
       if (count == 0) {
-        // initiate intermediate aggregate value.
-        aggregates.foreach(_.initiate(aggregateBuffer))
+        // clear the accumulator list for all aggregate
+        for (i <- aggregates.indices) {
+          accumulatorList(i).clear()
+        }
       }

-      // merge intermediate aggregate value to buffer.
-      aggregates.foreach(_.merge(record, aggregateBuffer))

+      // collect the accumulators for each aggregate
+      for (i <- aggregates.indices) {
+        accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])
          — End diff –

          pairwise merging

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103445538

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList => JArrayList, List => JList}
+import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends ApiAggFunction[Row, Row, Row] {
          +
+  override def createAccumulator(): Row = {
+    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
+
+    for (i <- aggregates.indices) {
+      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
+    }
+    accumulatorRow
+  }
+
+  override def add(value: Row, accumulatorRow: Row) = {
+    for (i <- groupKeysIndex.indices) {
+      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
+    }
+
+    for (i <- aggregates.indices) {
+      val accumulator =
+        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
+      val v = value.getField(aggFieldsIndex(i))
+      aggregates(i).accumulate(accumulator, v)
+    }
+  }
+
+  override def getResult(accumulatorRow: Row): Row = {
+    val output = new Row(finalRowArity)
+
+    groupKeysMapping.foreach {
          — End diff –

          skip grouping keys

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103474797

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -79,44 +85,60 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
       // calculate the current window and open a new window.
       if (windowEnd != null) {
         // emit the current window's merged data
-        doCollect(out, windowStart, windowEnd)
+        doCollect(out, accumulatorList, windowStart, windowEnd)
+
+        // clear the accumulator list for all aggregate
+        for (i <- aggregates.indices) {
+          accumulatorList(i).clear()
+        }
       } else {
         // set group keys to aggregateBuffer.
         for (i <- groupingKeys.indices) {
           aggregateBuffer.setField(i, record.getField(i))
         }
       }
-      // initiate intermediate aggregate value.
-      aggregates.foreach(_.initiate(aggregateBuffer))
       windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
     }
-    // merge intermediate aggregate value to the buffered value.
-    aggregates.foreach(_.merge(record, aggregateBuffer))
+    // collect the accumulators for each aggregate
+    for (i <- aggregates.indices) {

— End diff –

          We cannot collect all accumulator and need to merge pairwise.
          I think it would be good to remove the preparation mapper and use `accumulate()` here but, this would result in even more significant code changes.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103477827

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala —
          @@ -55,10 +47,10 @@ class IncrementalAggregateTimeWindowFunction(
          }

           override def apply(
-              key: Tuple,
-              window: TimeWindow,
-              records: Iterable[Row],
-              out: Collector[Row]): Unit = {
+              key: Tuple,
— End diff –

          If we omit the grouping keys in the `AggregateAggFunction`, we can get them from the `Tuple`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103434488

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          — End diff –

          "Aggregate Function" -> "[[AggregateFunction]]"?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103437140

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends ApiAggFunction[Row, Row, Row] {
          +
          + override def createAccumulator(): Row = {
          + val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
          +
          +    for (i <- aggregates.indices) {
          +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
          +    }
          + accumulatorRow
          + }
          +
          + override def add(value: Row, accumulatorRow: Row) = {
          + for (i <- groupKeysIndex.indices) {
          — End diff –

          No need to keep the grouping keys in the accumulator. They are available in `WindowFunction.apply()` in the key attribute.
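
          For illustration, a minimal Scala sketch of this idea (not code from the PR; the class name, constructor parameters, and the assumption that each window delivers a single pre-aggregated Row are made up for the example). The grouping keys are read from the key Tuple inside apply(), so the accumulator row never has to carry them:

          import java.lang.{Iterable => JIterable}
          import org.apache.flink.api.java.tuple.Tuple
          import org.apache.flink.streaming.api.functions.windowing.WindowFunction
          import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          import org.apache.flink.types.Row
          import org.apache.flink.util.Collector

          // Hypothetical window function: forwards grouping keys from the key Tuple and
          // copies the aggregate results from the pre-aggregated record into the output row.
          class GroupKeyForwardingWindowFunction(
              groupKeysMapping: Array[(Int, Int)],
              aggregateMapping: Array[(Int, Int)],
              finalRowArity: Int)
            extends WindowFunction[Row, Row, Tuple, TimeWindow] {

            override def apply(
                key: Tuple,
                window: TimeWindow,
                records: JIterable[Row],
                out: Collector[Row]): Unit = {

              // with incremental aggregation there is exactly one pre-aggregated record per window
              val aggResult = records.iterator().next()
              val output = new Row(finalRowArity)

              // grouping keys come from the key Tuple, not from the accumulator row
              groupKeysMapping.foreach { case (outPos, keyPos) =>
                output.setField(outPos, key.getField(keyPos))
              }
              // aggregate results come from the pre-aggregated record
              aggregateMapping.foreach { case (outPos, aggPos) =>
                output.setField(outPos, aggResult.getField(aggPos))
              }
              out.collect(output)
            }
          }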

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103469070

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
          @@ -18,32 +18,33 @@
          package org.apache.flink.table.runtime.aggregate

          import java.lang.Iterable
          +import java.util.{ArrayList => JArrayList}
          import org.apache.flink.api.common.functions.RichGroupReduceFunction
          import org.apache.flink.configuration.Configuration
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          import org.apache.flink.types.Row
          import org.apache.flink.util.{Collector, Preconditions}

          -import scala.collection.JavaConversions._
          -
           /**
-            * It wraps the aggregate logic inside of
-            * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
-            *
-            * @param aggregates The aggregate functions.
-            * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-            * and output Row.
-            * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-            * index in output Row.
-            * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
-            * Row and output Row.
-            */
+            * It wraps the aggregate logic inside of
+            * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+            *
+            * @param aggregates The aggregate functions.
+            * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+            * and output Row.
+            * @param aggregateMapping The index mapping between aggregate function list and aggregated
+            * value
+            * index in output Row.
+            * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
+            * Row and output Row.
+            * @param finalRowArity The arity of the final resulting row
+            */
           class AggregateReduceGroupFunction(
-            private val aggregates: Array[Aggregate[_ <: Any]],
+            private val aggregates: Array[AggregateFunction[_ <: Any]],
             private val groupKeysMapping: Array[(Int, Int)],
— End diff –

          The group key positions in the input are known, right?
          So we can also use an `Array[Int]` instead?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103436838

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          — End diff –

          I think we do not need to include the `groupKeys` here. They can be added in the `WindowFunction`.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3423

          @fhueske thanks for the review. I completely agree with your suggestion on "reworking the batch design". Actually, I had proposed the same idea to "rework the batch" before your review (maybe you missed my comment on Feb. 27). I was hesitant to make the changes, as I want to keep this PR as dedicated as possible. But since the performance of the current design is a concern, let's do the cleanup all together within this PR. Regarding "pairwise merging", I have a different opinion: for many aggregates, merging a list at once is much more efficient than merging just two. If we always use "pairwise merging" in the runtime, we will lose the advantage of the merge(List) API defined in the AggregateFunction interface. If memory is a concern, we can limit the size of the List in each merge iteration. I will provide the update very soon.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3423

          Hi @shaoxuan-wang, sorry. I missed your comment. If you haven't started reworking the batch aggregations yet, I agree to do it later and just change the merging to smaller batches.

          So far, we have always put an emphasis on robustness and tried to avoid memory issues as much as possible. In batch, most of the JVM memory is managed by Flink and not available for regular user-function objects. So I'd suggest being a bit conservative here and batching 16 rows. We can also run a few benchmarks to check how much the parameter affects performance.
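
          For illustration, a minimal Scala sketch of merging accumulators in bounded batches (the object and helper name and the batch-size parameter are assumptions made up for the example; only createAccumulator() and merge(JList) are taken from the Accumulator/AggregateFunction interface discussed here):

          import java.util.{ArrayList => JArrayList}
          import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

          object AccumulatorMergeUtil {

            // Merges an arbitrary number of accumulators while never holding more than
            // `maxMergeLen` of them in the buffer at a time; the partial result is carried
            // into the next batch.
            def mergeInBatches(
                agg: AggregateFunction[_],
                accumulators: Iterator[Accumulator],
                maxMergeLen: Int = 16): Accumulator = {

              val buffer = new JArrayList[Accumulator]()
              var merged: Accumulator = agg.createAccumulator()

              while (accumulators.hasNext) {
                buffer.add(merged)
                while (accumulators.hasNext && buffer.size() < maxMergeLen) {
                  buffer.add(accumulators.next())
                }
                merged = agg.merge(buffer)   // merge one bounded batch at a time
                buffer.clear()
              }
              merged
            }
          }

          A smaller maxMergeLen keeps fewer accumulator objects alive at once, at the cost of more merge() calls; benchmarking the parameter, as suggested above, would show where the trade-off lies.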

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103673820

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          — End diff –

          Yes, I think a single Array should be good enough, but I am not quite sure why we had this initially. I prefer to spend a little time investigating it before changing it. I think we can make this change at the codegen step.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103674711

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          — End diff –

          Yes, I agree with you that we can move the groupKeys and agg/group mappings to the WindowFunction. Since this PR is dedicated to changing the aggregate API and functions, and we want to merge it asap so that it won't block other aggregate-related PRs, can we refactor this later?

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103674994

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList => JArrayList, List => JList}
          +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends ApiAggFunction[Row, Row, Row] {
          +
          + override def createAccumulator(): Row = {
          + val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
          +
          +    for (i <- aggregates.indices) {
          +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
          +    }
          + accumulatorRow
          + }
          +
          + override def add(value: Row, accumulatorRow: Row) = {
          +    for (i <- groupKeysIndex.indices) {
          +      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
          +    }
          +
          +    for (i <- aggregates.indices) {
          +      val accumulator =
          +        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
          +      val v = value.getField(aggFieldsIndex(i))
          +      aggregates(i).accumulate(accumulator, v)
          +    }
          + }
          +
          + override def getResult(accumulatorRow: Row): Row = {
          + val output = new Row(finalRowArity)
          — End diff –

          It will be exactly the same as aggregates.length once we remove the group keys from the accumulator state. Let us refactor this later.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3423

          Thanks @fhueske, I updated the PR so that the batch merge is limited to 16 rows; please take a look. This PR is primarily dedicated to changing the DataStream and DataSet code to use the new aggregate interface. I noted down all your other valuable suggestions and will definitely address them in the next PRs for UDAGG.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103730749

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -76,8 +75,10 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
          new JArrayList[Accumulator]()
          — End diff –

          Move out of `combine()`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103734677

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala —
          @@ -99,8 +106,8 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
          // get final aggregate value and set to output.
          aggregateMapping.foreach {
          case (after, previous) => {

-          val accumulator =
-            aggregateBuffer.getField(accumStartPos + previous).asInstanceOf[Accumulator]
+          val agg = aggregates(previous)
+          val accumulator = agg.merge(accumulatorList(previous))
           val result = aggregates(previous).getValue(accumulator)
— End diff –

          `aggregates(previous)` -> `agg`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103732265

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          — End diff –

          Move out of `reduce()`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103733313

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala —
          @@ -85,6 +86,16 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
          }
          count += 1

          + // for every maxMergeLen accumulators, we merge them into one
          + if (count % maxMergeLen == 0) {
          + for (i <- aggregates.indices) {
          + val agg = aggregates(i)
          + val accumulator = agg.merge(accumulatorList(i))
          — End diff –

          make `accumulatorList` a member of the function and move the initialization out of `reduce()`
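
          For illustration, a minimal Scala sketch of this suggestion (the class name, constructor, and the elided reduce body are assumptions, not code from the PR): the accumulator lists become a member, are created once in open(), and are only cleared inside reduce():

          import java.lang.{Iterable => JIterable}
          import java.util.{ArrayList => JArrayList}
          import org.apache.flink.api.common.functions.RichGroupReduceFunction
          import org.apache.flink.configuration.Configuration
          import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          import org.apache.flink.types.Row
          import org.apache.flink.util.Collector

          // Hypothetical group-reduce function: reuses one accumulator list per aggregate
          // instead of allocating new lists on every reduce() invocation.
          class ReusableAccumulatorListFunction(
              private val aggregates: Array[AggregateFunction[_]])
            extends RichGroupReduceFunction[Row, Row] {

            private var accumulatorList: Array[JArrayList[Accumulator]] = _

            override def open(config: Configuration): Unit = {
              accumulatorList = Array.fill(aggregates.length)(new JArrayList[Accumulator]())
            }

            override def reduce(records: JIterable[Row], out: Collector[Row]): Unit = {
              // only clear the reused lists here; do not allocate them per call
              for (i <- aggregates.indices) {
                accumulatorList(i).clear()
              }
              // ... accumulate, batch-merge and emit as in the PR ...
            }
          }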

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103733986

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala —
          @@ -74,22 +75,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
          new JArrayList[Accumulator]()
          }

-    // per each aggregator, collect its accumulators to a list
+    var count:Int = 0
— End diff –

          space

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103728434

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala —
          @@ -69,11 +69,25 @@ class AggregateReduceCombineFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          — End diff –

          space after `:`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103733963

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala —
          @@ -74,22 +75,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
          new JArrayList[Accumulator]()
          — End diff –

          move out of `reduce()`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103733512

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala —
          @@ -73,12 +73,25 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
          new JArrayList[Accumulator]()
          — End diff –

          move out of `combine()`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103729009

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
          @@ -80,11 +81,25 @@ class AggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          — End diff –

          space

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103729741

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
          @@ -95,11 +110,14 @@ class AggregateReduceGroupFunction(
          output.setField(after, last.getField(previous))
          }

          - // get the final aggregate value and set it to output.
          + // get final aggregate value and set to output.
            aggregateMapping.foreach {
          -   case (after, previous) =>
          +   case (after, previous) => {
                val agg = aggregates(previous)
          -     output.setField(after, agg.getValue(agg.merge(accumulatorList(previous))))
          +     val accumulator = agg.merge(accumulatorList(previous))
          +     val result = aggregates(previous).getValue(accumulator)
          — End diff –

          `aggregates(previous)` -> `agg`

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103732317

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          — End diff –

          space

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103728943

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala —
          @@ -80,11 +81,25 @@ class AggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          — End diff –

          Move out of the `reduce` function

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103733543

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala —
          @@ -73,12 +73,25 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
          new JArrayList[Accumulator]()
          }

          - // per each aggregator, collect its accumulators to a list
          + var count:Int = 0
          — End diff –

          space

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103727403

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala —
          @@ -69,11 +69,25 @@ class AggregateReduceCombineFunction(
          new JArrayList[Accumulator]()
          — End diff –

          Move the array and the ArrayLists out of the `combine()` method and clean it at the beginning of `combine()` instead of creating new objects.
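
          As a rough illustration of this pattern (the class and field names below are made up for the example, not the PR's actual classes): allocate the reusable buffers once per function instance and only clear them per invocation.

          ```
          import java.util.{ArrayList => JArrayList}

          // Hypothetical sketch of the suggested object-reuse pattern: the buffers are
          // members that are created once and cleared at the start of each combine() call,
          // instead of being re-allocated for every group.
          class ReusableBuffers(numAggregates: Int) {

            // created once per function instance (e.g. in the constructor or open())
            private val accumulatorLists: Array[JArrayList[AnyRef]] =
              Array.fill(numAggregates)(new JArrayList[AnyRef]())

            def combine(records: Iterator[Array[AnyRef]]): Unit = {
              // reset the reused buffers instead of creating new objects
              accumulatorLists.foreach(_.clear())
              while (records.hasNext) {
                val record = records.next()
                // ... collect the record's accumulators into accumulatorLists, as in the PR ...
              }
            }
          }
          ```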

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103730884

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -76,8 +75,10 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          — End diff –

          space

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103732627

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          while (iterator.hasNext) {
          val record = iterator.next()
          + count += 1
          — End diff –

          Set `count = 0` after clearing the accumulator list in line 119
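
          For illustration only (simplified, with hypothetical names), the ordering this suggests: reset the counter together with the cleared lists at the start of each group.

          ```
          import java.util.{ArrayList => JArrayList}

          // Simplified sketch: both the reused accumulator lists and the counter are
          // re-initialized once, before iterating over the group's records.
          def processGroup[T](records: Iterator[T], accumulatorLists: Array[JArrayList[AnyRef]]): Unit = {
            accumulatorLists.foreach(_.clear())
            var count = 0 // reset right after clearing the lists
            while (records.hasNext) {
              val record = records.next()
              count += 1
              // ... add the record's accumulators to accumulatorLists ...
            }
          }
          ```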

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103806213

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -737,101 +632,121 @@ object AggregateUtil {
          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
          sqlTypeName match

          { case TINYINT => - new ByteMinAggregate + new ByteMinAggFunction case SMALLINT => - new ShortMinAggregate + new ShortMinAggFunction case INTEGER => - new IntMinAggregate + new IntMinAggFunction case BIGINT => - new LongMinAggregate + new LongMinAggFunction case FLOAT => - new FloatMinAggregate + new FloatMinAggFunction case DOUBLE => - new DoubleMinAggregate + new DoubleMinAggFunction case DECIMAL => - new DecimalMinAggregate + new DecimalMinAggFunction case BOOLEAN => - new BooleanMinAggregate + new BooleanMinAggFunction case sqlType: SqlTypeName => throw new TableException("Min aggregate does no support type:" + sqlType) }

          } else {
          sqlTypeName match

          { case TINYINT => - new ByteMaxAggregate + new ByteMaxAggFunction case SMALLINT => - new ShortMaxAggregate + new ShortMaxAggFunction case INTEGER => - new IntMaxAggregate + new IntMaxAggFunction case BIGINT => - new LongMaxAggregate + new LongMaxAggFunction case FLOAT => - new FloatMaxAggregate + new FloatMaxAggFunction case DOUBLE => - new DoubleMaxAggregate + new DoubleMaxAggFunction case DECIMAL => - new DecimalMaxAggregate + new DecimalMaxAggFunction case BOOLEAN => - new BooleanMaxAggregate + new BooleanMaxAggFunction case sqlType: SqlTypeName => throw new TableException("Max aggregate does no support type:" + sqlType) }

          }
          }
          case _: SqlCountAggFunction =>

          • aggregates(index) = new CountAggregate
            + aggregates(index) = new CountAggFunction
            case unSupported: SqlAggFunction =>
            throw new TableException("unsupported Function: " + unSupported.getName)
            }
          • setAggregateDataOffset(index)
          • }
            -
          • // set the aggregate intermediate data start index in Row, and update current value.
          • def setAggregateDataOffset(index: Int): Unit = { - aggregates(index).setAggOffsetInRow(aggOffset) - aggOffset += aggregates(index).intermediateDataType.length }

          (aggFieldIndexes, aggregates)
          }

          • private def createAggregateBufferDataType(
          • groupings: Array[Int],
          • aggregates: Array[Aggregate[_]],
          • inputType: RelDataType,
          • windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
            + private def createDataSetAggregateBufferDataType(
            + groupings: Array[Int],
            + aggregates: Array[TableAggregateFunction[_]],
            + inputType: RelDataType,
            + windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {

          // get the field data types of group keys.

          • val groupingTypes: Seq[TypeInformation[_]] = groupings
          • .map(inputType.getFieldList.get(_).getType)
          • .map(FlinkTypeFactory.toTypeInfo)
            + val groupingTypes: Seq[TypeInformation[_]] =
            + groupings
            + .map(inputType.getFieldList.get(_).getType)
            + .map(FlinkTypeFactory.toTypeInfo)

          // get all field data types of all intermediate aggregates

          • val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
            + val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
            + val clazz: Class[_] = agg.getClass
              • End diff –

          We need to obtain the `TypeInformation` of the `Accumulator` here, not the type of the `AggregateFunction`.
          We might need to add a `getAccumulatorType()` method to the `AggregateFunction` if we cannot extract the type from the object returned by `AggregateFunction.createAccumulator()`.
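
          A minimal sketch of the extraction approach mentioned here (the helper name is made up; only `TypeExtractor.getForObject` is an existing utility): derive the accumulator's `TypeInformation` from the instance returned by `createAccumulator()` rather than from the function's class.

          ```
          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor

          // Hypothetical helper: given a factory for accumulator instances
          // (e.g. () => agg.createAccumulator()), extract the TypeInformation
          // from a sample object.
          def accumulatorTypeOf[ACC](createAccumulator: () => ACC): TypeInformation[ACC] = {
            val sample: ACC = createAccumulator()
            TypeExtractor.getForObject(sample)
          }
          ```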

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103801735

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -363,199 +342,112 @@ object AggregateUtil

          { groupingOffsetMapping, aggOffsetMapping, groupingSetsMapping, - intermediateRowArity, outputType.getFieldCount) }

          groupReduceFunction
          }

          /**

          • * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
          • * aggregation.
          • *
            + * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
            */
          • private[flink] def createIncrementalAggregateReduceFunction(
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int])
          • : IncrementalAggregateReduceFunction = { - - val aggregates = transformToAggregateFunctions( - namedAggregates.map(_.getKey),inputType,groupings.length)._2 - - val groupingOffsetMapping = - getGroupingOffsetAndAggOffsetMapping( - namedAggregates, - inputType, - outputType, - groupings)._1 - - val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum - val reduceFunction = new IncrementalAggregateReduceFunction( - aggregates, - groupingOffsetMapping, - intermediateRowArity) - reduceFunction - }

            -

          • /**
          • * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
          • */
          • private[flink] def createAllWindowAggregationFunction(
            + private[flink] def createAggregationAllWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] + new IncrementalAggregateAllTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] }

          else

          { - new AggregateAllWindowFunction(aggFunction) + new IncrementalAggregateAllWindowFunction( + finalRowArity) }

          }

          /**

          • * Create a [[WindowFunction]] to compute partitioned group window aggregates.
          • *
            + * Create a [[WindowFunction]] for group window aggregates.
            */
          • private[flink] def createWindowAggregationFunction(
            + private[flink] def createAggregationGroupWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] + new IncrementalAggregateTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] }

          else

          { - new AggregateWindowFunction(aggFunction) + new IncrementalAggregateWindowFunction( + finalRowArity) }

          }

          • /**
          • * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
          • * window aggregates.
          • */
          • private[flink] def createAllWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
            + private[flink] def createDataStreamAggregateFunction(
            namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            inputType: RelDataType,
            outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            + groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            + val (aggFields, aggregates) =
            + transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            + val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
          • val finalRowArity = outputType.getFieldCount
            + val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateAllTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] - }

            else {

          • new IncrementalAggregateAllWindowFunction(
          • aggregates,
          • groupingOffsetMapping,
          • aggOffsetMapping,
          • finalRowArity)
            + if (groupKeysMapping.length != groupKeysIndex.length ||
            + aggregateMapping.length != namedAggregates.length) { + throw new TableException( + "Could not find output field in input data type or aggregate functions.") }

            +
            + val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
            + val aggFunction = new AggregateAggFunction(
            + aggregates,
            + aggFields,
            + aggregateMapping,
            + groupKeysIndex,
            + groupKeysMapping,
            + outputType.getFieldCount)
            +
            + (aggFunction, accumulatorRowType)
            }

          /**

          • * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def createWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            + private[flink] def doAllSupportPartialMerge(
            + aggregateCalls: Seq[AggregateCall],
            inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            + groupKeysCount: Int): Boolean = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            -
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            -
          • val finalRowArity = outputType.getFieldCount
            + val aggregateList = transformToAggregateFunctions(
            + aggregateCalls,
            + inputType,
            + groupKeysCount)._2
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] - }

            else

            { - new IncrementalAggregateWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity) - }

            + doAllSupportPartialMerge(aggregateList)
            }

          /**

          • * Return true if all aggregates can be partially computed. False otherwise.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def doAllSupportPartialAggregation(
          • aggregateCalls: Seq[AggregateCall],
          • inputType: RelDataType,
          • groupKeysCount: Int): Boolean = {
          • transformToAggregateFunctions(
          • aggregateCalls,
          • inputType,
          • groupKeysCount).2.forall(.supportPartial)
            + private[flink] def doAllSupportPartialMerge(
            + aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
            + var ret: Boolean = true
              • End diff –

          can be simplified to
          ```
          aggregateList.forall(ifMethodExitInFunction("merge", _))
          ```

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103801835

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -363,199 +342,112 @@ object AggregateUtil

          { groupingOffsetMapping, aggOffsetMapping, groupingSetsMapping, - intermediateRowArity, outputType.getFieldCount) }

          groupReduceFunction
          }

          /**

          • * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
          • * aggregation.
          • *
            + * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
            */
          • private[flink] def createIncrementalAggregateReduceFunction(
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int])
          • : IncrementalAggregateReduceFunction = { - - val aggregates = transformToAggregateFunctions( - namedAggregates.map(_.getKey),inputType,groupings.length)._2 - - val groupingOffsetMapping = - getGroupingOffsetAndAggOffsetMapping( - namedAggregates, - inputType, - outputType, - groupings)._1 - - val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum - val reduceFunction = new IncrementalAggregateReduceFunction( - aggregates, - groupingOffsetMapping, - intermediateRowArity) - reduceFunction - }

            -

          • /**
          • * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
          • */
          • private[flink] def createAllWindowAggregationFunction(
            + private[flink] def createAggregationAllWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] + new IncrementalAggregateAllTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] }

          else

          { - new AggregateAllWindowFunction(aggFunction) + new IncrementalAggregateAllWindowFunction( + finalRowArity) }

          }

          /**

          • * Create a [[WindowFunction]] to compute partitioned group window aggregates.
          • *
            + * Create a [[WindowFunction]] for group window aggregates.
            */
          • private[flink] def createWindowAggregationFunction(
            + private[flink] def createAggregationGroupWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] + new IncrementalAggregateTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] }

          else

          { - new AggregateWindowFunction(aggFunction) + new IncrementalAggregateWindowFunction( + finalRowArity) }

          }

          • /**
          • * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
          • * window aggregates.
          • */
          • private[flink] def createAllWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
            + private[flink] def createDataStreamAggregateFunction(
            namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            inputType: RelDataType,
            outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            + groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            + val (aggFields, aggregates) =
            + transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            + val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
          • val finalRowArity = outputType.getFieldCount
            + val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateAllTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] - }

            else {

          • new IncrementalAggregateAllWindowFunction(
          • aggregates,
          • groupingOffsetMapping,
          • aggOffsetMapping,
          • finalRowArity)
            + if (groupKeysMapping.length != groupKeysIndex.length ||
            + aggregateMapping.length != namedAggregates.length) { + throw new TableException( + "Could not find output field in input data type or aggregate functions.") }

            +
            + val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
            + val aggFunction = new AggregateAggFunction(
            + aggregates,
            + aggFields,
            + aggregateMapping,
            + groupKeysIndex,
            + groupKeysMapping,
            + outputType.getFieldCount)
            +
            + (aggFunction, accumulatorRowType)
            }

          /**

          • * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def createWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            + private[flink] def doAllSupportPartialMerge(
            + aggregateCalls: Seq[AggregateCall],
            inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            + groupKeysCount: Int): Boolean = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            -
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            -
          • val finalRowArity = outputType.getFieldCount
            + val aggregateList = transformToAggregateFunctions(
            + aggregateCalls,
            + inputType,
            + groupKeysCount)._2
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] - }

            else

            { - new IncrementalAggregateWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity) - }

            + doAllSupportPartialMerge(aggregateList)
            }

          /**

          • * Return true if all aggregates can be partially computed. False otherwise.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def doAllSupportPartialAggregation(
          • aggregateCalls: Seq[AggregateCall],
          • inputType: RelDataType,
          • groupKeysCount: Int): Boolean = {
          • transformToAggregateFunctions(
          • aggregateCalls,
          • inputType,
          • groupKeysCount).2.forall(.supportPartial)
            + private[flink] def doAllSupportPartialMerge(
            + aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
            + var ret: Boolean = true
            + var i: Int = 0
            + while (i < aggregateList.length && ret) {
            + ret = ifMethodExitInFunction("merge", aggregateList)
              • End diff –

          Last check overrides all previous ones.
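
          For illustration (with a stand-in predicate rather than the PR's `ifMethodExitInFunction` helper): why reassigning the flag on every iteration can lose earlier results unless the loop stops early, and the `forall` form suggested in the previous comment.

          ```
          // Stand-in predicate; the PR checks ifMethodExitInFunction("merge", ...) instead.
          def supportsMerge(name: String): Boolean = name != "count"

          val aggregateNames = Array("sum", "count", "avg")

          // Problematic pattern: if the loop does not stop on the first false result,
          // the flag only reflects the last element that was checked.
          var ret = true
          var i = 0
          while (i < aggregateNames.length) {
            ret = supportsMerge(aggregateNames(i)) // overwrites earlier results
            i += 1
          }
          // ret is true here, even though "count" failed the check

          // The suggested simplification: combines all checks and short-circuits.
          val allSupportMerge = aggregateNames.forall(supportsMerge)
          // allSupportMerge == false
          ```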

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103807487

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -54,31 +58,31 @@ object AggregateUtil {

          • organized by the following format:
            *
          • {{ { - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - * +---------+---------+--------+--------+--------+--------+ - * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - * +---------+---------+--------+--------+--------+--------+ + * avg(x) count(z) + * | | + * v v + * +---------+---------+-----------------+------------------+------------------+ + * |groupKey1|groupKey2| AvgAccumulator | SumAccumulator | CountAccumulator | + * +---------+---------+-----------------+------------------+------------------+ * ^ * | - * sum(y) aggOffsetInRow = 4 + * sum(y) * }

            }}
            *
            */
            private[flink] def createPrepareMapFunction(
            namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            groupings: Array[Int],
            inputType: RelDataType)

          • : MapFunction[Row, Row] = {
              • End diff –

          There are many reformatting changes in this PR. This makes the diffs hard to read.

          Unfortunately, we do not have a fixed code style so we keep changing the formatting back and forth. IMO, the best approach is to not reformat large parts of a file because it hides the relevant changes and somebody might just change it back.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103806696

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -737,101 +632,121 @@ object AggregateUtil {
          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
          sqlTypeName match

          { case TINYINT => - new ByteMinAggregate + new ByteMinAggFunction case SMALLINT => - new ShortMinAggregate + new ShortMinAggFunction case INTEGER => - new IntMinAggregate + new IntMinAggFunction case BIGINT => - new LongMinAggregate + new LongMinAggFunction case FLOAT => - new FloatMinAggregate + new FloatMinAggFunction case DOUBLE => - new DoubleMinAggregate + new DoubleMinAggFunction case DECIMAL => - new DecimalMinAggregate + new DecimalMinAggFunction case BOOLEAN => - new BooleanMinAggregate + new BooleanMinAggFunction case sqlType: SqlTypeName => throw new TableException("Min aggregate does no support type:" + sqlType) }

          } else {
          sqlTypeName match

          { case TINYINT => - new ByteMaxAggregate + new ByteMaxAggFunction case SMALLINT => - new ShortMaxAggregate + new ShortMaxAggFunction case INTEGER => - new IntMaxAggregate + new IntMaxAggFunction case BIGINT => - new LongMaxAggregate + new LongMaxAggFunction case FLOAT => - new FloatMaxAggregate + new FloatMaxAggFunction case DOUBLE => - new DoubleMaxAggregate + new DoubleMaxAggFunction case DECIMAL => - new DecimalMaxAggregate + new DecimalMaxAggFunction case BOOLEAN => - new BooleanMaxAggregate + new BooleanMaxAggFunction case sqlType: SqlTypeName => throw new TableException("Max aggregate does no support type:" + sqlType) }

          }
          }
          case _: SqlCountAggFunction =>

          • aggregates(index) = new CountAggregate
            + aggregates(index) = new CountAggFunction
            case unSupported: SqlAggFunction =>
            throw new TableException("unsupported Function: " + unSupported.getName)
            }
          • setAggregateDataOffset(index)
          • }
            -
          • // set the aggregate intermediate data start index in Row, and update current value.
          • def setAggregateDataOffset(index: Int): Unit = { - aggregates(index).setAggOffsetInRow(aggOffset) - aggOffset += aggregates(index).intermediateDataType.length }

          (aggFieldIndexes, aggregates)
          }

          • private def createAggregateBufferDataType(
          • groupings: Array[Int],
          • aggregates: Array[Aggregate[_]],
          • inputType: RelDataType,
          • windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
            + private def createDataSetAggregateBufferDataType(
            + groupings: Array[Int],
            + aggregates: Array[TableAggregateFunction[_]],
            + inputType: RelDataType,
            + windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {

          // get the field data types of group keys.

          • val groupingTypes: Seq[TypeInformation[_]] = groupings
          • .map(inputType.getFieldList.get(_).getType)
          • .map(FlinkTypeFactory.toTypeInfo)
            + val groupingTypes: Seq[TypeInformation[_]] =
            + groupings
            + .map(inputType.getFieldList.get(_).getType)
            + .map(FlinkTypeFactory.toTypeInfo)

          // get all field data types of all intermediate aggregates

          • val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
            + val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg => + val clazz: Class[_] = agg.getClass + TypeInformation.of(clazz) + }

          // concat group key types, aggregation types, and window key types

          • val allFieldTypes:Seq[TypeInformation[_]] = windowKeyTypes match {
            + val allFieldTypes: Seq[TypeInformation[_]] = windowKeyTypes match { case None => groupingTypes ++: aggTypes case _ => groupingTypes ++: aggTypes ++: windowKeyTypes.get }
          • new RowTypeInfo(allFieldTypes :_*)
            + new RowTypeInfo(allFieldTypes: _*)
            + }
            +
            + private def createAccumulatorRowType(
            + inputType: RelDataType,
            + groupings: Array[Int],
            + aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
            +
            + // get the field data types of group keys.
            + val groupingTypes: Seq[TypeInformation[_]] =
            + groupings
            + .map(inputType.getFieldList.get(_).getType)
            + .map(FlinkTypeFactory.toTypeInfo)
            +
            + val aggTypes: Seq[TypeInformation[_]] =
            + aggregates.map {
            + agg =>
            + val clazz: Class[_] = agg.getClass
            + TypeInformation.of(clazz)
              • End diff –

          Same as above. We need the type of the accumulator, not of the aggregate function.
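
          A minimal sketch of this point (the class names below are illustrative stand-ins, not Flink's actual aggregate classes): the `TypeInformation` should be derived from the accumulator instance that `createAccumulator()` returns, not from the aggregate function's own class.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation

// Illustrative stand-ins for the accumulator and aggregate function shapes.
class SumAccumulator { var sum: Long = 0L }

class LongSumAggFunction {
  def createAccumulator(): SumAccumulator = new SumAccumulator
}

object AccumulatorTypeSketch {
  def main(args: Array[String]): Unit = {
    val agg = new LongSumAggFunction

    // Describes the function object itself -- not what is stored per group.
    val functionClass: Class[_] = agg.getClass
    val functionType: TypeInformation[_] = TypeInformation.of(functionClass)

    // Describes the accumulator that actually lives in the intermediate Row.
    val accClass: Class[_] = agg.createAccumulator().getClass
    val accType: TypeInformation[_] = TypeInformation.of(accClass)

    println(functionType) // type information of LongSumAggFunction
    println(accType)      // type information of SumAccumulator
  }
}
```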

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103818316

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -737,101 +632,121 @@ object AggregateUtil {
          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
          sqlTypeName match

          { case TINYINT => - new ByteMinAggregate + new ByteMinAggFunction case SMALLINT => - new ShortMinAggregate + new ShortMinAggFunction case INTEGER => - new IntMinAggregate + new IntMinAggFunction case BIGINT => - new LongMinAggregate + new LongMinAggFunction case FLOAT => - new FloatMinAggregate + new FloatMinAggFunction case DOUBLE => - new DoubleMinAggregate + new DoubleMinAggFunction case DECIMAL => - new DecimalMinAggregate + new DecimalMinAggFunction case BOOLEAN => - new BooleanMinAggregate + new BooleanMinAggFunction case sqlType: SqlTypeName => throw new TableException("Min aggregate does no support type:" + sqlType) }

          } else {
          sqlTypeName match

          { case TINYINT => - new ByteMaxAggregate + new ByteMaxAggFunction case SMALLINT => - new ShortMaxAggregate + new ShortMaxAggFunction case INTEGER => - new IntMaxAggregate + new IntMaxAggFunction case BIGINT => - new LongMaxAggregate + new LongMaxAggFunction case FLOAT => - new FloatMaxAggregate + new FloatMaxAggFunction case DOUBLE => - new DoubleMaxAggregate + new DoubleMaxAggFunction case DECIMAL => - new DecimalMaxAggregate + new DecimalMaxAggFunction case BOOLEAN => - new BooleanMaxAggregate + new BooleanMaxAggFunction case sqlType: SqlTypeName => throw new TableException("Max aggregate does no support type:" + sqlType) }

          }
          }
          case _: SqlCountAggFunction =>

          • aggregates(index) = new CountAggregate
            + aggregates(index) = new CountAggFunction
            case unSupported: SqlAggFunction =>
            throw new TableException("unsupported Function: " + unSupported.getName)
            }
          • setAggregateDataOffset(index)
          • }
            -
          • // set the aggregate intermediate data start index in Row, and update current value.
          • def setAggregateDataOffset(index: Int): Unit = { - aggregates(index).setAggOffsetInRow(aggOffset) - aggOffset += aggregates(index).intermediateDataType.length }

          (aggFieldIndexes, aggregates)
          }

          • private def createAggregateBufferDataType(
          • groupings: Array[Int],
          • aggregates: Array[Aggregate[_]],
          • inputType: RelDataType,
          • windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
            + private def createDataSetAggregateBufferDataType(
            + groupings: Array[Int],
            + aggregates: Array[TableAggregateFunction[_]],
            + inputType: RelDataType,
            + windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {

          // get the field data types of group keys.

          • val groupingTypes: Seq[TypeInformation[_]] = groupings
          • .map(inputType.getFieldList.get(_).getType)
          • .map(FlinkTypeFactory.toTypeInfo)
            + val groupingTypes: Seq[TypeInformation[_]] =
            + groupings
            + .map(inputType.getFieldList.get(_).getType)
            + .map(FlinkTypeFactory.toTypeInfo)

          // get all field data types of all intermediate aggregates

          • val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
            + val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
            + val clazz: Class[_] = agg.getClass
              • End diff –

          I played around with this.
          I think we can make it work without a `getAccumulatorType()` method but have to change a few parts.
          First of all, the accumulators need to be moved out of the `AggregationFunction` and become regular "top-level" classes. Once this is done, `TypeInformation.of(clazz)` should detect them as tuples and create a `TupleTypeInfo`. However, the fields inside are still of `GenericType`. I figured out that it helps to use the Java boxed types instead of Scala's types, e.g., a `JTuple1[JLong]` (with `JLong` = `java.lang.Long`) will result in a correct `TupleTypeInfo` with a single Long field.
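
          A minimal sketch of that idea (`CountAccumulator` is a hypothetical example class, not the PR's code): a top-level accumulator class that extends the Java `Tuple1` with a `java.lang.Long` field, so that `TypeInformation.of()` can analyze it as a tuple instead of a generic type.

```scala
import java.lang.{Long => JLong}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}

// Top-level accumulator (not nested inside the AggregateFunction),
// with a boxed Java field type instead of a Scala Long.
class CountAccumulator extends JTuple1[JLong] {
  f0 = JLong.valueOf(0L) // running count
}

object TupleAccumulatorTypeSketch {
  def main(args: Array[String]): Unit = {
    val accType: TypeInformation[CountAccumulator] =
      TypeInformation.of(classOf[CountAccumulator])
    // Expected to be a TupleTypeInfo with one Long field rather than a GenericType,
    // because the class is a public top-level Tuple subclass with a boxed field.
    println(accType)
  }
}
```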

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103818719

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala —
          @@ -0,0 +1,101 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.

          {ArrayList => JArrayList, List => JList}

          +import org.apache.flink.api.common.functions.

          {AggregateFunction => ApiAggFunction}

          +import org.apache.flink.table.functions.

          {Accumulator, AggregateFunction}

          +import org.apache.flink.types.Row
          +
          +/**
          + * Aggregate Function used for the aggregate operator in
          + * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFieldsIndex the position (in the input Row) of the input value for each aggregate
          + * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
          + * output row => the index of the aggregate) for all the aggregates
          + * @param groupKeysIndex the position (in the input Row) of grouping keys
          + * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
          + * output row => the index of grouping key) for all the grouping keys
          + * @param finalRowArity the arity of the final row
          + */
          +class AggregateAggFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFieldsIndex: Array[Int],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val groupKeysIndex: Array[Int],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends ApiAggFunction[Row, Row, Row] {
          +
          + override def createAccumulator(): Row = {
          + val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
          +
          + for (i <- aggregates.indices)

          { + accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator()) + }

          + accumulatorRow
          + }
          +
          + override def add(value: Row, accumulatorRow: Row) = {
          + for (i <- groupKeysIndex.indices) {
          — End diff –

          I played around a bit and have a commit that removes the grouping keys from the `AggregateFunction` and sets them in the `WindowFunction`.
          I'll put it on top of the PR before it is merged.
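
          A minimal sketch of that split (assumed field layout and class name, not the actual commit): the incremental aggregation emits a Row that contains only the aggregate results, and the window function prepends the grouping keys taken from the key Tuple when it builds the final Row.

```scala
import java.lang.{Iterable => JIterable}

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class SetGroupKeysWindowFunction(numGroupKeys: Int, finalRowArity: Int)
  extends WindowFunction[Row, Row, Tuple, Window] {

  override def apply(
      key: Tuple,
      window: Window,
      input: JIterable[Row],
      out: Collector[Row]): Unit = {

    for (aggRow <- input.asScala) {
      val output = new Row(finalRowArity)
      // The grouping keys come from the key Tuple, not from the pre-aggregated Row.
      var i = 0
      while (i < numGroupKeys) {
        output.setField(i, key.getField[AnyRef](i))
        i += 1
      }
      // Copy the aggregate results behind the keys.
      var j = 0
      while (j < aggRow.getArity) {
        output.setField(numGroupKeys + j, aggRow.getField(j))
        j += 1
      }
      out.collect(output)
    }
  }
}
```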

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103834968

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
          new JArrayList[Accumulator]()
          }

          + var count:Int = 0
          while (iterator.hasNext) {
          val record = iterator.next()
          + count += 1
          — End diff –

          Good catch!

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103835523

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -54,31 +58,31 @@ object AggregateUtil {

          • organized by the following format:
            *
          • {{ { - * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 - * | | - * v v - * +---------+---------+--------+--------+--------+--------+ - * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | - * +---------+---------+--------+--------+--------+--------+ + * avg(x) count(z) + * | | + * v v + * +---------+---------+-----------------+------------------+------------------+ + * |groupKey1|groupKey2| AvgAccumulator | SumAccumulator | CountAccumulator | + * +---------+---------+-----------------+------------------+------------------+ * ^ * | - * sum(y) aggOffsetInRow = 4 + * sum(y) * }

            }}
            *
            */
            private[flink] def createPrepareMapFunction(
            namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            groupings: Array[Int],
            inputType: RelDataType)

          • : MapFunction[Row, Row] = {
              • End diff –

          OK, based on the DEV discussion, I thought we were going to fix the code style in new PRs. But you are right, I should keep the style changes to the new lines only. I will discard the "style-only" changes.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103839943

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -363,199 +342,112 @@ object AggregateUtil

          { groupingOffsetMapping, aggOffsetMapping, groupingSetsMapping, - intermediateRowArity, outputType.getFieldCount) }

          groupReduceFunction
          }

          /**

          • * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
          • * aggregation.
          • *
            + * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
            */
          • private[flink] def createIncrementalAggregateReduceFunction(
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int])
          • : IncrementalAggregateReduceFunction = { - - val aggregates = transformToAggregateFunctions( - namedAggregates.map(_.getKey),inputType,groupings.length)._2 - - val groupingOffsetMapping = - getGroupingOffsetAndAggOffsetMapping( - namedAggregates, - inputType, - outputType, - groupings)._1 - - val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum - val reduceFunction = new IncrementalAggregateReduceFunction( - aggregates, - groupingOffsetMapping, - intermediateRowArity) - reduceFunction - }

            -

          • /**
          • * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
          • */
          • private[flink] def createAllWindowAggregationFunction(
            + private[flink] def createAggregationAllWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] + new IncrementalAggregateAllTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] }

          else

          { - new AggregateAllWindowFunction(aggFunction) + new IncrementalAggregateAllWindowFunction( + finalRowArity) }

          }

          /**

          • * Create a [[WindowFunction]] to compute partitioned group window aggregates.
          • *
            + * Create a [[WindowFunction]] for group window aggregates.
            */
          • private[flink] def createWindowAggregationFunction(
            + private[flink] def createAggregationGroupWindowFunction(
            window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          • inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            -
          • val aggFunction =
          • createAggregateGroupReduceFunction(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings,
          • inGroupingSet = false)
            + finalRowArity: Int,
            + properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {

          if (isTimeWindow(window))

          { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new AggregateTimeWindowFunction(aggFunction, startPos, endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] + new IncrementalAggregateTimeWindowFunction( + startPos, + endPos, + finalRowArity) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] }

          else

          { - new AggregateWindowFunction(aggFunction) + new IncrementalAggregateWindowFunction( + finalRowArity) }

          }

          • /**
          • * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
          • * window aggregates.
          • */
          • private[flink] def createAllWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
            + private[flink] def createDataStreamAggregateFunction(
            namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            inputType: RelDataType,
            outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : AllWindowFunction[Row, Row, DataStreamWindow] = {
            + groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            + val (aggFields, aggregates) =
            + transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            + val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
          • val finalRowArity = outputType.getFieldCount
            + val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateAllTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] - }

            else {

          • new IncrementalAggregateAllWindowFunction(
          • aggregates,
          • groupingOffsetMapping,
          • aggOffsetMapping,
          • finalRowArity)
            + if (groupKeysMapping.length != groupKeysIndex.length ||
            + aggregateMapping.length != namedAggregates.length) { + throw new TableException( + "Could not find output field in input data type or aggregate functions.") }

            +
            + val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
            + val aggFunction = new AggregateAggFunction(
            + aggregates,
            + aggFields,
            + aggregateMapping,
            + groupKeysIndex,
            + groupKeysMapping,
            + outputType.getFieldCount)
            +
            + (aggFunction, accumulatorRowType)
            }

          /**

          • * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def createWindowIncrementalAggregationFunction(
          • window: LogicalWindow,
          • namedAggregates: Seq[CalcitePair[AggregateCall, String]],
            + private[flink] def doAllSupportPartialMerge(
            + aggregateCalls: Seq[AggregateCall],
            inputType: RelDataType,
          • outputType: RelDataType,
          • groupings: Array[Int],
          • properties: Seq[NamedWindowProperty])
          • : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
            + groupKeysCount: Int): Boolean = {
          • val aggregates = transformToAggregateFunctions(
          • namedAggregates.map(_.getKey),inputType,groupings.length)._2
            -
          • val (groupingOffsetMapping, aggOffsetMapping) =
          • getGroupingOffsetAndAggOffsetMapping(
          • namedAggregates,
          • inputType,
          • outputType,
          • groupings)
            -
          • val finalRowArity = outputType.getFieldCount
            + val aggregateList = transformToAggregateFunctions(
            + aggregateCalls,
            + inputType,
            + groupKeysCount)._2
          • if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new IncrementalAggregateTimeWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity, - startPos, - endPos) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] - }

            else

            { - new IncrementalAggregateWindowFunction( - aggregates, - groupingOffsetMapping, - aggOffsetMapping, - finalRowArity) - }

            + doAllSupportPartialMerge(aggregateList)
            }

          /**

          • * Return true if all aggregates can be partially computed. False otherwise.
            + * Return true if all aggregates can be partially merged. False otherwise.
            */
          • private[flink] def doAllSupportPartialAggregation(
          • aggregateCalls: Seq[AggregateCall],
          • inputType: RelDataType,
          • groupKeysCount: Int): Boolean = {
          • transformToAggregateFunctions(
          • aggregateCalls,
          • inputType,
          • groupKeysCount).2.forall(.supportPartial)
            + private[flink] def doAllSupportPartialMerge(
            + aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
            + var ret: Boolean = true
            + var i: Int = 0
            + while (i < aggregateList.length && ret) {
            + ret = ifMethodExitInFunction("merge", aggregateList)
              • End diff –

          It will break the loop once `ret` becomes false, so there should not be any override; the last check never hides an earlier failure. This should be exactly the same as `forall` (I intended to avoid traversing the entire list for performance, but it seems `forall` behaves the same way, i.e., it also short-circuits).
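
          A small standalone sketch of that equivalence (`hasMergeMethod` stands in for the actual `ifMethodExitInFunction` helper): both versions stop at the first aggregate without a `merge()` method.

```scala
object PartialMergeCheckSketch {

  // Stand-in for ifMethodExitInFunction("merge", f): does the object declare merge()?
  private def hasMergeMethod(f: AnyRef): Boolean =
    f.getClass.getMethods.exists(_.getName == "merge")

  // Manual loop: exits as soon as ret becomes false.
  def allSupportPartialMergeLoop(aggregates: Array[AnyRef]): Boolean = {
    var ret = true
    var i = 0
    while (i < aggregates.length && ret) {
      ret = hasMergeMethod(aggregates(i))
      i += 1
    }
    ret
  }

  // forall: also short-circuits on the first false.
  def allSupportPartialMergeForall(aggregates: Array[AnyRef]): Boolean =
    aggregates.forall(hasMergeMethod)

  def main(args: Array[String]): Unit = {
    class WithMerge    { def merge(other: AnyRef): Unit = () }
    class WithoutMerge

    val aggs: Array[AnyRef] = Array(new WithMerge, new WithoutMerge, new WithMerge)
    println(allSupportPartialMergeLoop(aggs))   // false
    println(allSupportPartialMergeForall(aggs)) // false
  }
}
```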

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103883798

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -737,101 +632,121 @@ object AggregateUtil {
          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
          sqlTypeName match

          { case TINYINT => - new ByteMinAggregate + new ByteMinAggFunction case SMALLINT => - new ShortMinAggregate + new ShortMinAggFunction case INTEGER => - new IntMinAggregate + new IntMinAggFunction case BIGINT => - new LongMinAggregate + new LongMinAggFunction case FLOAT => - new FloatMinAggregate + new FloatMinAggFunction case DOUBLE => - new DoubleMinAggregate + new DoubleMinAggFunction case DECIMAL => - new DecimalMinAggregate + new DecimalMinAggFunction case BOOLEAN => - new BooleanMinAggregate + new BooleanMinAggFunction case sqlType: SqlTypeName => throw new TableException("Min aggregate does no support type:" + sqlType) }

          } else {
          sqlTypeName match

          { case TINYINT => - new ByteMaxAggregate + new ByteMaxAggFunction case SMALLINT => - new ShortMaxAggregate + new ShortMaxAggFunction case INTEGER => - new IntMaxAggregate + new IntMaxAggFunction case BIGINT => - new LongMaxAggregate + new LongMaxAggFunction case FLOAT => - new FloatMaxAggregate + new FloatMaxAggFunction case DOUBLE => - new DoubleMaxAggregate + new DoubleMaxAggFunction case DECIMAL => - new DecimalMaxAggregate + new DecimalMaxAggFunction case BOOLEAN => - new BooleanMaxAggregate + new BooleanMaxAggFunction case sqlType: SqlTypeName => throw new TableException("Max aggregate does no support type:" + sqlType) }

          }
          }
          case _: SqlCountAggFunction =>

          • aggregates(index) = new CountAggregate
            + aggregates(index) = new CountAggFunction
            case unSupported: SqlAggFunction =>
            throw new TableException("unsupported Function: " + unSupported.getName)
            }
          • setAggregateDataOffset(index)
          • }
            -
          • // set the aggregate intermediate data start index in Row, and update current value.
          • def setAggregateDataOffset(index: Int): Unit = { - aggregates(index).setAggOffsetInRow(aggOffset) - aggOffset += aggregates(index).intermediateDataType.length }

          (aggFieldIndexes, aggregates)
          }

          • private def createAggregateBufferDataType(
          • groupings: Array[Int],
          • aggregates: Array[Aggregate[_]],
          • inputType: RelDataType,
          • windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
            + private def createDataSetAggregateBufferDataType(
            + groupings: Array[Int],
            + aggregates: Array[TableAggregateFunction[_]],
            + inputType: RelDataType,
            + windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {

          // get the field data types of group keys.

          • val groupingTypes: Seq[TypeInformation[_]] = groupings
          • .map(inputType.getFieldList.get(_).getType)
          • .map(FlinkTypeFactory.toTypeInfo)
            + val groupingTypes: Seq[TypeInformation[_]] =
            + groupings
            + .map(inputType.getFieldList.get(_).getType)
            + .map(FlinkTypeFactory.toTypeInfo)

          // get all field data types of all intermediate aggregates

          • val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
            + val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
            + val clazz: Class[_] = agg.getClass
              • End diff –

          Thanks for spending time to look into this. I played around with your approach; it works well except for classes that carry a generic type parameter. I think it is fair to just return a generic type for such an accumulator. It is not very efficient, but that is the user's call.
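
          A small sketch of that trade-off (toy classes, not the PR code): an accumulator whose fields are fully resolved can get a concrete composite type, while one that still carries a type parameter is expected to fall back to a black-box generic type, which is serialized less efficiently.

```scala
import java.lang.{Long => JLong}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}

// Fully resolved accumulator: the extractor can see a single Long field.
class ConcreteAcc extends JTuple1[JLong]

// Accumulator that keeps a type parameter: T is unknown at extraction time.
class ParametricAcc[T] { var value: T = _ }

object GenericAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    // Expected: a tuple-like composite type with one Long field.
    println(TypeInformation.of(classOf[ConcreteAcc]))
    // Expected: a generic (Kryo-serialized) fallback, since T cannot be resolved.
    println(TypeInformation.of(classOf[ParametricAcc[_]]))
  }
}
```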

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3423

          @fhueske Thanks for the detailed review. I have addressed all your comments. Please take a look.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3423#discussion_r103885054

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -737,101 +632,121 @@ object AggregateUtil {
                    aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                      sqlTypeName match {
                        case TINYINT =>
          -               new ByteMinAggregate
          +               new ByteMinAggFunction
                        case SMALLINT =>
          -               new ShortMinAggregate
          +               new ShortMinAggFunction
                        case INTEGER =>
          -               new IntMinAggregate
          +               new IntMinAggFunction
                        case BIGINT =>
          -               new LongMinAggregate
          +               new LongMinAggFunction
                        case FLOAT =>
          -               new FloatMinAggregate
          +               new FloatMinAggFunction
                        case DOUBLE =>
          -               new DoubleMinAggregate
          +               new DoubleMinAggFunction
                        case DECIMAL =>
          -               new DecimalMinAggregate
          +               new DecimalMinAggFunction
                        case BOOLEAN =>
          -               new BooleanMinAggregate
          +               new BooleanMinAggFunction
                        case sqlType: SqlTypeName =>
                          throw new TableException("Min aggregate does no support type:" + sqlType)
                      }
                    } else {
                      sqlTypeName match {
                        case TINYINT =>
          -               new ByteMaxAggregate
          +               new ByteMaxAggFunction
                        case SMALLINT =>
          -               new ShortMaxAggregate
          +               new ShortMaxAggFunction
                        case INTEGER =>
          -               new IntMaxAggregate
          +               new IntMaxAggFunction
                        case BIGINT =>
          -               new LongMaxAggregate
          +               new LongMaxAggFunction
                        case FLOAT =>
          -               new FloatMaxAggregate
          +               new FloatMaxAggFunction
                        case DOUBLE =>
          -               new DoubleMaxAggregate
          +               new DoubleMaxAggFunction
                        case DECIMAL =>
          -               new DecimalMaxAggregate
          +               new DecimalMaxAggFunction
                        case BOOLEAN =>
          -               new BooleanMaxAggregate
          +               new BooleanMaxAggFunction
                        case sqlType: SqlTypeName =>
                          throw new TableException("Max aggregate does no support type:" + sqlType)
                      }
                    }
                  case _: SqlCountAggFunction =>
          -         aggregates(index) = new CountAggregate
          +         aggregates(index) = new CountAggFunction
                  case unSupported: SqlAggFunction =>
                    throw new TableException("unsupported Function: " + unSupported.getName)
                }
          -     setAggregateDataOffset(index)
          -   }
          -
          -   // set the aggregate intermediate data start index in Row, and update current value.
          -   def setAggregateDataOffset(index: Int): Unit = {
          -     aggregates(index).setAggOffsetInRow(aggOffset)
          -     aggOffset += aggregates(index).intermediateDataType.length
          -   }

                (aggFieldIndexes, aggregates)
              }

          -   private def createAggregateBufferDataType(
          -       groupings: Array[Int],
          -       aggregates: Array[Aggregate[_]],
          -       inputType: RelDataType,
          -       windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
          +   private def createDataSetAggregateBufferDataType(
          +       groupings: Array[Int],
          +       aggregates: Array[TableAggregateFunction[_]],
          +       inputType: RelDataType,
          +       windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {

                // get the field data types of group keys.
          -     val groupingTypes: Seq[TypeInformation[_]] = groupings
          -       .map(inputType.getFieldList.get(_).getType)
          -       .map(FlinkTypeFactory.toTypeInfo)
          +     val groupingTypes: Seq[TypeInformation[_]] =
          +       groupings
          +         .map(inputType.getFieldList.get(_).getType)
          +         .map(FlinkTypeFactory.toTypeInfo)

                // get all field data types of all intermediate aggregates
          -     val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
          +     val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
          +       val clazz: Class[_] = agg.getClass
          End diff –

          Btw, @fhueske, I have not gotten a chance to dig into this yet, but are you perhaps already aware of why we cannot get the correct Scala type via TypeInformation? If we cannot fix this, I think we should consider implementing all built-in aggregates and the aggregateTestBase in Java, and also recommend that future UDAGG users write their aggregates in Java rather than Scala.
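          To illustrate the Scala-type question raised here (an assumed example, not code from this PR; the object and case class names are hypothetical): reflection-based extraction through the Java type extractor only sees the erased class of a Scala case class and typically falls back to a generic type, while the Scala API's compile-time macro produces a proper case class type with per-field serializers.

          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.TypeExtractor
          import org.apache.flink.api.scala._ // provides the createTypeInformation macro

          object ScalaTypeInfoSketch {
            // Hypothetical Scala accumulator, used only to illustrate the problem.
            case class SumAccumulator(var sum: Long, var count: Long)

            // Reflection-based extraction: a case class has no default constructor, so
            // POJO analysis fails and this typically ends up as a GenericTypeInfo (Kryo).
            val reflective: TypeInformation[SumAccumulator] =
              TypeExtractor.createTypeInfo(classOf[SumAccumulator])

            // Compile-time macro: resolves the type statically and yields a
            // CaseClassTypeInfo with efficient per-field serializers.
            val macroBased: TypeInformation[SumAccumulator] =
              createTypeInformation[SumAccumulator]
          }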

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3423

          @fhueske I pushed a commit. Please take a look. Please note that this PR will close both FLINK5768 and FLINK5769. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3423

          Thanks for the update @shaoxuan-wang. The PR looks good to merge.
          I'll do some final tests and run another build.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3423

          fhueske Fabian Hueske added a comment -

          Implemented with 438276de8fab4f1a8f2b62b6452c2e5b2998ce5a


            People

            • Assignee:
              ShaoxuanWang Shaoxuan Wang
            • Reporter:
              ShaoxuanWang Shaoxuan Wang
            • Votes:
              0
            • Watchers:
              6
