Description
In QueryStageExec.computeStats, we copy partial statistics from materialized query stages by calling QueryStageExec#getRuntimeStatistics, which in turn calls ShuffleExchangeLike#runtimeStatistics or BroadcastExchangeLike#runtimeStatistics.
Only dataSize and numOutputRows are copied into the new Statistics object:
  def computeStats(): Option[Statistics] = if (isMaterialized) {
    val runtimeStats = getRuntimeStatistics
    val dataSize = runtimeStats.sizeInBytes.max(0)
    val numOutputRows = runtimeStats.rowCount.map(_.max(0))
    Some(Statistics(dataSize, numOutputRows, isRuntime = true))
  } else {
    None
  }
I would also like to copy over the column statistics stored in Statistics.attributeStats (an AttributeMap of column statistics) so that they can be fed back into the logical plan optimization phase. This is a small change, as shown below:
  def computeStats(): Option[Statistics] = if (isMaterialized) {
    val runtimeStats = getRuntimeStatistics
    val dataSize = runtimeStats.sizeInBytes.max(0)
    val numOutputRows = runtimeStats.rowCount.map(_.max(0))
    val attributeStats = runtimeStats.attributeStats
    Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
  } else {
    None
  }
Spark's own implementations of ShuffleExchangeLike and BroadcastExchangeLike do not currently provide such column statistics, but custom implementations can.
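To illustrate the intended effect without depending on Spark, here is a minimal, self-contained sketch. The types below (ColumnStat, Statistics, FakeQueryStage) are simplified stand-ins invented for this example, not Spark's classes; in particular, a plain Map keyed by column name stands in for Spark's AttributeMap, and the runtimeStats field plays the role of getRuntimeStatistics.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's classes.
final case class ColumnStat(
    distinctCount: Option[BigInt] = None,
    nullCount: Option[BigInt] = None)

final case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    // Assumption: a Map keyed by column name replaces Spark's AttributeMap.
    attributeStats: Map[String, ColumnStat] = Map.empty,
    isRuntime: Boolean = false)

// Hypothetical query stage; `runtimeStats` models what a (custom)
// exchange implementation would report after materialization.
final case class FakeQueryStage(isMaterialized: Boolean, runtimeStats: Statistics) {
  def computeStats(): Option[Statistics] =
    if (isMaterialized) {
      val dataSize = runtimeStats.sizeInBytes.max(0)
      val numOutputRows = runtimeStats.rowCount.map(_.max(0))
      // The proposed change: carry the column statistics over as well.
      val attributeStats = runtimeStats.attributeStats
      Some(Statistics(dataSize, numOutputRows, attributeStats, isRuntime = true))
    } else {
      None
    }
}

object ComputeStatsDemo {
  // Statistics as a custom exchange might report them, including one column.
  val reported: Statistics = Statistics(
    sizeInBytes = BigInt(1024),
    rowCount = Some(BigInt(100)),
    attributeStats = Map(
      "id" -> ColumnStat(distinctCount = Some(BigInt(100)), nullCount = Some(BigInt(0)))))

  def main(args: Array[String]): Unit = {
    val stats = FakeQueryStage(isMaterialized = true, reported).computeStats()
    // Column statistics survive the copy and the result is flagged as runtime.
    assert(stats.exists(_.attributeStats.contains("id")))
    assert(stats.exists(_.isRuntime))
    // A stage that has not materialized yet reports nothing.
    assert(FakeQueryStage(isMaterialized = false, reported).computeStats().isEmpty)
  }
}
```

With the current behavior (no attributeStats argument), the first assertion would fail in this model, because the copied Statistics would fall back to the empty default.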