Flink / FLINK-3475

DISTINCT aggregate function support for SQL queries

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      A DISTINCT aggregate function may be able to reuse the existing aggregate function instead of requiring a separate implementation, letting the Flink runtime take care of duplicate records.

          Activity

          fhueske Fabian Hueske added a comment - - edited

          DISTINCT aggregates should be computed the following way:

          Given a LogicalAggregate with one or more DISTINCT aggregates we should translate this into the following DataSet (DS) plan:

             /-> Map(non-distinct)-> Combine()-> GRed(agg)-----------------\
          IN --> Map(dist1)-> LocDist-> GlobDist-> Map-> GRed(agg)--> Join--> Join-> OUT
             \-> Map(dist2)-> LocDist-> GlobDist-> Map-> GRed(agg)-/
          

          The distinct aggregates are split off into separate aggregates, each of which is computed individually.
          The operators do the following:

          • First Map: The mappers split the input. All non-distinct aggregates together and each distinct aggregate separately become their own pipeline. The mappers for the distinct aggregates project to the relevant fields. The mapper for the non-distinct aggregates prepares the aggregation (as before).
          • LocDist: Local distinct, executed as a Combiner on the grouping key(s) + the distinct attribute.
          • GlobDist: Global distinct, executed as a GroupReduce by (manually) partitioning on the grouping key (partitionByHash) and sorting on the grouping key + the distinct attribute (sortPartition).
          • Second Map: Prepares the distinct aggregations.
          • GRed: GroupReduce on the grouping keys. Since we hash-partitioned only on the grouping key and sorted on a superset of the grouping keys, we do not need to repartition and sort again, and the aggregation becomes a pipelined operator.
          • Join: Since all aggregates are equally partitioned and have compatible sort orders, the joins can be done as a simple merge without additional partitioning and sorting.

          The overall plan is a bit complex but should be fairly efficient to execute, since we can do some pre-aggregation and reuse many physical properties for the distinct aggregations and joins. To make this work, we must correctly annotate the user functions with semantic forward field annotations.
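
To make the data flow of the plan above easier to follow, here is a minimal plain-Java sketch of what the branches compute. It only models the semantics with in-memory collections and does not use the Flink DataSet API, so partitioning, sorting, and combiners are not represented. The class `DistinctPlanSketch`, the `Row` type, and the chosen aggregates (`SUM(y)` and `COUNT(DISTINCT x)`) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java model of the proposed plan; it does not use the Flink DataSet API. */
final class DistinctPlanSketch {

    /** Hypothetical input row: grouping key k, distinct attribute x, plain attribute y. */
    static final class Row {
        final String k;
        final int x;
        final long y;

        Row(String k, int x, long y) {
            this.k = k;
            this.x = x;
            this.y = y;
        }
    }

    /** Non-distinct branch: SUM(y) per grouping key. */
    static Map<String, Long> sumByKey(List<Row> in) {
        Map<String, Long> sums = new TreeMap<>();
        for (Row r : in) {
            sums.merge(r.k, r.y, Long::sum);
        }
        return sums;
    }

    /** Distinct branch: project to (k, x), drop duplicates, then count per key. */
    static Map<String, Long> countDistinctByKey(List<Row> in) {
        // Models Map(dist) + LocDist + GlobDist: keep each (k, x) pair once.
        Set<List<Object>> distinctPairs = new LinkedHashSet<>();
        for (Row r : in) {
            distinctPairs.add(Arrays.<Object>asList(r.k, r.x));
        }
        // Models the second Map + GRed(agg): aggregate the de-duplicated pairs.
        Map<String, Long> counts = new TreeMap<>();
        for (List<Object> pair : distinctPairs) {
            counts.merge((String) pair.get(0), 1L, Long::sum);
        }
        return counts;
    }

    /** Join: combine both branches on the grouping key into {SUM(y), COUNT(DISTINCT x)}. */
    static Map<String, long[]> join(Map<String, Long> sums, Map<String, Long> counts) {
        Map<String, long[]> out = new TreeMap<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            // Both branches read the same input and x is never null here,
            // so every key in sums also appears in counts.
            out.put(e.getKey(), new long[] {e.getValue(), counts.get(e.getKey())});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> in = new ArrayList<>();
        in.add(new Row("a", 1, 10L));
        in.add(new Row("a", 1, 20L));
        in.add(new Row("a", 2, 30L));
        in.add(new Row("b", 7, 5L));

        Map<String, long[]> result = join(sumByKey(in), countDistinctByKey(in));
        for (Map.Entry<String, long[]> e : result.entrySet()) {
            System.out.println(e.getKey() + " -> SUM(y)=" + e.getValue()[0]
                + ", COUNT(DISTINCT x)=" + e.getValue()[1]);
        }
    }
}
```

In the real plan, the de-duplication step would be split into a local (combiner) and a global phase, and the final join would rely on the shared partitioning and sort order rather than on map lookups.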

          fhueske Fabian Hueske added a comment -

          I'm unassigning this issue. Chengxiang Li, please reassign it to yourself if you would like to work on it. Thank you!

          ykt836 Kurt Young added a comment -

          We are willing to help with this issue. Fabian Hueske, Chengxiang Li: is this OK with both of you?

          fhueske Fabian Hueske added a comment -

          Hi Kurt Young, sure. That's fine with me and would be great!
          I don't have time to work on it myself and just wanted to sketch a possible design.
          Please let me know if you have questions about it or if you have other suggestions.

          ykt836 Kurt Young added a comment -

          Hi Fabian Hueske, cool, will definitely let you know if we have some new thoughts or progress on this issue.

          docete Zhenghua Gao added a comment -

          I have created a PR for this issue: https://github.com/apache/flink/pull/3111.
          Currently, Calcite's AggregateExpandDistinctAggregatesRule can help Flink support DISTINCT aggregates in SQL.
          After running some tests, I found some bad cases, which are described in https://issues.apache.org/jira/browse/CALCITE-1558.
          So I copied the Calcite class into the Flink project and applied a quick fix.
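
For readers unfamiliar with this kind of rule, the single-argument case (all DISTINCT aggregates share one argument, so one extra aggregate is enough) can be modelled in plain Java as a two-level aggregation. This is only an illustrative sketch of the rewrite's semantics, not Calcite or Flink code. The class `ExpandDistinctSketch` and the `{k, x}` row layout are assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java model of the single-argument rewrite; this is not Calcite code. */
final class ExpandDistinctSketch {

    /**
     * Models
     *   SELECT k, COUNT(DISTINCT x), SUM(DISTINCT x) FROM t GROUP BY k
     * as
     *   SELECT k, COUNT(x), SUM(x) FROM (SELECT k, x FROM t GROUP BY k, x) GROUP BY k.
     * Each input row is a hypothetical {k, x} pair; the result maps k to {count, sum}.
     */
    static Map<Integer, long[]> countAndSumDistinct(int[][] rows) {
        // Inner aggregate: GROUP BY k, x removes duplicate (k, x) pairs.
        Set<List<Integer>> inner = new LinkedHashSet<>();
        for (int[] row : rows) {
            inner.add(Arrays.asList(row[0], row[1]));
        }
        // Outer aggregate: ordinary COUNT and SUM over the de-duplicated rows.
        Map<Integer, long[]> outer = new TreeMap<>();
        for (List<Integer> pair : inner) {
            long[] acc = outer.computeIfAbsent(pair.get(0), k -> new long[2]);
            acc[0] += 1;            // COUNT(x)
            acc[1] += pair.get(1);  // SUM(x)
        }
        return outer;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 5}, {1, 5}, {1, 7}, {2, 3}, {2, 3}};
        countAndSumDistinct(rows).forEach((k, v) ->
            System.out.println("k=" + k + " count=" + v[0] + " sum=" + v[1]));
    }
}
```

When the DISTINCT aggregates have different arguments, one such two-level aggregation per argument would be needed, with the results joined on the grouping key.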

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96160082

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala —
          @@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
          val util = batchTestUtil()
          val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

          - val resultTable = sourceTable.groupBy('a)
          + // Move "where" before "groupBy" for the former query would generate
          + // nondeterministic plans with same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
          + // the importance of relNode may change, and the test may fail. This issue is mentioned
          + // in FLINK-5394, we could move "where" to the end when FLINK-5394 is fixed.
          + val resultTable = sourceTable.where('a === 1).groupBy('a)
          — End diff –

          It might make sense to wait with this until #3058 is in. It is almost done I think.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96160329

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java —
          @@ -0,0 +1,1144 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.dataSet;
          +
          +import org.apache.calcite.plan.Contexts;
          +import org.apache.calcite.plan.RelOptCluster;
          +import org.apache.calcite.plan.RelOptRule;
          +import org.apache.calcite.plan.RelOptRuleCall;
          +import org.apache.calcite.rel.RelNode;
          +import org.apache.calcite.rel.core.Aggregate;
          +import org.apache.calcite.rel.core.AggregateCall;
          +import org.apache.calcite.rel.core.JoinRelType;
          +import org.apache.calcite.rel.core.RelFactories;
          +import org.apache.calcite.rel.logical.LogicalAggregate;
          +import org.apache.calcite.rel.type.RelDataType;
          +import org.apache.calcite.rel.type.RelDataTypeFactory;
          +import org.apache.calcite.rel.type.RelDataTypeField;
          +import org.apache.calcite.rex.RexBuilder;
          +import org.apache.calcite.rex.RexInputRef;
          +import org.apache.calcite.rex.RexNode;
          +import org.apache.calcite.sql.SqlAggFunction;
          +import org.apache.calcite.sql.fun.SqlCountAggFunction;
          +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
          +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
          +import org.apache.calcite.sql.fun.SqlSumAggFunction;
          +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +import org.apache.calcite.tools.RelBuilder;
          +import org.apache.calcite.tools.RelBuilderFactory;
          +import org.apache.calcite.util.ImmutableBitSet;
          +import org.apache.calcite.util.ImmutableIntList;
          +import org.apache.calcite.util.Pair;
          +import org.apache.calcite.util.Util;
          +
          +import com.google.common.base.Preconditions;
          +import com.google.common.collect.ImmutableList;
          +import com.google.common.collect.Iterables;
          +import com.google.common.collect.Lists;
          +
          +import java.math.BigDecimal;
          +import java.util.ArrayList;
          +import java.util.HashMap;
          +import java.util.LinkedHashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.SortedSet;
          +import java.util.TreeSet;
          +
          +/**
          + * Planner rule that expands distinct aggregates
          + * (such as {@code COUNT(DISTINCT x)}) from a
          + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
          + *
          + * <p>How this is done depends upon the arguments to the function. If all
          + * functions have the same argument
          + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
          + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
          + * sufficient.
          + *
          + * <p>If there are multiple arguments
          + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
          + * the rule creates separate {@code Aggregate}s and combines using a
          + * {@link org.apache.calcite.rel.core.Join}.
          + */
          +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
          — End diff –

          I would like to move this class to the `org.apache.flink.table.calcite` package and add a comment at the top of the class noting that this is a temporary solution that should be removed later, such as:

          >This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with a quick fix to avoid some bad cases mentioned in CALCITE-1558. It should be dropped in favor of Calcite's [[AggregateExpandDistinctAggregatesRule]] when we upgrade to Calcite 1.12 (or above).

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96329552

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala —
          @@ -96,6 +96,13 @@ object FlinkRuleSets {
          ProjectToCalcRule.INSTANCE,
          CalcMergeRule.INSTANCE,

          + // distinct aggregate rule for FLINK-3475
          — End diff –

          I think this comment can be moved to `FlinkAggregateExpandDistinctAggregatesRule`. Please also open a JIRA issue to track removing this class after we upgrade Calcite.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3111

          Thanks for the PR @docete.
          I'll have a look at it soon.

          Best, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96568191

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala —
          @@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
          val util = batchTestUtil()
          val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

          - val resultTable = sourceTable.groupBy('a)
          + // Move "where" before "groupBy" for the former query would generate
          + // nondeterministic plans with same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
          + // the importance of relNode may change, and the test may fail. This issue is mentioned
          + // in FLINK-5394, we could move "where" to the end when FLINK-5394 is fixed.
          + val resultTable = sourceTable.where('a === 1).groupBy('a)
          — End diff –

          I will check with @beyond1920 when #3058 is expected to be finished.

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96568781

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java —
          @@ -0,0 +1,1144 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.dataSet;
          +
          +import org.apache.calcite.plan.Contexts;
          +import org.apache.calcite.plan.RelOptCluster;
          +import org.apache.calcite.plan.RelOptRule;
          +import org.apache.calcite.plan.RelOptRuleCall;
          +import org.apache.calcite.rel.RelNode;
          +import org.apache.calcite.rel.core.Aggregate;
          +import org.apache.calcite.rel.core.AggregateCall;
          +import org.apache.calcite.rel.core.JoinRelType;
          +import org.apache.calcite.rel.core.RelFactories;
          +import org.apache.calcite.rel.logical.LogicalAggregate;
          +import org.apache.calcite.rel.type.RelDataType;
          +import org.apache.calcite.rel.type.RelDataTypeFactory;
          +import org.apache.calcite.rel.type.RelDataTypeField;
          +import org.apache.calcite.rex.RexBuilder;
          +import org.apache.calcite.rex.RexInputRef;
          +import org.apache.calcite.rex.RexNode;
          +import org.apache.calcite.sql.SqlAggFunction;
          +import org.apache.calcite.sql.fun.SqlCountAggFunction;
          +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
          +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
          +import org.apache.calcite.sql.fun.SqlSumAggFunction;
          +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +import org.apache.calcite.tools.RelBuilder;
          +import org.apache.calcite.tools.RelBuilderFactory;
          +import org.apache.calcite.util.ImmutableBitSet;
          +import org.apache.calcite.util.ImmutableIntList;
          +import org.apache.calcite.util.Pair;
          +import org.apache.calcite.util.Util;
          +
          +import com.google.common.base.Preconditions;
          +import com.google.common.collect.ImmutableList;
          +import com.google.common.collect.Iterables;
          +import com.google.common.collect.Lists;
          +
          +import java.math.BigDecimal;
          +import java.util.ArrayList;
          +import java.util.HashMap;
          +import java.util.LinkedHashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.SortedSet;
          +import java.util.TreeSet;
          +
          +/**
          + * Planner rule that expands distinct aggregates
          + * (such as {@code COUNT(DISTINCT x)}) from a
          + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
          + *
          + * <p>How this is done depends upon the arguments to the function. If all
          + * functions have the same argument
          + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
          + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
          + * sufficient.
          + *
          + * <p>If there are multiple arguments
          + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
          + * the rule creates separate {@code Aggregate}s and combines using a
          + * {@link org.apache.calcite.rel.core.Join}.
          + */
          +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
          — End diff –

          Classes in the org.apache.flink.table.calcite package are inspired by Calcite and will stay in Flink, whereas FlinkAggregateExpandDistinctAggregatesRule is a copy and will be dropped. So I don't think it's a good idea to move this class there.
          I will move the comment to FlinkAggregateExpandDistinctAggregatesRule and create a JIRA issue to track its removal (as recommended by @KurtYoung).

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96572366

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java —
          @@ -0,0 +1,1144 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.dataSet;
          +
          +import org.apache.calcite.plan.Contexts;
          +import org.apache.calcite.plan.RelOptCluster;
          +import org.apache.calcite.plan.RelOptRule;
          +import org.apache.calcite.plan.RelOptRuleCall;
          +import org.apache.calcite.rel.RelNode;
          +import org.apache.calcite.rel.core.Aggregate;
          +import org.apache.calcite.rel.core.AggregateCall;
          +import org.apache.calcite.rel.core.JoinRelType;
          +import org.apache.calcite.rel.core.RelFactories;
          +import org.apache.calcite.rel.logical.LogicalAggregate;
          +import org.apache.calcite.rel.type.RelDataType;
          +import org.apache.calcite.rel.type.RelDataTypeFactory;
          +import org.apache.calcite.rel.type.RelDataTypeField;
          +import org.apache.calcite.rex.RexBuilder;
          +import org.apache.calcite.rex.RexInputRef;
          +import org.apache.calcite.rex.RexNode;
          +import org.apache.calcite.sql.SqlAggFunction;
          +import org.apache.calcite.sql.fun.SqlCountAggFunction;
          +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
          +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
          +import org.apache.calcite.sql.fun.SqlSumAggFunction;
          +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +import org.apache.calcite.tools.RelBuilder;
          +import org.apache.calcite.tools.RelBuilderFactory;
          +import org.apache.calcite.util.ImmutableBitSet;
          +import org.apache.calcite.util.ImmutableIntList;
          +import org.apache.calcite.util.Pair;
          +import org.apache.calcite.util.Util;
          +
          +import com.google.common.base.Preconditions;
          +import com.google.common.collect.ImmutableList;
          +import com.google.common.collect.Iterables;
          +import com.google.common.collect.Lists;
          +
          +import java.math.BigDecimal;
          +import java.util.ArrayList;
          +import java.util.HashMap;
          +import java.util.LinkedHashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.SortedSet;
          +import java.util.TreeSet;
          +
          +/**
          + * Planner rule that expands distinct aggregates
          + * (such as {@code COUNT(DISTINCT x)}) from a
          + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
          + *
          + * <p>How this is done depends upon the arguments to the function. If all
          + * functions have the same argument
          + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
          + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
          + * sufficient.
          + *
          + * <p>If there are multiple arguments
          + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
          + * the rule creates separate {@code Aggregate}s and combines using a
          + * {@link org.apache.calcite.rel.core.Join}.
          + */
          +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
          — End diff –

          Thank you. That's fine with me.
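The same-argument case described in the Javadoc above can be sketched with plain Java collections. This is only an illustration under stated assumptions: the class `SameArgumentDistinctSketch` and its methods are hypothetical, they are not part of Flink or Calcite, and the input is assumed to contain no NULL values. It shows why one extra aggregate is enough when every DISTINCT function has the same argument: the extra step only removes duplicates, and ordinary COUNT and SUM then run over its output.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration; not part of Flink or Calcite. Assumes non-null input. */
final class SameArgumentDistinctSketch {

    /** Step 1: the extra "GROUP BY x" aggregate, which only removes duplicates. */
    static Set<Integer> distinctValues(List<Integer> column) {
        return new LinkedHashSet<>(column);
    }

    /** Step 2a: plain COUNT(x) over the deduplicated values. */
    static long count(Set<Integer> values) {
        return values.size();
    }

    /** Step 2b: plain SUM(x) over the deduplicated values. */
    static long sum(Set<Integer> values) {
        long total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> x = Arrays.asList(3, 1, 3, 2, 1);
        // Both aggregates share the same deduplicated input.
        Set<Integer> deduplicated = distinctValues(x);
        System.out.println("COUNT(DISTINCT x) = " + count(deduplicated));
        System.out.println("SUM(DISTINCT x) = " + sum(deduplicated));
    }
}
```

Both results are computed from a single deduplication pass, which mirrors the "one extra Aggregate" wording of the rule's documentation.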

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r96628698

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala —
          @@ -96,6 +96,13 @@ object FlinkRuleSets {
          ProjectToCalcRule.INSTANCE,
          CalcMergeRule.INSTANCE,

          + // distinct aggregate rule for FLINK-3475
          — End diff –

          Done. The tracking jira is https://issues.apache.org/jira/browse/FLINK-5545

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3111

          Looks good to me. I would remove the ITCases. Logical tests should be sufficient.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3111

          @docete, did you check whether the execution plan is similar to what I outlined in the related JIRA issue FLINK-3475 (https://issues.apache.org/jira/browse/FLINK-3475)?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r97295864

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala —
          @@ -0,0 +1,275 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.junit.Test
          +
          +class DistinctAggregateTest extends TableTestBase {
          +
          + @Test
          + def testSingleDistinctAggregate(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
          +
          + val expected = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a")
          + ),
          + term("groupBy", "a"),
          + term("select", "a")
          + ),
          + tuples(List(null)),
          + term("values", "a")
          + ),
          + term("union", "a")
          + ),
          + term("select", "COUNT(a) AS EXPR$0")
          + )
          +
          + util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testMultiDistinctAggregateOnSameColumn(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
          +
          + val expected = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a")
          + ),
          + term("groupBy", "a"),
          + term("select", "a")
          + ),
          + tuples(List(null)),
          + term("values", "a")
          + ),
          + term("union", "a")
          + ),
          + term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
          + )
          +
          + util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
          + val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
          +
          + val expected0 = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a", "b")
          + ),
          + term("groupBy", "a"),
          + term("select", "a", "SUM(b) AS EXPR$1")
          + ),
          + tuples(List(null, null)),
          + term("values", "a", "EXPR$1")
          + ),
          + term("union", "a", "EXPR$1")
          + ),
          + term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
          + )
          +
          + util.verifySql(sqlQuery0, expected0)
          +
          + // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
          + val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
          +
          + val expected1 = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a", "b")
          + ),
          + term("groupBy", "b"),
          + term("select", "b", "COUNT(a) AS EXPR$0")
          + ),
          + tuples(List(null, null)),
          + term("values", "b", "EXPR$0")
          + ),
          + term("union", "b", "EXPR$0")
          + ),
          + term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
          + )
          +
          + util.verifySql(sqlQuery1, expected1)
          + }
          +
          + @Test
          + def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
          +
          + val expected = binaryNode(
          + "DataSetSingleRowJoin",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a")
          + ),
          + term("groupBy", "a"),
          + term("select", "a")
          + ),
          + tuples(List(null)),
          + term("values", "a")
          + ),
          + term("union", "a")
          + ),
          + term("select", "COUNT(a) AS EXPR$0")
          + ),
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "b")
          + ),
          + term("groupBy", "b"),
          + term("select", "b")
          + ),
          + tuples(List(null)),
          + term("values", "b")
          + ),
          + term("union", "b")
          + ),
          + term("select", "SUM(b) AS EXPR$1")
          + ),
          + term("where", "true"),
          + term("join", "EXPR$0", "EXPR$1"),
          + term("joinType", "NestedLoopJoin")
          — End diff –

          This join type is not supported. It would fail during execution.
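The plan shape expected by the test above (two independent distinct aggregates combined by a join whose condition is `true`) can be illustrated with plain Java. This is a sketch under assumptions, not Flink code: `SingleRowJoinSketch`, `countDistinct`, `sumDistinct`, and `crossJoin` are hypothetical names, rows are modelled as `long[]`, and NULL values are not handled. Because each side is a global aggregate that yields exactly one row, the unconditional join yields exactly one combined row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration; not part of Flink or Calcite. Assumes non-null input. */
final class SingleRowJoinSketch {

    /** COUNT(DISTINCT a): deduplicate, then count. */
    static long countDistinct(List<Integer> column) {
        Set<Integer> distinct = new HashSet<>(column);
        return distinct.size();
    }

    /** SUM(DISTINCT b): deduplicate, then sum. */
    static long sumDistinct(List<Long> column) {
        Set<Long> distinct = new HashSet<>(column);
        long total = 0;
        for (long value : distinct) {
            total += value;
        }
        return total;
    }

    /** Join with condition "true": every left row is paired with every right row. */
    static List<long[]> crossJoin(List<long[]> left, List<long[]> right) {
        List<long[]> joined = new ArrayList<>();
        for (long[] l : left) {
            for (long[] r : right) {
                long[] row = Arrays.copyOf(l, l.length + r.length);
                System.arraycopy(r, 0, row, l.length, r.length);
                joined.add(row);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 1, 2, 3);
        List<Long> b = Arrays.asList(10L, 10L, 20L);
        // Each global aggregate produces a single-row relation.
        List<long[]> left = Collections.singletonList(new long[] {countDistinct(a)});
        List<long[]> right = Collections.singletonList(new long[] {sumDistinct(b)});
        for (long[] row : crossJoin(left, right)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

The sketch says nothing about which join types the runtime accepts; it only shows why a join without a condition is harmless when both inputs are single-row aggregates.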

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on the issue:

          https://github.com/apache/flink/pull/3111

          @fhueske Yes, I have checked the execution plan. It's very similar to your description:

          For example, take the SQL query "select sum(distinct a), sum(distinct b), sum(c) from expr", where expr is a table with three fields: a, b, c.

          The explanation for the query is:
          ```
          == Abstract Syntax Tree ==
          LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
          LogicalTableScan(table=[[expr]])

          == Optimized Logical Plan ==
          DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
          DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
          DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
          DataSetAggregate(select=[SUM(c) AS EXPR$2])
          DataSetUnion(union=[a, b, c])
          DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
          DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(a) AS EXPR$0])
          DataSetUnion(union=[a])
          DataSetValues(tuples=[[{ null }]], values=[a])
          DataSetAggregate(groupBy=[a], select=[a])
          DataSetCalc(select=[a])
          DataSetScan(table=[[_DataSetTable_0]])
          DataSetAggregate(select=[SUM(b) AS EXPR$1])
          DataSetUnion(union=[b])
          DataSetValues(tuples=[[{ null }]], values=[b])
          DataSetAggregate(groupBy=[b], select=[b])
          DataSetCalc(select=[b])
          DataSetScan(table=[[_DataSetTable_0]])

          == Physical Execution Plan ==
          Stage 8 : Data Source
          content : collect elements with CollectionInputFormat
          Partitioning : RANDOM_PARTITIONED

          Stage 14 : Data Source
          content : collect elements with CollectionInputFormat
          Partitioning : RANDOM_PARTITIONED

          Stage 13 : Map
          content : from: (a, b, c)
          ship_strategy : Forward
          exchange_mode : BATCH
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 12 : FlatMap
          content : select: (a)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : FlatMap
          Partitioning : RANDOM_PARTITIONED

          Stage 11 : Map
          content : prepare select: (a)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 10 : GroupCombine
          content : groupBy: (a), select: (a)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Sorted Combine
          Partitioning : RANDOM_PARTITIONED

          Stage 9 : GroupReduce
          content : groupBy: (a), select: (a)
          ship_strategy : Hash Partition on [0]
          exchange_mode : PIPELINED
          driver_strategy : Sorted Group Reduce
          Partitioning : RANDOM_PARTITIONED

          Stage 7 : Union
          content :
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          Partitioning : RANDOM_PARTITIONED

          Stage 6 : Map
          content : prepare select: (SUM(a) AS EXPR$0)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 5 : GroupCombine
          content : select:(SUM(a) AS EXPR$0)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 4 : GroupReduce
          content : select:(SUM(a) AS EXPR$0)
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 19 : Data Source
          content : collect elements with CollectionInputFormat
          Partitioning : RANDOM_PARTITIONED

          Stage 20 : Map
          content : from: (a, b, c)
          ship_strategy : Forward
          exchange_mode : BATCH
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 18 : Union
          content :
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          Partitioning : RANDOM_PARTITIONED

          Stage 17 : Map
          content : prepare select: (SUM(c) AS EXPR$2)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 16 : GroupCombine
          content : select:(SUM(c) AS EXPR$2)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 15 : GroupReduce
          content : select:(SUM(c) AS EXPR$2)
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 3 : FlatMap
          content : where: (true), join: (EXPR$2, EXPR$0)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : FlatMap
          Partitioning : RANDOM_PARTITIONED

          Stage 25 : Data Source
          content : collect elements with CollectionInputFormat
          Partitioning : RANDOM_PARTITIONED

          Stage 30 : Map
          content : from: (a, b, c)
          ship_strategy : Forward
          exchange_mode : BATCH
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 29 : FlatMap
          content : select: (b)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : FlatMap
          Partitioning : RANDOM_PARTITIONED

          Stage 28 : Map
          content : prepare select: (b)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 27 : GroupCombine
          content : groupBy: (b), select: (b)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Sorted Combine
          Partitioning : RANDOM_PARTITIONED

          Stage 26 : GroupReduce
          content : groupBy: (b), select: (b)
          ship_strategy : Hash Partition on [0]
          exchange_mode : PIPELINED
          driver_strategy : Sorted Group Reduce
          Partitioning : RANDOM_PARTITIONED

          Stage 24 : Union
          content :
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          Partitioning : RANDOM_PARTITIONED

          Stage 23 : Map
          content : prepare select: (SUM(b) AS EXPR$1)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Map
          Partitioning : RANDOM_PARTITIONED

          Stage 22 : GroupCombine
          content : select:(SUM(b) AS EXPR$1)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 21 : GroupReduce
          content : select:(SUM(b) AS EXPR$1)
          ship_strategy : Redistribute
          exchange_mode : PIPELINED
          driver_strategy : Group Reduce All
          Partitioning : RANDOM_PARTITIONED

          Stage 2 : FlatMap
          content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : FlatMap
          Partitioning : RANDOM_PARTITIONED

          Stage 1 : FlatMap
          content : select: (EXPR$0, EXPR$1, EXPR$2)
          ship_strategy : Forward
          exchange_mode : PIPELINED
          driver_strategy : FlatMap
          Partitioning : RANDOM_PARTITIONED

          Stage 0 : Data Sink
          content : org.apache.flink.api.java.io.DiscardingOutputFormat
          ship_strategy : Forward
          exchange_mode : PIPELINED
          Partitioning : RANDOM_PARTITIONED

          ```
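The distinct branches of the physical plan above follow a two-phase pattern: a GroupCombine removes duplicates locally, a hash-partitioned GroupReduce removes them globally, and a final global reduce computes the SUM. The following is a simplified in-memory sketch of that pattern, not Flink's operators: `TwoPhaseDistinctSketch`, its method names, and the `parallelism` parameter are assumptions made for illustration, and NULL values are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration; not part of Flink. Assumes non-null input. */
final class TwoPhaseDistinctSketch {

    /** Local phase: drop duplicates inside each input partition (the combiner role). */
    static List<Set<Integer>> localDistinct(List<List<Integer>> partitions) {
        List<Set<Integer>> combined = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            combined.add(new HashSet<>(partition));
        }
        return combined;
    }

    /** Global phase: hash-partition on the value so equal values meet in one bucket. */
    static List<Set<Integer>> globalDistinct(List<Set<Integer>> combined, int parallelism) {
        List<Set<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new HashSet<>());
        }
        for (Set<Integer> partial : combined) {
            for (int value : partial) {
                int target = Math.floorMod(Integer.hashCode(value), parallelism);
                buckets.get(target).add(value);
            }
        }
        return buckets;
    }

    /** Final phase: a single global reduce that sums the now-unique values. */
    static long sumAll(List<Set<Integer>> buckets) {
        long total = 0;
        for (Set<Integer> bucket : buckets) {
            for (int value : bucket) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(1, 2, 2),
            Arrays.asList(2, 3, 1));
        List<Set<Integer>> global = globalDistinct(localDistinct(partitions), 2);
        System.out.println("SUM(DISTINCT a) = " + sumAll(global));
    }
}
```

The local phase is optional for correctness but reduces the number of records that have to be shipped to the hash-partitioned step.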

FlatMap Partitioning : RANDOM_PARTITIONED Stage 1 : FlatMap content : select: (EXPR$0, EXPR$1, EXPR$2) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap Partitioning : RANDOM_PARTITIONED Stage 0 : Data Sink content : org.apache.flink.api.java.io.DiscardingOutputFormat ship_strategy : Forward exchange_mode : PIPELINED Partitioning : RANDOM_PARTITIONED ```
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3111

          Hi @docete,

          thanks for posting the plan. It looks OK, but this is because the query computes non-grouped aggregates.

          In case of grouped aggregates, the resulting plan will be less efficient than the plan I proposed because it will not reuse the existing partitioning and sorting properties of the data. This will result in at least one shuffle and one full sort for each distinct aggregate.

          The trick is to explicitly partition the data for the distinct operation on a subset (i.e., the grouping keys) of the attributes that usually would be used. The following aggregations and joins can be performed in a streaming fashion without partitioning or sorting the data again. This is not possible with your plan.

          We could try to implement that with some tweaking as an optimization rule (which would be custom and based on the rule you copied from Calcite) or implement it as a dedicated `DataSetRelNode` for distinct aggregates. I'm more in favor of the latter option.
          Once the optimizer tracks physical data properties, it will be easier to implement distinct aggregates using optimizer rules.

          What do you think?
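
To illustrate the point about reusing the sort order, here is a minimal sketch in plain Java (not Flink code). It assumes rows are `(groupingKey, distinctAttribute)` pairs held in memory; the class and method names are hypothetical. Once the rows are sorted on the grouping key plus the distinct attribute, duplicates are adjacent, so de-duplication and the grouped aggregation can happen in a single streaming pass without sorting again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink or the PR.
final class SortedDistinctSum {

    /** Computes SUM(DISTINCT value) per key; each row is {key, value}. */
    static Map<Integer, Long> sumDistinctPerKey(List<int[]> rows) {
        // Sort on the grouping key first, then on the distinct attribute.
        List<int[]> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]));

        Map<Integer, Long> result = new LinkedHashMap<>();
        int[] previous = null;
        for (int[] row : sorted) {
            // After sorting, equal (key, value) pairs are neighbours.
            boolean duplicate =
                previous != null && previous[0] == row[0] && previous[1] == row[1];
            if (!duplicate) {
                result.merge(row[0], (long) row[1], Long::sum);
            }
            previous = row;
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> rows = Arrays.asList(
            new int[] {1, 10}, new int[] {2, 5}, new int[] {1, 10},
            new int[] {1, 20}, new int[] {2, 5});
        System.out.println(sumDistinctPerKey(rows)); // prints {1=30, 2=5}
    }
}
```

In a distributed setting the same idea would presumably rely on hash partitioning by the grouping key only, so that all rows of one group land on the same worker; the sketch leaves that part out.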

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on the issue:

          https://github.com/apache/flink/pull/3111

          Agreed. If we have more than one distinct aggregate with groupings, partitioning first and reusing the subsets would improve performance.
          Could we merge this PR first and create another JIRA to track the grouping cases?
          We need a workaround to support distinct aggs ASAP.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r101732097

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala —
          @@ -45,7 +45,7 @@ class DataSetAggregateRule
          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          if (distinctAggs) {
          — End diff –

          The condition can be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r101732154

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala —
          @@ -51,7 +51,7 @@ class DataSetAggregateWithNullValuesRule
          // check if we have distinct aggregates
          val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
          if (distinctAggs) {
          — End diff –

          The condition can be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r101733431

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala —
          @@ -213,34 +213,45 @@ class AggregationsITCase(
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }

          - @Test(expected = classOf[TableException])
          + @Test
          def testDistinctAggregate(): Unit = {

          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env, config)

          val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"

          - val ds = CollectionDataSets.get3TupleDataSet(env)
          - tEnv.registerDataSet("MyTable", ds)
          + val ds = env.fromElements(
          + (1, 1L, 1.0f, "Hello"),
          + (2, 2L, 1.0f, "Ciao")).toTable(tEnv)
          + tEnv.registerTable("MyTable", ds)

          - // must fail. distinct aggregates are not supported
          - tEnv.sql(sqlQuery).toDataSet[Row]
          + val result = tEnv.sql(sqlQuery)
          +
          + val expected = "3,1"
          + val results = result.toDataSet[Row].collect()
          + TestBaseUtils.compareResultAsText(results.asJava, expected)
          }

          - @Test(expected = classOf[TableException])
          + @Test
          def testGroupedDistinctAggregate(): Unit = {

          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env, config)

          val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"

          - val ds = CollectionDataSets.get3TupleDataSet(env)
          - tEnv.registerDataSet("MyTable", ds)
          + val ds = env.fromElements(
              • End diff –

          I think it would be good to use a bit more test data here, like one of the `CollectionDataSets`.
          ITCases are rather expensive to run, so we should get the most out of them.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r101735785

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java —
          @@ -0,0 +1,1144 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.dataSet;
          +
          +import org.apache.calcite.plan.Contexts;
          +import org.apache.calcite.plan.RelOptCluster;
          +import org.apache.calcite.plan.RelOptRule;
          +import org.apache.calcite.plan.RelOptRuleCall;
          +import org.apache.calcite.rel.RelNode;
          +import org.apache.calcite.rel.core.Aggregate;
          +import org.apache.calcite.rel.core.AggregateCall;
          +import org.apache.calcite.rel.core.JoinRelType;
          +import org.apache.calcite.rel.core.RelFactories;
          +import org.apache.calcite.rel.logical.LogicalAggregate;
          +import org.apache.calcite.rel.type.RelDataType;
          +import org.apache.calcite.rel.type.RelDataTypeFactory;
          +import org.apache.calcite.rel.type.RelDataTypeField;
          +import org.apache.calcite.rex.RexBuilder;
          +import org.apache.calcite.rex.RexInputRef;
          +import org.apache.calcite.rex.RexNode;
          +import org.apache.calcite.sql.SqlAggFunction;
          +import org.apache.calcite.sql.fun.SqlCountAggFunction;
          +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
          +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
          +import org.apache.calcite.sql.fun.SqlSumAggFunction;
          +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +import org.apache.calcite.tools.RelBuilder;
          +import org.apache.calcite.tools.RelBuilderFactory;
          +import org.apache.calcite.util.ImmutableBitSet;
          +import org.apache.calcite.util.ImmutableIntList;
          +import org.apache.calcite.util.Pair;
          +import org.apache.calcite.util.Util;
          +
          +import com.google.common.base.Preconditions;
          +import com.google.common.collect.ImmutableList;
          +import com.google.common.collect.Iterables;
          +import com.google.common.collect.Lists;
          +
          +import java.math.BigDecimal;
          +import java.util.ArrayList;
          +import java.util.HashMap;
          +import java.util.LinkedHashSet;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.Set;
          +import java.util.SortedSet;
          +import java.util.TreeSet;
          +
          +/**
          + * Planner rule that expands distinct aggregates
          + * (such as {@code COUNT(DISTINCT x)}) from a
          + * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
          + *
          + * <p>How this is done depends upon the arguments to the function. If all
          + * functions have the same argument
          + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
          + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is
          + * sufficient.
          + *
          + * <p>If there are multiple arguments
          + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
          + * the rule creates separate {@code Aggregate}s and combines using a
          + * {@link org.apache.calcite.rel.core.Join}.
          + */
          +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule {
          — End diff –

          This is a Java class and should be in the Java source folder `./src/main/java`.
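
For readers unfamiliar with the expansion described in the Javadoc above, the multi-argument case can be sketched in plain Java as follows. This is only an illustration under simplifying assumptions (in-memory rows, non-null `int` columns, no grouping keys); the class and method names are hypothetical and nothing here is taken from the rule's actual implementation. Each distinct argument gets its own de-duplicating pipeline, and the single-row results are then combined, which is the role the join plays in the rewritten plan.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of Calcite or Flink.
final class ExpandDistinctSketch {

    /** One pipeline per distinct argument: de-duplicate the column, then count it. */
    static long countDistinct(List<int[]> rows, int column) {
        Set<Integer> seen = new HashSet<>();
        for (int[] row : rows) {
            seen.add(row[column]);
        }
        return seen.size();
    }

    /**
     * COUNT(DISTINCT x), COUNT(DISTINCT y): two independent pipelines whose
     * one-row results are placed side by side, as a join of single rows would do.
     */
    static long[] countDistinctXAndY(List<int[]> rows) {
        long distinctX = countDistinct(rows, 0);
        long distinctY = countDistinct(rows, 1);
        return new long[] {distinctX, distinctY};
    }

    public static void main(String[] args) {
        // Rows are {x, y}.
        List<int[]> rows = Arrays.asList(
            new int[] {1, 7}, new int[] {1, 8}, new int[] {2, 8}, new int[] {3, 8});
        System.out.println(Arrays.toString(countDistinctXAndY(rows))); // prints [3, 2]
    }
}
```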

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r101733413

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala —
          @@ -213,34 +213,45 @@ class AggregationsITCase(
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }

          - @Test(expected = classOf[TableException])
          + @Test
          def testDistinctAggregate(): Unit = {

          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env, config)

          val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"

          - val ds = CollectionDataSets.get3TupleDataSet(env)
          - tEnv.registerDataSet("MyTable", ds)
          + val ds = env.fromElements(
              • End diff –

          I think it would be good to use a bit more test data here, like one of the `CollectionDataSets`.
          ITCases are rather expensive to run, so we should get the most out of them.

          githubbot ASF GitHub Bot added a comment -

          Github user docete commented on the issue:

          https://github.com/apache/flink/pull/3111

          @fhueske Fixed according to your comments

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r103169886

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala —
          @@ -0,0 +1,275 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.junit.Test
          +
          +class DistinctAggregateTest extends TableTestBase {
          +
          + @Test
          + def testSingleDistinctAggregate(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
          +
          + val expected = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a")
          + ),
          + term("groupBy", "a"),
          + term("select", "a")
          + ),
          + tuples(List(null)),
          + term("values", "a")
          + ),
          + term("union", "a")
          + ),
          + term("select", "COUNT(a) AS EXPR$0")
          + )
          +
          + util.verifySql(sqlQuery, expected)
          + }

          +
          + @Test
          + def testMultiDistinctAggregateOnSameColumn(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
          +
          + val expected = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a")
          + ),
          + term("groupBy", "a"),
          + term("select", "a")
          + ),
          + tuples(List(null)),
          + term("values", "a")
          + ),
          + term("union", "a")
          + ),
          + term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
          + )
          +
          + util.verifySql(sqlQuery, expected)
          + }

          +
          + @Test
          + def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
          + val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
          +
          + val expected0 = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a", "b")
          + ),
          + term("groupBy", "a"),
          + term("select", "a", "SUM(b) AS EXPR$1")
          + ),
          + tuples(List(null, null)),
          + term("values", "a", "EXPR$1")
          + ),
          + term("union", "a", "EXPR$1")
          + ),
          + term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
          + )
          +
          + util.verifySql(sqlQuery0, expected0)
          +
          + // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
          + val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
          +
          + val expected1 = unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetUnion",
          + unaryNode(
          + "DataSetValues",
          + unaryNode(
          + "DataSetAggregate",
          + unaryNode(
          + "DataSetCalc",
          + batchTableNode(0),
          + term("select", "a", "b")
          + ),
          + term("groupBy", "b"),
          + term("select", "b", "COUNT(a) AS EXPR$0")
          + ),
          + tuples(List(null, null)),
          + term("values", "b", "EXPR$0")
          + ),
          + term("union", "b", "EXPR$0")
          + ),
          + term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
          + )
          +
          + util.verifySql(sqlQuery1, expected1)
          + }

          +
          + @Test
          + def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
          + val util = batchTestUtil()
          + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          + val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
          — End diff –

          Add another test with two distinct aggregates on different attributes and a non-distinct aggregate.
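
As a side note on the expected plans quoted in this diff, the two-level shape for `SELECT COUNT(DISTINCT a), SUM(b)` (an inner aggregate grouped on `a` that computes a partial `SUM(b)`, followed by an outer `COUNT(a), SUM(EXPR$1)`) can be checked against the direct computation with a small plain-Java sketch. It assumes in-memory rows of non-null `long` values and omits the `DataSetUnion`/`DataSetValues` nodes that appear in the plans; all names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class; not part of Flink or the PR.
final class TwoLevelDistinctSketch {

    /** Rewritten form: inner GROUP BY a with SUM(b), outer COUNT(a) and SUM of the partial sums. */
    static long[] rewritten(List<long[]> rows) {
        // Inner aggregate: one entry per distinct a, carrying the partial SUM(b).
        Map<Long, Long> sumBPerA = new HashMap<>();
        for (long[] row : rows) {
            sumBPerA.merge(row[0], row[1], Long::sum);
        }
        // Outer aggregate: count the groups and add up the partial sums.
        long countA = 0;
        long sumB = 0;
        for (long partial : sumBPerA.values()) {
            countA++;
            sumB += partial;
        }
        return new long[] {countA, sumB};
    }

    /** Direct form: COUNT(DISTINCT a) and SUM(b) computed straight from the rows. */
    static long[] direct(List<long[]> rows) {
        Set<Long> distinctA = new HashSet<>();
        long sumB = 0;
        for (long[] row : rows) {
            distinctA.add(row[0]);
            sumB += row[1];
        }
        return new long[] {distinctA.size(), sumB};
    }

    public static void main(String[] args) {
        // Rows are {a, b}.
        List<long[]> rows = Arrays.asList(
            new long[] {1, 10}, new long[] {1, 20}, new long[] {2, 5});
        System.out.println(Arrays.toString(rewritten(rows))); // prints [2, 35]
        System.out.println(Arrays.toString(direct(rows)));    // prints [2, 35]
    }
}
```

The second case in the test (`COUNT(a), SUM(DISTINCT b)`) follows the same pattern with the roles swapped: the inner aggregate groups on `b` and counts `a`, and the outer aggregate sums those partial counts.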

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r103170129

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala —
          @@ -0,0 +1,275 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.scala.batch.sql
          +
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.junit.Test
          +
          +class DistinctAggregateTest extends TableTestBase {
          +
          + @Test
          + def testSingleDistinctAggregate(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   val sqlQuery = "SELECT COUNT(DISTINCT a) FROM MyTable"
          +
          +   val expected = unaryNode(
          +     "DataSetAggregate",
          +     unaryNode(
          +       "DataSetUnion",
          +       unaryNode(
          +         "DataSetValues",
          +         unaryNode(
          +           "DataSetAggregate",
          +           unaryNode(
          +             "DataSetCalc",
          +             batchTableNode(0),
          +             term("select", "a")
          +           ),
          +           term("groupBy", "a"),
          +           term("select", "a")
          +         ),
          +         tuples(List(null)),
          +         term("values", "a")
          +       ),
          +       term("union", "a")
          +     ),
          +     term("select", "COUNT(a) AS EXPR$0")
          +   )
          +
          +   util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testMultiDistinctAggregateOnSameColumn(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"
          +
          +   val expected = unaryNode(
          +     "DataSetAggregate",
          +     unaryNode(
          +       "DataSetUnion",
          +       unaryNode(
          +         "DataSetValues",
          +         unaryNode(
          +           "DataSetAggregate",
          +           unaryNode(
          +             "DataSetCalc",
          +             batchTableNode(0),
          +             term("select", "a")
          +           ),
          +           term("groupBy", "a"),
          +           term("select", "a")
          +         ),
          +         tuples(List(null)),
          +         term("values", "a")
          +       ),
          +       term("union", "a")
          +     ),
          +     term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
          +   )
          +
          +   util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
          +   val sqlQuery0 = "SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable"
          +
          +   val expected0 = unaryNode(
          +     "DataSetAggregate",
          +     unaryNode(
          +       "DataSetUnion",
          +       unaryNode(
          +         "DataSetValues",
          +         unaryNode(
          +           "DataSetAggregate",
          +           unaryNode(
          +             "DataSetCalc",
          +             batchTableNode(0),
          +             term("select", "a", "b")
          +           ),
          +           term("groupBy", "a"),
          +           term("select", "a", "SUM(b) AS EXPR$1")
          +         ),
          +         tuples(List(null, null)),
          +         term("values", "a", "EXPR$1")
          +       ),
          +       term("union", "a", "EXPR$1")
          +     ),
          +     term("select", "COUNT(a) AS EXPR$0", "SUM(EXPR$1) AS EXPR$1")
          +   )
          +
          +   util.verifySql(sqlQuery0, expected0)
          +
          +   // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
          +   val sqlQuery1 = "SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable"
          +
          +   val expected1 = unaryNode(
          +     "DataSetAggregate",
          +     unaryNode(
          +       "DataSetUnion",
          +       unaryNode(
          +         "DataSetValues",
          +         unaryNode(
          +           "DataSetAggregate",
          +           unaryNode(
          +             "DataSetCalc",
          +             batchTableNode(0),
          +             term("select", "a", "b")
          +           ),
          +           term("groupBy", "b"),
          +           term("select", "b", "COUNT(a) AS EXPR$0")
          +         ),
          +         tuples(List(null, null)),
          +         term("values", "b", "EXPR$0")
          +       ),
          +       term("union", "b", "EXPR$0")
          +     ),
          +     term("select", "$SUM0(EXPR$0) AS EXPR$0", "SUM(b) AS EXPR$1")
          +   )
          +
          +   util.verifySql(sqlQuery1, expected1)
          + }
          +
          + @Test
          + def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable"
          +
          +   val expected = binaryNode(
          +     "DataSetSingleRowJoin",
          +     unaryNode(
          +       "DataSetAggregate",
          +       unaryNode(
          +         "DataSetUnion",
          +         unaryNode(
          +           "DataSetValues",
          +           unaryNode(
          +             "DataSetAggregate",
          +             unaryNode(
          +               "DataSetCalc",
          +               batchTableNode(0),
          +               term("select", "a")
          +             ),
          +             term("groupBy", "a"),
          +             term("select", "a")
          +           ),
          +           tuples(List(null)),
          +           term("values", "a")
          +         ),
          +         term("union", "a")
          +       ),
          +       term("select", "COUNT(a) AS EXPR$0")
          +     ),
          +     unaryNode(
          +       "DataSetAggregate",
          +       unaryNode(
          +         "DataSetUnion",
          +         unaryNode(
          +           "DataSetValues",
          +           unaryNode(
          +             "DataSetAggregate",
          +             unaryNode(
          +               "DataSetCalc",
          +               batchTableNode(0),
          +               term("select", "b")
          +             ),
          +             term("groupBy", "b"),
          +             term("select", "b")
          +           ),
          +           tuples(List(null)),
          +           term("values", "b")
          +         ),
          +         term("union", "b")
          +       ),
          +       term("select", "SUM(b) AS EXPR$1")
          +     ),
          +     term("where", "true"),
          +     term("join", "EXPR$0", "EXPR$1"),
          +     term("joinType", "NestedLoopJoin")
          +   )
          +
          +   util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testSingleDistinctAggregateWithGrouping(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   val sqlQuery = "SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a"
          +
          +   val expected = unaryNode(
          +     "DataSetAggregate",
          +     unaryNode(
          +       "DataSetAggregate",
          +       unaryNode(
          +         "DataSetCalc",
          +         batchTableNode(0),
          +         term("select", "a", "b")
          +       ),
          +       term("groupBy", "a", "b"),
          +       term("select", "a", "b", "COUNT(a) AS EXPR$1")
          +     ),
          +     term("groupBy", "a"),
          +     term("select", "a", "SUM(EXPR$1) AS EXPR$1", "SUM(b) AS EXPR$2")
          +   )
          +
          +   util.verifySql(sqlQuery, expected)
          + }
          +
          + @Test
          + def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
          +   val util = batchTestUtil()
          +   util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
          +
          +   val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a"
          — End diff –

          Add two tests for `GROUP BY` with two distinct aggregates:
          1. on the same column
          2. on different columns
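
          For reference, the two-level plan expected by `testSingleDistinctAggregateWithGrouping` in the diff above (an inner aggregate grouped by `a, b`, then an outer aggregate grouped by `a`) can be mimicked on plain collections. The following is only a sketch of the semantics, not the rule's implementation: the class `DistinctAggSketch`, its method names, and the `{a, b}` row encoding are hypothetical, and it assumes the columns contain no NULL values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DistinctAggSketch {

    // Each row is {a, b}, standing in for columns a and b of MyTable.
    // Assumption: no NULLs, so COUNT(a) equals the number of rows per group.

    // Direct evaluation of
    //   SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a
    // Returns, per group key a, the pair {COUNT(a), SUM(DISTINCT b)}.
    static Map<Long, long[]> direct(List<long[]> rows) {
        Map<Long, long[]> result = new TreeMap<>();
        Map<Long, Set<Long>> seenB = new HashMap<>();
        for (long[] r : rows) {
            long[] acc = result.computeIfAbsent(r[0], k -> new long[2]);
            acc[0]++; // COUNT(a)
            // Add b to the sum only the first time it is seen in this group.
            if (seenB.computeIfAbsent(r[0], k -> new HashSet<>()).add(r[1])) {
                acc[1] += r[1];
            }
        }
        return result;
    }

    // Two-level evaluation mirroring the expected plan:
    //   inner: GROUP BY a, b  ->  a, b, COUNT(a) AS cnt
    //   outer: GROUP BY a     ->  a, SUM(cnt), SUM(b)
    static Map<Long, long[]> twoLevel(List<long[]> rows) {
        // Inner aggregate: one entry per distinct (a, b) pair with its row count.
        Map<List<Long>, Long> inner = new HashMap<>();
        for (long[] r : rows) {
            inner.merge(Arrays.asList(r[0], r[1]), 1L, Long::sum);
        }
        // Outer aggregate: b now occurs once per group, so a plain SUM is distinct.
        Map<Long, long[]> result = new TreeMap<>();
        for (Map.Entry<List<Long>, Long> e : inner.entrySet()) {
            long a = e.getKey().get(0);
            long b = e.getKey().get(1);
            long[] acc = result.computeIfAbsent(a, k -> new long[2]);
            acc[0] += e.getValue(); // SUM(cnt) reproduces COUNT(a)
            acc[1] += b;            // SUM(b) over distinct b values
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> rows = Arrays.asList(
            new long[]{1, 10}, new long[]{1, 10}, new long[]{1, 20}, new long[]{2, 5});
        for (Map.Entry<Long, long[]> e : twoLevel(rows).entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
    }
}
```

          Both methods should agree on any NULL-free input. With two distinct aggregates on different columns a single inner grouping no longer works this way, which is presumably why the ungrouped different-column test above expects a join of two separately aggregated branches.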

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3111#discussion_r103165057

          — Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java —
          @@ -0,0 +1,1152 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.rules.dataSet;
          — End diff –

          Move to `org.apache.flink.table.calcite.rules` next to `FlinkAggregateJoinTransposeRule`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3111

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3111

          fhueske Fabian Hueske added a comment -

          Implemented for 1.3.0 with 36c9348ff06cae1fe55925bcc6081154be2f10f5


            People

            • Assignee:
              docete Zhenghua Gao
              Reporter:
              chengxiang li Chengxiang Li