FLINK-21923

SplitAggregateRule throws ClassCastException when SUM/COUNT and AVG appear in the same SQL query

    Details

      Description

      SplitAggregateRule rewrites a one-layer aggregation into a two-layer (partial/final) aggregation to improve performance under data skew.
      In the partial phase, AVG is translated into COUNT and SUM. If the same COUNT already appears in the original SQL, the engine removes the duplicate COUNT and then adds a Project on top of the aggregate to restore the output column of the removed call. The rule, however, assumes that the node on top of the builder stack is still the aggregate:

          relBuilder.aggregate(
            relBuilder.groupKey(fullGroupSet, ImmutableList.of[ImmutableBitSet](fullGroupSet)),
            newPartialAggCalls)
          relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
            .setPartialFinalType(PartialFinalType.PARTIAL)
      

      Because of the added Project, `relBuilder.peek()` returns a `FlinkLogicalCalc`, not a `FlinkLogicalAggregate`,
      and the cast throws an exception like this:

      java.lang.ClassCastException: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
      
      	at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
      	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
      	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
      	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
      	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
      	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
      	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
      	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
      	at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
      	at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
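
      The failure mode can be reproduced in miniature without Calcite or Flink. The sketch below is a toy model under stated assumptions: `Scan`, `Aggregate`, `Project`, `ToyBuilder`, and `findAggregate` are hypothetical stand-ins invented for illustration, not real planner classes. It shows a builder that drops duplicate aggregate calls and adds a projection, why a blind cast of `peek` then fails, and one possible defensive alternative (not necessarily the fix adopted for this issue).

```scala
object SplitAggSketch {
  // Hypothetical stand-ins for plan nodes; not Calcite or Flink types.
  sealed trait Node
  final case class Scan(table: String) extends Node
  final case class Aggregate(input: Node, aggCalls: List[String]) extends Node
  final case class Project(input: Node, fields: List[String]) extends Node

  // Toy builder: drops duplicate aggregate calls and, if any were dropped,
  // adds a Project on top to restore the originally requested columns.
  final class ToyBuilder(private var current: Node) {
    def aggregate(aggCalls: List[String]): ToyBuilder = {
      val distinctCalls = aggCalls.distinct
      val agg = Aggregate(current, distinctCalls)
      current = if (distinctCalls.size == aggCalls.size) agg else Project(agg, aggCalls)
      this
    }
    def peek: Node = current
  }

  // Defensive lookup: walk through any Project instead of casting peek directly.
  def findAggregate(node: Node): Option[Aggregate] = node match {
    case agg: Aggregate    => Some(agg)
    case Project(input, _) => findAggregate(input)
    case _                 => None
  }

  // COUNT(b) is requested directly and again as part of the decomposed AVG(b).
  val partialCalls: List[String] = List("COUNT(b)", "SUM(b)", "COUNT(b)")
  val topNode: Node = new ToyBuilder(Scan("MyTable")).aggregate(partialCalls).peek

  // The blind cast fails because the top node is a Project, not an Aggregate.
  val castFails: Boolean =
    try {
      val agg: Aggregate = topNode.asInstanceOf[Aggregate]
      agg.aggCalls
      false
    } catch {
      case _: ClassCastException => true
    }

  // The defensive lookup still reaches the deduplicated aggregate.
  val found: Option[Aggregate] = findAggregate(topNode)
}
```

      In this toy model `castFails` is true, while `found` holds the aggregate with the two remaining calls. The same idea applied to the rule would mean locating the aggregate below the projection before calling `setPartialFinalType`.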
      

      The failure can be reproduced reliably by adding the following test case to `SplitAggregateRuleTest`:

        @Test
        def testAggFilterClauseBothWithAvgAndCount(): Unit = {
          util.tableEnv.getConfig.getConfiguration.setBoolean(
            OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
          val sqlQuery =
            s"""
               |SELECT
               |  a,
               |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
               |  SUM(b) FILTER (WHERE NOT b = 5),
               |  COUNT(b),
               |  AVG(b),
               |  SUM(b)
               |FROM MyTable
               |GROUP BY a
               |""".stripMargin
          util.verifyRelPlan(sqlQuery)
        }
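
      To see why the query above leads to duplicate aggregate calls, note that `COUNT(b)`, `AVG(b)`, and `SUM(b)` can all be served by a single partial `SUM(b)` and a single partial `COUNT(b)`. The sketch below illustrates that two-phase computation on plain collections. The sample rows, the bucket count, and the way rows are assigned to buckets are assumptions made for illustration, and AVG is computed as a Double for simplicity; it is not the planner's implementation.

```scala
object TwoPhaseAvgSketch {
  // Hypothetical sample rows (a, b); not taken from the issue.
  val rows: Seq[(String, Int)] =
    Seq(("x", 1), ("x", 2), ("x", 3), ("y", 10), ("y", 20))

  // Assumed number of buckets used to spread a skewed key over several partial groups.
  val buckets: Int = 2

  // One partial result per (a, bucket): SUM(b) and COUNT(b), each computed once.
  final case class Partial(sum: Long, count: Long)

  // Partial phase: group by (a, bucket).
  val partial: Map[(String, Int), Partial] =
    rows
      .groupBy { case (a, b) => (a, Math.floorMod(b.hashCode, buckets)) }
      .map { case (key, rs) => key -> Partial(rs.map(_._2.toLong).sum, rs.size.toLong) }

  // Final phase: merge partials per a, then derive COUNT, AVG, and SUM from them.
  val result: Map[String, (Long, Double, Long)] =
    partial.groupBy { case ((a, _), _) => a }.map { case (a, ps) =>
      val sum = ps.values.map(_.sum).sum
      val count = ps.values.map(_.count).sum
      a -> ((count, sum.toDouble / count, sum))
    }
}
```

      Because AVG needs exactly the SUM and COUNT that the query also requests directly, the partial aggregate ends up with repeated calls, which is the situation in which the duplicate is removed and a projection is added.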
      

              People

              • Assignee: tartarus
              • Reporter: tartarus