Description
Aggregate functions are being pushed down into projections when nested columns are accessed, causing the following error:
Caused by: UnsupportedOperationException: Cannot generate code for expression: ...
Reproduction:
spark.sql("drop table if exists test_aggregates")
spark.sql("create table if not exists test_aggregates(a STRUCT<c: STRUCT<e: string>, d: int>, b string)")
val df = sql("select max(a).c.e from (select a, b from test_aggregates) group by b")
println(df.queryExecution.optimizedPlan)
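The optimized plan printed by the above code. Note that the nested-field extraction `max(a#0).c.e` has been moved into the `Project` below the `Aggregate`, so the aggregate function `max` ends up being evaluated outside of any aggregation: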
'Aggregate [b#1], [_extract_e#5 AS max(a).c.e#3]
+- 'Project [max(a#0).c.e AS _extract_e#5, b#1]
   +- Relation default.test_aggregates[a#0,b#1] parquet
The error raised when the DataFrame is executed (the trace below was triggered via `Dataset.collect`):
java.lang.UnsupportedOperationException: Cannot generate code for expression: max(input[0, struct<c:struct<e:string>,d:int>, true]) at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotGenerateCodeForExpressionError(QueryExecutionErrors.scala:83) at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode(Expression.scala:312) at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode$(Expression.scala:311) at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:99) at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullSafeCodeGen(Expression.scala:525) at org.apache.spark.sql.catalyst.expressions.GetStructField.doGenCode(complexTypeExtractors.scala:126) at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullSafeCodeGen(Expression.scala:525) at org.apache.spark.sql.catalyst.expressions.GetStructField.doGenCode(complexTypeExtractors.scala:126) at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146) at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:171) at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73) at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1039) at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73) at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:195) at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:150) at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:497) at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:484) at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:457) at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:497) at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219) at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91) at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91) at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:497) at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54) at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219) at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91) at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91) at 
org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:792) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:151) at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219) at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91) at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:46) at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:659) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:722) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139) at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219) at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68) at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170) at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172) at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3742) at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2998) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3733) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3731) at org.apache.spark.sql.Dataset.collect(Dataset.scala:2998) ... 47 elided