Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      With the new UDAGG interface we do not need the preparation mapper anymore. It adds overhead because

      • it is another operator
      • it prevents using AggregateFunction.accumulate() in a combiner or reducer.

      Hence, it should be removed.
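      The Scala sketch below illustrates the motivation under simplifying assumptions. It uses a hypothetical `SimpleAggFunction` trait, not Flink's actual `AggregateFunction` API, with `createAccumulator`, `accumulate`, `merge`, and `getValue`. For brevity, accumulators are immutable values returned from each call. Because the combiner calls `accumulate()` directly on raw input values, no separate preparation map is needed to turn each input into an accumulator first. The final reducer only merges partial accumulators.

```scala
// Hypothetical, simplified stand-in for a UDAGG interface (not Flink's API).
trait SimpleAggFunction[T, ACC] {
  def createAccumulator(): ACC
  def accumulate(acc: ACC, value: T): ACC
  def merge(a: ACC, b: ACC): ACC
  def getValue(acc: ACC): T
}

// Example aggregate: sum of longs, with a Long used as the accumulator.
object LongSum extends SimpleAggFunction[Long, Long] {
  def createAccumulator(): Long = 0L
  def accumulate(acc: Long, value: Long): Long = acc + value
  def merge(a: Long, b: Long): Long = a + b
  def getValue(acc: Long): Long = acc
}

object PreAggSketch {
  // Combiner: raw (key, value) records are fed straight into accumulate(),
  // producing one partial accumulator per key and partition.
  def combine[K](partition: Seq[(K, Long)], agg: SimpleAggFunction[Long, Long]): Map[K, Long] =
    partition.groupBy(_._1).map { case (k, rows) =>
      k -> rows.foldLeft(agg.createAccumulator())((acc, r) => agg.accumulate(acc, r._2))
    }

  // Final reducer: merges the partial accumulators of all combiners per key.
  def reduce[K](partials: Seq[Map[K, Long]], agg: SimpleAggFunction[Long, Long]): Map[K, Long] =
    partials.flatten.groupBy(_._1).map { case (k, accs) =>
      k -> agg.getValue(accs.map(_._2).reduce((a, b) => agg.merge(a, b)))
    }
}
```

      For two partitions `Seq(("a", 1L), ("b", 2L), ("a", 3L))` and `Seq(("a", 4L))`, the combiners produce `Map(a -> 4, b -> 2)` and `Map(a -> 4)`. The reducer then merges these into `Map(a -> 8, b -> 2)`.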

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3472

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: c8e8f31b01915a4b9b49adbb4c094f4ae268b6d3

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3472

          Thanks @fhueske. I will rebase this and replace foreach with while loops, and merge it.
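          Some context for the foreach-to-while change. In Scala 2, a `foreach` over an index range may allocate a `Range` and a closure on each call, and a `var` captured by the closure is boxed into a reference cell. A `while` loop avoids both costs, which is why it is often preferred in per-record runtime code. The helper names below are hypothetical. Any actual speedup depends on the JIT and is not measured here.

```scala
object LoopStyles {
  // Idiomatic, but in hot per-record code this may allocate a Range (indices)
  // and a closure, and the captured `sum` is boxed into a reference cell.
  def sumForeach(values: Array[Long]): Long = {
    var sum = 0L
    values.indices.foreach { i => sum += values(i) }
    sum
  }

  // Plain while loop with an explicit index: no Range, no closure, no boxing.
  def sumWhile(values: Array[Long]): Long = {
    var sum = 0L
    var i = 0
    while (i < values.length) {
      sum += values(i)
      i += 1
    }
    sum
  }
}
```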

          Show
          githubbot ASF GitHub Bot added a comment - Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3472 Thanks @fhueske. I will rebase this and replace foreach with while loops, and merge it.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3472

          Thanks for the reviews @shaoxuan-wang and @sunjincheng121.
          Will merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3472

          Hi @sunjincheng121, thanks for your review. I addressed your comments.
          Cheers, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104298836

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala —
          @@ -0,0 +1,87 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions._
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
          + * (DataSet) queries.
          + *
          + * @param aggregates The aggregate functions.
          + * @param aggInFields The positions of the aggregation input fields.
          + * @param groupingKeys The positions of the grouping keys in the input.
          + */
          +class DataSetPreAggFunction(
          + private val aggregates: Array[AggregateFunction[_ <: Any]],
          + private val aggInFields: Array[Int],
          + private val groupingKeys: Array[Int])
          + extends AbstractRichFunction
          + with GroupCombineFunction[Row, Row]
          — End diff –

          OK

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104298833

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala —
          @@ -87,47 +89,67 @@ class DataSetAggregate(

             override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
          -    val groupingKeys = grouping.indices.toArray
          -
          -    val mapFunction = AggregateUtil.createPrepareMapFunction(
          -      namedAggregates,
          -      grouping,
          -      inputType)
          -
          -    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
          -      namedAggregates,
          -      inputType,
          -      rowRelDataType,
          -      grouping,
          -      inGroupingSet)
          +    val (preAgg: Option[DataSetPreAggFunction],
          +      preAggType: Option[TypeInformation[Row]],
          +      finalAgg: GroupReduceFunction[Row, Row]) =
          — End diff –

          OK

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104298822

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
          @@ -0,0 +1,103 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
          + * (DataSet) queries.
          + *
          + * @param aggregates The aggregate functions.
          + * @param aggInFields The positions of the aggregation input fields.
          + * @param gkeyOutMapping The mapping of group keys between input and output positions.
          + * @param aggOutMapping The mapping of aggregates to output positions.
          + * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
          + * @param finalRowArity The arity of the final resulting row.
          + */
          +class DataSetAggFunction(
          + private val aggregates: Array[AggregateFunction[_ <: Any]],
          + private val aggInFields: Array[Int],
          + private val aggOutMapping: Array[(Int, Int)],
          + private val gkeyOutMapping: Array[(Int, Int)],
          + private val groupingSetsMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var output: Row = _
          +
          + private var intermediateGKeys: Option[Array[Int]] = None
          + private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
          + private var accumulators: Array[Accumulator] = _
          +
          + override def open(config: Configuration) {
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggInFields)
          + Preconditions.checkNotNull(aggOutMapping)
          + Preconditions.checkNotNull(gkeyOutMapping)
          + accumulators = new Array(aggregates.length)
          — End diff –

          good point! thanks

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104292001

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala —
          @@ -0,0 +1,87 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions._
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
          + * (DataSet) queries.
          + *
          + * @param aggregates The aggregate functions.
          + * @param aggInFields The positions of the aggregation input fields.
          + * @param groupingKeys The positions of the grouping keys in the input.
          + */
          +class DataSetPreAggFunction(
          + private val aggregates: Array[AggregateFunction[_ <: Any]],
          + private val aggInFields: Array[Int],
          + private val groupingKeys: Array[Int])
          + extends AbstractRichFunction
          + with GroupCombineFunction[Row, Row]
          — End diff –

          Can we align `with` and `extends`?
          This suggestion refers to the Scala style guide: [Scala Class/Object/Trait constructors declarations](http://docs.scala-lang.org/style/declarations)
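          Here is a minimal sketch of the suggested layout. The hypothetical traits `RichBase` and `Combiner` stand in for `AbstractRichFunction` and `GroupCombineFunction`. Each constructor parameter sits on its own line, and `extends` and `with` are aligned in the same column. Exact indentation conventions may differ between versions of the style guide.

```scala
// Hypothetical stand-ins for AbstractRichFunction and GroupCombineFunction.
trait RichBase {
  def open(): Unit = ()
}

trait Combiner[IN, OUT] {
  def combine(values: Iterable[IN]): Seq[OUT]
}

// One constructor parameter per line; `extends` and `with` share a column.
class ExamplePreAggFunction(
    private val aggInFields: Array[Int],
    private val groupingKeys: Array[Int])
  extends RichBase
  with Combiner[Int, Int] {

  // Trivial body so the example compiles and can be exercised.
  override def combine(values: Iterable[Int]): Seq[Int] = Seq(values.sum)
}
```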

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104290608

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala —
          @@ -0,0 +1,103 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
          + * (DataSet) queries.
          + *
          + * @param aggregates The aggregate functions.
          + * @param aggInFields The positions of the aggregation input fields.
          + * @param gkeyOutMapping The mapping of group keys between input and output positions.
          + * @param aggOutMapping The mapping of aggregates to output positions.
          + * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
          + * @param finalRowArity The arity of the final resulting row.
          + */
          +class DataSetAggFunction(
          + private val aggregates: Array[AggregateFunction[_ <: Any]],
          + private val aggInFields: Array[Int],
          + private val aggOutMapping: Array[(Int, Int)],
          + private val gkeyOutMapping: Array[(Int, Int)],
          + private val groupingSetsMapping: Array[(Int, Int)],
          + private val finalRowArity: Int)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var output: Row = _
          +
          + private var intermediateGKeys: Option[Array[Int]] = None
          + private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
          + private var accumulators: Array[Accumulator] = _
          +
          + override def open(config: Configuration) {
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggInFields)
          + Preconditions.checkNotNull(aggOutMapping)
          + Preconditions.checkNotNull(gkeyOutMapping)
          + accumulators = new Array(aggregates.length)
          — End diff –

          Can we move those checks into the constructor?
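          Here is a sketch of that suggestion. To keep the example self-contained, it uses Scala's built-in `require` instead of Flink's `Preconditions.checkNotNull`. The class name `CheckedAggFunction` and its `numAccumulators` helper are hypothetical. Statements in a Scala class body run as part of the primary constructor. The checks therefore fire when the function object is created during plan translation, before the object is serialized and shipped, rather than later in `open()`. Runtime state such as the accumulators can still be allocated in `open()`.

```scala
// Hypothetical class illustrating constructor-time argument checks.
class CheckedAggFunction(
    val aggInFields: Array[Int],
    val aggOutMapping: Array[(Int, Int)],
    val gkeyOutMapping: Array[(Int, Int)]) {

  // Class-body statements execute in the primary constructor,
  // so a null argument fails as soon as the object is created.
  require(aggInFields != null, "aggInFields must not be null")
  require(aggOutMapping != null, "aggOutMapping must not be null")
  require(gkeyOutMapping != null, "gkeyOutMapping must not be null")

  // Runtime state is still created later, when the function is opened.
  private var accumulators: Array[Any] = _

  def open(): Unit = {
    accumulators = new Array[Any](aggInFields.length)
  }

  def numAccumulators: Int = if (accumulators == null) 0 else accumulators.length
}
```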

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104290767

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala —
          @@ -87,47 +89,67 @@ class DataSetAggregate(

             override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
          -    val groupingKeys = grouping.indices.toArray
          -
          -    val mapFunction = AggregateUtil.createPrepareMapFunction(
          -      namedAggregates,
          -      grouping,
          -      inputType)
          -
          -    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
          -      namedAggregates,
          -      inputType,
          -      rowRelDataType,
          -      grouping,
          -      inGroupingSet)
          +    val (preAgg: Option[DataSetPreAggFunction],
          +      preAggType: Option[TypeInformation[Row]],
          +      finalAgg: GroupReduceFunction[Row, Row]) =
          — End diff –

          Can we format the code like this:

          ```
          val (
              preAgg: Option[DataSetPreAggFunction],
              preAggType: Option[TypeInformation[Row]],
              finalAgg: GroupReduceFunction[Row, Row]) =
          ```
          I am not sure; it is just a suggestion.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104282353

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala —
          @@ -87,47 +89,67 @@ class DataSetAggregate(

             override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
          -    val groupingKeys = grouping.indices.toArray
          -
          -    val mapFunction = AggregateUtil.createPrepareMapFunction(
          -      namedAggregates,
          -      grouping,
          -      inputType)
          -
          -    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
          -      namedAggregates,
          -      inputType,
          -      rowRelDataType,
          -      grouping,
          -      inGroupingSet)
          +    val (preAgg: Option[DataSetPreAggFunction],
          +      preAggType: Option[TypeInformation[Row]],
          +      finalAgg: GroupReduceFunction[Row, Row]) =
          +      AggregateUtil.createDataSetAggregateFunctions(
          +        namedAggregates,
          +        inputType,
          +        rowRelDataType,
          +        grouping,
          +        inGroupingSet)

               val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

               val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)

          -    val prepareOpName = s"prepare select: ($aggString)"
          -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)

               val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

          -    if (groupingKeys.length > 0) {
          +    if (grouping.length > 0) {
                 // grouped aggregation
                 val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
                   s"select: ($aggString)"

          -      mappedInput.asInstanceOf[DataSet[Row]]
          -        .groupBy(groupingKeys: _*)
          -        .reduceGroup(groupReduceFunction)
          -        .returns(rowTypeInfo)
          -        .name(aggOpName)
          +      if (preAgg.isDefined) {
          +        inputDS
          +          // pre-aggregation
          +          .groupBy(grouping: _*)
          +          .combineGroup(preAgg.get)
          +          .returns(preAggType.get)
          +          .name(aggOpName)
          +          // final aggregation
          +          .groupBy(grouping.indices: _*)
          — End diff –

          The pre-aggregation function modifies the schema of rows: it puts all grouping keys first, followed by all accumulators. Therefore, the subsequent final aggregation needs to group on the first `n` fields, where `n` is the number of grouping keys.
          Before this change, the prepare mapper changed the layout.
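          To make the layout change concrete, here is a minimal pure-Scala sketch that does not use Flink. Rows are modeled as `Vector[Any]`, and the input schema `(amount, shop, day)`, the single SUM accumulator, and the names `PreAggLayout`, `preAggregate`, `finalKey` and `inputKey` are all hypothetical. It illustrates that a key read from input position `grouping(i)` ends up at position `i` of the pre-aggregated row, which is why the final `groupBy` can use `grouping.indices`.

```scala
// Hypothetical model of the pre-aggregation row layout; not Flink code.
object PreAggLayout {
  // Stand-in for Flink's Row: a positional sequence of fields.
  type Row = Vector[Any]

  // Assumed input schema: (amount: Long, shop: String, day: Int).
  // The query groups by shop and day, i.e. input positions 1 and 2.
  val grouping: Array[Int] = Array(1, 2)

  // Key extraction on the raw input uses the original key positions.
  def inputKey(inputRow: Row): Vector[Any] =
    grouping.toVector.map(i => inputRow(i))

  // Pre-aggregate one group: copy the grouping keys to the front,
  // then append the accumulator (a SUM over field 0 in this sketch).
  def preAggregate(group: Seq[Row]): Row = {
    val keys: Vector[Any] = grouping.toVector.map(i => group.head(i))
    val sum: Long = group.map(_(0).asInstanceOf[Long]).sum
    keys :+ sum
  }

  // Key extraction for the final aggregation: after pre-aggregation the
  // keys sit at positions 0 until grouping.length, i.e. grouping.indices.
  def finalKey(preAggRow: Row): Vector[Any] =
    grouping.indices.toVector.map(i => preAggRow(i))
}
```

          Grouping the pre-aggregated rows by `grouping` itself (positions 1 and 2) would instead pick up the second key and the accumulator, which is the mismatch that grouping on `grouping.indices` avoids.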

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3472#discussion_r104282131

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
          @@ -87,47 +89,67 @@ class DataSetAggregate(

             override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {

          -    val groupingKeys = grouping.indices.toArray
          -
          -    val mapFunction = AggregateUtil.createPrepareMapFunction(
          -      namedAggregates,
          -      grouping,
          -      inputType)
          -
          -    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
          -      namedAggregates,
          -      inputType,
          -      rowRelDataType,
          -      grouping,
          -      inGroupingSet)
          +    val (preAgg: Option[DataSetPreAggFunction],
          +      preAggType: Option[TypeInformation[Row]],
          +      finalAgg: GroupReduceFunction[Row, Row]) =
          +      AggregateUtil.createDataSetAggregateFunctions(
          +        namedAggregates,
          +        inputType,
          +        rowRelDataType,
          +        grouping,
          +        inGroupingSet)

               val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

               val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)

          -    val prepareOpName = s"prepare select: ($aggString)"
          -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)

               val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

          -    if (groupingKeys.length > 0) {
          +    if (grouping.length > 0) {
                 // grouped aggregation
                 val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
                   s"select: ($aggString)"

          -      mappedInput.asInstanceOf[DataSet[Row]]
          -        .groupBy(groupingKeys: _*)
          -        .reduceGroup(groupReduceFunction)
          -        .returns(rowTypeInfo)
          -        .name(aggOpName)
          +      if (preAgg.isDefined) {
          +        inputDS
          +          // pre-aggregation
          +          .groupBy(grouping: _*)
          +          .combineGroup(preAgg.get)
          +          .returns(preAggType.get)
          +          .name(aggOpName)
          +          // final aggregation
          +          .groupBy(grouping.indices: _*)
          --- End diff --

          I played around with this PR in different test modes, and everything works very well. Just curious: why do you use `grouping.indices` as the grouping key here?

          githubbot ASF GitHub Bot added a comment -

          GitHub user fhueske opened a pull request:

          https://github.com/apache/flink/pull/3472

          FLINK-5963 [table] Remove prepare mapper of DataSetAggregate.

          We remove the preparation mapper to

          • save one operator
          • be able to apply `AggregateFunction.accumulate()` in a `GroupCombineFunction` or `MapPartitionFunction` for pre-aggregation or in a `GroupReduceFunction` for final aggregation.
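          The two-phase pattern that removing the mapper enables can be sketched without Flink. The trait below is a simplified, hypothetical stand-in modeled loosely on the UDAGG contract (`createAccumulator`, `accumulate`, `merge`, `getValue`); it is not Flink's `AggregateFunction` class, and `SimpleAggFunction`, `AvgAcc`, `LongAvg` and `TwoPhaseAgg` are illustrative names. The combine phase calls `accumulate()` directly on raw input values, playing the role of a `GroupCombineFunction`, and the final phase merges the partial accumulators per key, playing the role of a `GroupReduceFunction`.

```scala
// Hypothetical, simplified UDAGG contract; not Flink's actual API.
trait SimpleAggFunction[T, ACC] {
  def createAccumulator(): ACC
  def accumulate(acc: ACC, value: Long): Unit
  def merge(acc: ACC, others: Seq[ACC]): Unit
  def getValue(acc: ACC): T
}

// Mutable accumulator for an average: running sum and count.
final class AvgAcc(var sum: Long = 0L, var count: Long = 0L)

object LongAvg extends SimpleAggFunction[Double, AvgAcc] {
  def createAccumulator(): AvgAcc = new AvgAcc()
  def accumulate(acc: AvgAcc, value: Long): Unit = {
    acc.sum += value
    acc.count += 1
  }
  def merge(acc: AvgAcc, others: Seq[AvgAcc]): Unit =
    others.foreach { o => acc.sum += o.sum; acc.count += o.count }
  def getValue(acc: AvgAcc): Double =
    if (acc.count == 0) 0.0 else acc.sum.toDouble / acc.count
}

object TwoPhaseAgg {
  // Combine phase (per partition): accumulate() runs on the raw values,
  // so no separate "prepare" map is needed beforehand.
  def combine[T, ACC](f: SimpleAggFunction[T, ACC],
                      partition: Seq[(String, Long)]): Map[String, ACC] =
    partition.groupBy(_._1).map { case (key, rows) =>
      val acc = f.createAccumulator()
      rows.foreach { case (_, v) => f.accumulate(acc, v) }
      key -> acc
    }

  // Final phase: merge the partial accumulators of each key, then emit.
  def reduce[T, ACC](f: SimpleAggFunction[T, ACC],
                     partials: Seq[Map[String, ACC]]): Map[String, T] =
    partials.flatten.groupBy(_._1).map { case (key, accs) =>
      val acc = f.createAccumulator()
      f.merge(acc, accs.map(_._2))
      key -> f.getValue(acc)
    }
}
```

          Because partial results travel as accumulators rather than as finished values, an average can be combined early without losing correctness; merging two partial averages directly would not give the right answer.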

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/fhueske/flink tableRmPrepareMap

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3472.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3472




            People

            • Assignee:
              fhueske Fabian Hueske
              Reporter:
              fhueske Fabian Hueske
            • Votes:
              0
              Watchers:
              4
