Spark / SPARK-36339

aggsBuffer should collect AggregateExpression in the map range



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.8, 3.0.3, 3.1.2
    • Fix Version/s: 3.2.0, 3.1.3, 3.0.4
    • Component/s: SQL


      A demo that reproduces this issue:

      -- This query runs without error:
      SELECT name, count(name) c
      FROM VALUES ('Alice'), ('Bob') people(name)
      GROUP BY name GROUPING SETS(name);
      -- Swapping the order of the selected columns produces an error:
      SELECT count(name) c, name
      FROM VALUES ('Alice'), ('Bob') people(name)
      GROUP BY name GROUPING SETS(name);

      The error message is:

      Error in query: expression 'people.`name`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
      Aggregate [name#5, spark_grouping_id#3], [count(name#1) AS c#0L, name#1]
      +- Expand [List(name#1, name#4, 0)], [name#1, name#5, spark_grouping_id#3]
         +- Project [name#1, name#1 AS name#4]
            +- SubqueryAlias `people`
               +- LocalRelation [name#1]

      So far, I have verified that the problem does not occur in versions before 2.3.


      During debugging, I found that the behavior of constructAggregateExprs in ResolveGroupingAnalytics has changed.

          /*
           * Construct new aggregate expressions by replacing grouping functions.
           */
          private def constructAggregateExprs(
              groupByExprs: Seq[Expression],
              aggregations: Seq[NamedExpression],
              groupByAliases: Seq[Alias],
              groupingAttrs: Seq[Expression],
              gid: Attribute): Seq[NamedExpression] = aggregations.map {
            // collect all the found AggregateExpression, so we can check an expression is part of
            // any AggregateExpression or not.
            val aggsBuffer = ArrayBuffer[Expression]()
            // Returns whether the expression belongs to any expressions in `aggsBuffer` or not.
            def isPartOfAggregation(e: Expression): Boolean = {
              aggsBuffer.exists(a => a.find(_ eq e).isDefined)
            }
            replaceGroupingFunc(_, groupByExprs, gid).transformDown {
              // AggregateExpression should be computed on the unmodified value of its argument
              // expressions, so we should not replace any references to grouping expression
              // inside it.
              case e: AggregateExpression =>
                aggsBuffer += e
                e
              case e if isPartOfAggregation(e) => e
              case e =>
                // Replace expression by expand output attribute.
                val index = groupByAliases.indexWhere(_.child.semanticEquals(e))
                if (index == -1) {
                  e
                } else {
                  groupingAttrs(index)
                }
            }.asInstanceOf[NamedExpression]
          }

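      In aggregations.map { ... }, aggsBuffer is declared inside the block passed to map but outside the function that map applies to each element: only the trailing expression containing the _ placeholder becomes that function. The buffer is therefore created once and accumulates the AggregateExpressions of every element that map processes, so the result for one element can depend on the elements before it. This was not the case before 2.3.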
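The scoping behavior described above can be reproduced in plain Scala, without Spark. The following is only a minimal sketch: the names MapScopeDemo, sharedBuffer, freshBuffer and recordAndCount are hypothetical and are not part of Spark, and the Int buffer merely stands in for aggsBuffer. Naming the lambda parameter explicitly is one way to give each element its own buffer; it is not necessarily how the issue was fixed.

```scala
import scala.collection.mutable.ArrayBuffer

object MapScopeDemo {
  // Hypothetical helper: records x in the buffer and returns the buffer size,
  // so the result reveals how many elements the buffer has seen so far.
  private def recordAndCount(seen: ArrayBuffer[Int], x: Int): Int = {
    seen += x
    seen.size
  }

  // Same shape as the code in this issue: the block is evaluated once, and
  // only the trailing expression with `_` becomes the per-element function.
  // The buffer is therefore shared by all elements.
  def sharedBuffer(xs: Seq[Int]): Seq[Int] = xs.map {
    val seen = ArrayBuffer[Int]()
    recordAndCount(seen, _)
  }

  // With an explicit parameter, the whole block is the function body,
  // so a new buffer is created for every element.
  def freshBuffer(xs: Seq[Int]): Seq[Int] = xs.map { x =>
    val seen = ArrayBuffer[Int]()
    recordAndCount(seen, x)
  }

  def main(args: Array[String]): Unit = {
    println(sharedBuffer(Seq(10, 20, 30))) // List(1, 2, 3): state leaks across elements
    println(freshBuffer(Seq(10, 20, 30)))  // List(1, 1, 1): each element starts empty
  }
}
```

In the shared version the result for an element depends on what was processed before it, which is consistent with the query failing only after the column order is changed.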




            Assignee: gaoyajun02
            Reporter: gaoyajun02