Description
In Spark 3.0.2, if I attempt to execute a canonicalized version of CustomShuffleReaderExec, I get the error "operating on canonicalized plan", as expected.
There is a regression in Spark 3.1.1 where this check can never be reached, because a new call to sendDriverMetrics was added before the check. sendDriverMetrics fails when operating on a canonicalized plan because it assumes the existence of metrics that a canonicalized plan does not have.
private lazy val shuffleRDD: RDD[_] = {
  sendDriverMetrics()
  shuffleStage.map { stage =>
    stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
  }.getOrElse {
    throw new IllegalStateException("operating on canonicalized plan")
  }
}
The specific error looks like this:
java.util.NoSuchElementException: key not found: numPartitions
  at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:101)
  at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:99)
  at org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec.sendDriverMetrics(CustomShuffleReaderExec.scala:122)
  at org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec.shuffleRDD$lzycompute(CustomShuffleReaderExec.scala:182)
  at org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec.shuffleRDD(CustomShuffleReaderExec.scala:181)
  at org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec.doExecuteColumnar(CustomShuffleReaderExec.scala:196)
I think the fix is simply to avoid calling sendDriverMetrics when the plan is canonicalized, and I am planning to create a PR for this.
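One possible shape of the fix is sketched below, assuming it is enough to move the sendDriverMetrics call inside the shuffleStage.map branch so it only runs for a non-canonicalized plan; the naming follows the snippet above and this is not the final patch:

private lazy val shuffleRDD: RDD[_] = {
  shuffleStage.map { stage =>
    // Metrics are only present on a non-canonicalized plan, so send them here.
    sendDriverMetrics()
    stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
  }.getOrElse {
    // A canonicalized plan has no shuffle stage, so the original check is reached again.
    throw new IllegalStateException("operating on canonicalized plan")
  }
}

With this ordering, executing a canonicalized CustomShuffleReaderExec would once again raise the intended IllegalStateException instead of the NoSuchElementException shown above.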