SPARK-39104: Null Pointer Exception on unpersist call

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.1
    • Fix Version/s: 3.3.0, 3.2.2, 3.4.0
    • Component/s: Spark Core
    • Labels: None

    Description

      DataFrame.unpersist call fails with a NullPointerException:

       

      java.lang.NullPointerException
          at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
          at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
          at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
          at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
          at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.IterableLike.foreach(IterableLike.scala:74)
          at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
          at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
          at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
          at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
          at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
          at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
          at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
          at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
          at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
          at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
          at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)

      It looks like synchronization is required for org.apache.spark.sql.execution.columnar.CachedRDDBuilder#isCachedColumnBuffersLoaded:

       

      def isCachedColumnBuffersLoaded: Boolean = {
        _cachedColumnBuffers != null && isCachedRDDLoaded
      }

      def isCachedRDDLoaded: Boolean = {
        _cachedColumnBuffersAreLoaded || {
          val bmMaster = SparkEnv.get.blockManager.master
          val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
            bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
              .exists { case (_, blockStatus) => blockStatus.isCached }
          }
          if (rddLoaded) {
            _cachedColumnBuffersAreLoaded = rddLoaded
          }
          rddLoaded
        }
      }

      isCachedRDDLoaded relies on the _cachedColumnBuffers != null check performed in isCachedColumnBuffersLoaded, but the field can be set to null concurrently by another thread between that check and the dereference inside isCachedRDDLoaded.
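      The race described above can be sketched in isolation. This is a minimal standalone model, not Spark's actual classes: `CachedBuilderSketch` and its members are hypothetical names standing in for CachedRDDBuilder and its _cachedColumnBuffers field. The racy variant reads the shared field twice (once for the null check, once for the dereference), so a concurrent clearCache() between the two reads can cause an NPE; the safe variant reads the field once into a local and then only uses the local reference.

      ```scala
      // Hypothetical stand-in for CachedRDDBuilder; not Spark code.
      class CachedBuilderSketch {
        // Stands in for _cachedColumnBuffers; another thread may null it at any time.
        @volatile private var cached: java.util.List[Int] = new java.util.ArrayList[Int]()

        // Simulates cache invalidation from another thread.
        def clearCache(): Unit = { cached = null }

        // Racy pattern mirroring the reported bug: `cached` is read twice, so a
        // concurrent clearCache() between the null check and cached.size() can
        // trigger a NullPointerException.
        def isLoadedRacy: Boolean = cached != null && cached.size() >= 0

        // Safe pattern: read the volatile field once into a local val; the local
        // reference cannot be changed by another thread afterward.
        def isLoadedSafe: Boolean = {
          val local = cached
          local != null && local.size() >= 0
        }
      }
      ```

      A single-threaded usage check (the race itself is timing-dependent and not deterministically reproducible): after clearCache(), isLoadedSafe simply returns false instead of throwing.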

          People

            Assignee: Cheng Pan
            Reporter: Goihburg Denis