Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.2.1
-
None
Description
DataFrame.unpesist call fails wth NPE
java.lang.NullPointerException at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241) at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189) at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176) at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303) at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297) at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at scala.collection.TraversableLike.filter(TraversableLike.scala:395) at scala.collection.TraversableLike.filter$(TraversableLike.scala:395) at scala.collection.AbstractTraversable.filter(Traversable.scala:108) at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219) at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176) at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220) at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
Looks like syncronization in required for org.apache.spark.sql.execution.columnar.CachedRDDBuilder#isCachedColumnBuffersLoaded
def isCachedColumnBuffersLoaded: Boolean = { _cachedColumnBuffers != null && isCachedRDDLoaded } def isCachedRDDLoaded: Boolean = { _cachedColumnBuffersAreLoaded || { val bmMaster = SparkEnv.get.blockManager.master val rddLoaded = _cachedColumnBuffers.partitions.forall { partition => bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false) .exists { case(_, blockStatus) => blockStatus.isCached } } if (rddLoaded) { _cachedColumnBuffersAreLoaded = rddLoaded } rddLoaded } }
isCachedRDDLoaded relies on _cachedColumnBuffers != null check while it can be changed concurrently from other thread.