Description
`CacheManager.cacheQuery` passes the stats of `planToCache` to `InMemoryRelation`. Since that plan has not been optimized, the stats are inaccurate, because the projections and filters have not been applied yet. I'd suggest passing the stats from the optimized plan instead.
class CacheManager extends Logging { ... def cacheQuery( query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = { val planToCache = query.logicalPlan if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession val inMemoryRelation = InMemoryRelation( sparkSession.sessionState.conf.useCompression, sparkSession.sessionState.conf.columnBatchSize, storageLevel, sparkSession.sessionState.executePlan(planToCache).executedPlan, tableName, planToCache) <<<<<== ... } object InMemoryRelation { def apply( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], logicalPlan: LogicalPlan): InMemoryRelation = { val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName) val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering) relation.statsOfPlanToCache = logicalPlan.stats <<<<<== relation }
Attachments
Issue Links
- links to