  1. Flink
  2. FLINK-6462

Add requiresOver interface for AggregateFunction

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      In standard databases, some window functions, such as `LAG`, `LEAD`, `FIRST_VALUE`, and `LAST_VALUE`, are only supported with an "over" window. These window functions do not apply to `Slide`, `Tumble`, or `Session` windows, so a user-defined aggregate function needs a way to declare this restriction explicitly. In Calcite, `SqlAggFunction` uses `requiresOver` to make this distinction.
      This JIRA adds that feature.
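The idea of a `requiresOver` flag can be sketched in plain Java. This is only an illustration under assumptions: `SketchAggFunction`, `SketchSum`, and `SketchLag` are hypothetical stand-ins invented here, not Flink or Calcite classes, and the real signature may differ.

```java
import java.util.Arrays;
import java.util.List;

class RequiresOverFlagSketch {

    // Hypothetical stand-in for an aggregate function base class (not Flink's actual API).
    abstract static class SketchAggFunction {
        abstract String name();

        // Default: the function may be used in group windows as well as OVER windows.
        boolean requiresOver() {
            return false;
        }
    }

    // An ordinary aggregate keeps the default.
    static class SketchSum extends SketchAggFunction {
        @Override
        String name() {
            return "SUM";
        }
    }

    // A LAG-like function only makes sense relative to a current row, so it opts in.
    static class SketchLag extends SketchAggFunction {
        @Override
        String name() {
            return "LAG";
        }

        @Override
        boolean requiresOver() {
            return true;
        }
    }

    public static void main(String[] args) {
        List<SketchAggFunction> functions = Arrays.asList(new SketchSum(), new SketchLag());
        for (SketchAggFunction f : functions) {
            System.out.println(f.name() + " requiresOver=" + f.requiresOver());
        }
    }
}
```

A default of `false` means existing aggregates need no change; only functions restricted to OVER windows override the method.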

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3851

          FLINK-6462 [table] Add requiresOver interface for AggregateFunction

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("FLINK-6462 [table] Add requiresOver interface for AggregateFunction ")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6462-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3851.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3851


          commit 9a474ebccd13f5a394c77a7e4d701fc16a3772f2
          Author: sunjincheng121 <sunjincheng121@gmail.com>
          Date: 2017-05-08T04:04:47Z

          FLINK-6462 [table] Add requiresOver interface for AggregateFunction


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116218286

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala —
          @@ -0,0 +1,41 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.ValidationException
          +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.utils.TableTestBase
          +import org.junit.Test
          +
          +class AggregationsTest extends TableTestBase {
          +
          + @Test(expected = classOf[ValidationException])
          — End diff –

          Please add a brief comment to each test explaining why it is expected to fail.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116211438

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -53,8 +54,17 @@ class DataSetAggregateRule

          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + if (distinctAggs) {
          + throw TableException("DISTINCT aggregates are currently not supported.")
          + }
          +
          + // check if we have over aggregates
          + val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
          — End diff –

          Use `filter` instead of `exists` to extract the functions that require OVER.
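The difference between the two approaches can be sketched in plain Java, where `anyMatch` plays the role of Scala's `exists` and `filter` keeps the matching elements. `AggCall` and both helper methods are hypothetical names made up for this illustration, not Calcite or Flink types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OverAggFilterSketch {

    // Hypothetical aggregate call: a function name plus its requiresOver flag.
    static final class AggCall {
        final String name;
        final boolean requiresOver;

        AggCall(String name, boolean requiresOver) {
            this.name = name;
            this.requiresOver = requiresOver;
        }
    }

    // Counterpart of Scala's `exists`: answers only yes or no.
    static boolean anyRequiresOver(List<AggCall> calls) {
        return calls.stream().anyMatch(c -> c.requiresOver);
    }

    // Counterpart of Scala's `filter`: keeps the offending calls so they can be named.
    static List<String> namesRequiringOver(List<AggCall> calls) {
        return calls.stream()
            .filter(c -> c.requiresOver)
            .map(c -> c.name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<AggCall> calls = Arrays.asList(
            new AggCall("SUM", false),
            new AggCall("OverAgg0", true));
        System.out.println(anyRequiresOver(calls));
        System.out.println("Functions requiring OVER: "
            + String.join(", ", namesRequiringOver(calls)));
    }
}
```

Keeping the filtered list around means an error message can name the functions involved rather than only reporting that some function requires OVER.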

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116212002

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -53,8 +54,17 @@ class DataSetAggregateRule

          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + if (distinctAggs) {
          + throw TableException("DISTINCT aggregates are currently not supported.")
          — End diff –

          We should not throw an exception here, but return with `false` from the `matches()` call.
          Otherwise the optimization terminates, although the query could be rewritten without distinct aggregates.
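Why returning `false` is preferable to throwing can be shown with a toy rule set. This is a sketch under assumptions: `Rule`, `Query`, `PlainAggregateRule`, and `DistinctRewriteRule` are hypothetical names, and the loop is a drastic simplification of how a real planner tries rules.

```java
import java.util.Arrays;
import java.util.List;

class RuleMatchSketch {

    // Hypothetical query description; it only tracks whether a DISTINCT aggregate is present.
    static final class Query {
        final boolean hasDistinct;

        Query(boolean hasDistinct) {
            this.hasDistinct = hasDistinct;
        }
    }

    // Hypothetical rule interface, loosely modelled on a planner rule with a matches() check.
    interface Rule {
        String name();

        boolean matches(Query query);
    }

    // Declines queries it cannot translate instead of throwing.
    static final class PlainAggregateRule implements Rule {
        public String name() {
            return "PlainAggregateRule";
        }

        public boolean matches(Query query) {
            return !query.hasDistinct;
        }
    }

    // A different rule that knows how to handle DISTINCT aggregates.
    static final class DistinctRewriteRule implements Rule {
        public String name() {
            return "DistinctRewriteRule";
        }

        public boolean matches(Query query) {
            return query.hasDistinct;
        }
    }

    // Returns the name of the first rule that accepts the query, or null if none does.
    static String firstMatchingRule(List<Rule> rules, Query query) {
        for (Rule rule : rules) {
            if (rule.matches(query)) {
                return rule.name();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(new PlainAggregateRule(), new DistinctRewriteRule());
        System.out.println(firstMatchingRule(rules, new Query(true)));
    }
}
```

If `PlainAggregateRule.matches` threw for a DISTINCT query, the loop would abort before `DistinctRewriteRule` was ever consulted.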

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116211558

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -53,8 +54,17 @@ class DataSetAggregateRule

          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + if (distinctAggs) {
          + throw TableException("DISTINCT aggregates are currently not supported.")
          + }
          +
          + // check if we have over aggregates
          + val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
          + if (overAggs) {
          + throw TableException("OVER clause is necessary for requires over window functions")
          — End diff –

          Same for the other checks

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116211870

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala —
          @@ -51,8 +52,17 @@ class DataSetAggregateWithNullValuesRule

          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + if (distinctAggs) {
          + throw TableException("DISTINCT aggregates are currently not supported.")
          — End diff –

          We should not throw an exception here, but return with `false` from the `matches()` call.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116215873

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java —
          @@ -23,13 +23,37 @@
          import java.util.Iterator;

          public class UserDefinedAggFunctions {
          + // Accumulator for test requiresOver
          + public static class Accumulator0 extends Tuple2<Long, Integer>{}

          // Accumulator for WeightedAvg
          public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
          public long sum = 0;
          public int count = 0;
          }

          + // Test for requiresOver
          + public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
          — End diff –

          Please reorder the classes so that the `AggregateFunction` class follows the accumulator class.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116217338

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala —
          @@ -0,0 +1,38 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.api.scala.batch.table
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
          +import org.apache.flink.table.utils.TableTestBase
          +import org.junit.Test
          +
          +class AggregationsTest extends TableTestBase {
          +
          + @Test(expected = classOf[TableException])
          — End diff –

          This test can be moved to `AggregationsValidationTest`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116212175

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala —
          @@ -49,13 +49,19 @@ class DataStreamGroupAggregateRule
          throw TableException("DISTINCT aggregates are currently not supported.")
          }

          + // check if we have over aggregates
          + val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
          + if (overAggs) {
          + throw TableException("OVER clause is necessary for requires over window functions")
          + }
          +
          // check if we have grouping sets
          val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
          if (groupSets || agg.indicator) {
          throw TableException("GROUPING SETS are currently not supported.")
          }
          - !distinctAggs && !groupSets && !agg.indicator
          — End diff –

          This check must not be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116211361

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -53,8 +54,17 @@ class DataSetAggregateRule

          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          + if (distinctAggs) {
          + throw TableException("DISTINCT aggregates are currently not supported.")
          + }
          +
          + // check if we have over aggregates
          + val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
          + if (overAggs) {
          + throw TableException("OVER clause is necessary for requires over window functions")
          — End diff –

          The error message should list the aggregation functions that require OVER

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116211695

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetWindowAggregateRule.scala —
          @@ -49,7 +49,13 @@ class DataSetWindowAggregateRule
          throw TableException("GROUPING SETS are currently not supported.")
          }

          - !distinctAggs && !groupSets && !agg.indicator
          — End diff –

          This check must not be removed. Instead we should return with `false` from `matches`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116212331

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala —
          @@ -44,14 +44,19 @@ class DataStreamGroupWindowAggregateRule
          if (distinctAggs) {
          throw TableException("DISTINCT aggregates are currently not supported.")
          }

          + // check if we have over aggregates
          + val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
          + if (overAggs) {
          + throw TableException("OVER clause is necessary for requires over window functions")
          + }

          // check if we have grouping sets
          val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
          if (groupSets || agg.indicator) {
          throw TableException("GROUPING SETS are currently not supported.")
          }
          - !distinctAggs && !groupSets && !agg.indicator
          — End diff –

          This check must not be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3851

          Hi @fhueske, thanks for the review.
          I forgot to run `mvn verify` the first time. Sorry about that.
          I have updated the PR according to your comments, except for one:
          moving `scala/batch/table/AggregationsTest.scala#testOverAggregation` to `AggregationsValidationTest`, because `AggregationsTest` has to extend `TableTestBase`, while `AggregationsValidationTest` does not.
          What do you think?

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116443377

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala —
          @@ -30,6 +30,23 @@ import org.junit.{Ignore, Test}

          class GroupWindowTest extends TableTestBase {

          + /**
          + * OVER clause is necessary for [[OverAgg0]] window function.
          + */
          + @Test(expected = classOf[TableException])
          + def testOverAggregation(): Unit = {
          + val util = streamTestUtil()
          + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
          +
          + val overAgg = new OverAgg0
          + val windowedTable = table
          + .window(Tumble over 2.rows on 'proctime as 'w)
          + .groupBy('w, 'string)
          + .select(overAgg('long, 'int))
          +
          + util.verifyTable(windowedTable, "n/a")
          — End diff –

          When the check is done in the validation phase, we do not have to call `verifyTable()` (same for all other Table API tests).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116442782

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -50,6 +51,13 @@ class DataSetAggregateRule
          agg.getGroupSets.size() == 1) { return false }

          + // check if we have over aggregates
          + val overAggNames =
          — End diff –

          I thought about this again.
          The optimization rules are not the right place for this check and should not be modified.

          Instead the check should be moved to the Table API validation phase.
          SQL queries are automatically checked by Calcite's `SqlValidator`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116444166

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala —
          @@ -31,6 +31,18 @@ class WindowAggregateTest extends TableTestBase {
          streamUtil.addTable[(Int, String, Long)](
          "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)

          + /**
          + * OVER clause is necessary for [[OverAgg0]] window function.
          + */
          + @Test(expected = classOf[ValidationException])
          + def testOverAggregation() = {
          + streamUtil.addFunction("overAgg", new OverAgg0)
          +
          + val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
          +
          + streamUtil.verifySql(sqlQuery, "n/a")
          — End diff –

          `streamUtil.tEnv.sql(sqlQuery)` is sufficient. We do not need to call the optimizer.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3851

          Thanks for the review, @fhueske. Moving the check logic from the `rule` into the `Table API validation` phase makes sense to me: `Table API validation` ensures the `Semantics`, and the `rule` ensures that the `Runtime` can deal with the situation. The PR is updated.

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116592490

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -215,14 +215,22 @@ case class Aggregate(
          }

          override def validate(tableEnv: TableEnvironment): LogicalNode = {
          -
          + implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
          val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
          val groupingExprs = resolvedAggregate.groupingExpressions
          val aggregateExprs = resolvedAggregate.aggregateExpressions
          aggregateExprs.foreach(validateAggregateExpression)
          groupingExprs.foreach(validateGroupingExpression)

          def validateAggregateExpression(expr: Expression): Unit = expr match {
          + case Alias(child, _, _) => validateAggregateExpression(child)
          — End diff –

          Please rename `child`: `Aggregate` already has a member named `child`, so reusing the name here might be confusing.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116592552

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -584,13 +592,22 @@ case class WindowAggregate(
          }

          override def validate(tableEnv: TableEnvironment): LogicalNode = {
          + implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
          val resolvedWindowAggregate = super.validate(tableEnv).asInstanceOf[WindowAggregate]
          val groupingExprs = resolvedWindowAggregate.groupingExpressions
          val aggregateExprs = resolvedWindowAggregate.aggregateExpressions
          aggregateExprs.foreach(validateAggregateExpression)
          groupingExprs.foreach(validateGroupingExpression)

          def validateAggregateExpression(expr: Expression): Unit = expr match {
          + case Alias(child, _, _) => validateAggregateExpression(child)
          — End diff –

          Same comments as for `Aggregate` apply here.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116595353

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala —
          @@ -31,6 +31,17 @@ class WindowAggregateTest extends TableTestBase {
          streamUtil.addTable[(Int, String, Long)](
          "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)

          + /**
          + * OVER clause is necessary for [[OverAgg0]] window function.
          + */
          + @Test(expected = classOf[ValidationException])
          + def testOverAggregation() :Unit = {
          — End diff –

          `testOverAggregation(): Unit = {`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116593574

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -215,14 +215,22 @@ case class Aggregate(
          }

          override def validate(tableEnv: TableEnvironment): LogicalNode = {
          -
          + implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
          val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
          val groupingExprs = resolvedAggregate.groupingExpressions
          val aggregateExprs = resolvedAggregate.aggregateExpressions
          aggregateExprs.foreach(validateAggregateExpression)
          groupingExprs.foreach(validateGroupingExpression)

          def validateAggregateExpression(expr: Expression): Unit = expr match {
          + case Alias(child, _, _) => validateAggregateExpression(child)
          + // check user-defined aggregate function
          + case AggFunctionCall(agg, _) if agg.requiresOver =>
          — End diff –

          Check this case also as
          ```
          case aggExpr: Aggregation if aggExpr.getSqlAggFunction.requiresOver =>
          ```

          in case we add built-in Aggregations that require Over.
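
          The two guarded cases discussed here can be sketched in isolation. The following is a minimal, self-contained model and not Flink's actual code: `FieldRef`, `UserAgg`, `BuiltInAgg`, and `OverCheck` are hypothetical stand-ins, and `Alias` and `AggFunctionCall` are simplified versions with different signatures from the real classes. It only illustrates how a `requiresOver` guard for user-defined and built-in aggregates can sit in front of a generic fallback that recurses into `children`.

          ```scala
          // Simplified stand-ins for the expression classes; every type here is
          // an assumption made for illustration, not Flink's real API.
          sealed trait Expression { def children: Seq[Expression] }

          case class FieldRef(name: String) extends Expression {
            val children: Seq[Expression] = Nil
          }

          case class Alias(inner: Expression, name: String) extends Expression {
            val children: Seq[Expression] = Seq(inner)
          }

          // Hypothetical user-defined aggregate carrying a requiresOver flag.
          case class UserAgg(name: String, requiresOver: Boolean)

          case class AggFunctionCall(agg: UserAgg, args: Seq[Expression]) extends Expression {
            val children: Seq[Expression] = args
          }

          // Hypothetical built-in aggregation with the same flag.
          case class BuiltInAgg(name: String, requiresOver: Boolean, arg: Expression)
              extends Expression {
            val children: Seq[Expression] = Seq(arg)
          }

          object OverCheck {
            // Collects the names of all aggregates that are only valid in an OVER window.
            def overOnlyAggregates(expr: Expression): Seq[String] = expr match {
              // user-defined aggregate function
              case AggFunctionCall(agg, _) if agg.requiresOver => Seq(agg.name)
              // built-in aggregation
              case b: BuiltInAgg if b.requiresOver => Seq(b.name)
              // everything else, including Alias: descend into the children
              case e => e.children.flatMap(overOnlyAggregates)
            }

            // Fails during validation, before any optimization would run.
            def validate(expr: Expression): Unit = {
              val offending = overOnlyAggregates(expr)
              if (offending.nonEmpty) {
                throw new IllegalArgumentException(
                  s"OVER clause is necessary for: ${offending.mkString(", ")}")
              }
            }

            def main(args: Array[String]): Unit = {
              val call = AggFunctionCall(
                UserAgg("overAgg", requiresOver = true),
                Seq(FieldRef("long"), FieldRef("int")))
              try validate(Alias(call, "a"))
              catch { case e: IllegalArgumentException => println(e.getMessage) }
            }
          }
          ```

          In this model the aliased call is rejected without a dedicated `Alias` case, because the fallback case already visits the alias's child.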

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116595446

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala —
          @@ -20,13 +20,25 @@ package org.apache.flink.table.api.scala.batch.table.validation

          import org.apache.flink.api.scala._
          import org.apache.flink.api.scala.util.CollectionDataSets
          -import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
          +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
          import org.apache.flink.table.api.scala._
          import org.apache.flink.table.api.{TableEnvironment, ValidationException}

          import org.junit._

          class AggregationsValidationTest {

          + /**
          + * OVER clause is necessary for [[OverAgg0]] window function.
          + */
          + @Test(expected = classOf[ValidationException])
          + def testOverAggregation(): Unit = {
          + val env= ExecutionEnvironment.getExecutionEnvironment
          — End diff –

          Add a space before `=`: `val env = ExecutionEnvironment...`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3851#discussion_r116592899

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala —
          @@ -215,14 +215,22 @@ case class Aggregate(
          }

          override def validate(tableEnv: TableEnvironment): LogicalNode = {
          -
          + implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
          val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
          val groupingExprs = resolvedAggregate.groupingExpressions
          val aggregateExprs = resolvedAggregate.aggregateExpressions
          aggregateExprs.foreach(validateAggregateExpression)
          groupingExprs.foreach(validateGroupingExpression)

          def validateAggregateExpression(expr: Expression): Unit = expr match {
          + case Alias(child, _, _) => validateAggregateExpression(child)
          — End diff –

          Isn't this case caught by the last case `case e => e.children.foreach(validateAggregateExpression)`?

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3851

          Hi @fhueske Thank you very much.
          Honestly, I need to review my own changes more carefully, which would save the other reviewers' time.
          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3851

          fhueske Fabian Hueske added a comment -

          Fixed for 1.3.0 with 629d3633bcc458dc4ba5e660f48ed42b1a90b834
          Fixed for 1.4.0 with 7e98910fcdc6fdd149969103d43ab54b8f10fde1


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0
              Watchers:
              3


                Development