Spark / SPARK-8062

NullPointerException in SparkHadoopUtil.getFileSystemThreadStatistics


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.1
    • Fix Version/s: 1.2.3
    • Component/s: Spark Core
    • Labels: None
    • Environment: MapR 4.0.1, Hadoop 2.4.1, Yarn

    Description

      I received the following error report from a user:

      While running a Spark Streaming job that reads from MapRfs and writes to HBase on Spark 1.2.1, the job intermittently fails outright with the following errors:

      15/05/28 10:35:50 ERROR executor.Executor: Exception in task 1.1 in stage 6.0 (TID 24) 
      java.lang.NullPointerException 
      at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178) 
      at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178) 
      at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
      at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263) 
      at scala.collection.AbstractTraversable.filter(Traversable.scala:105) 
      at org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatistics(SparkHadoopUtil.scala:178) 
      at org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:139) 
      at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:116) 
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) 
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) 
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
      at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
      at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
      at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) 
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) 
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) 
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
      at org.apache.spark.scheduler.Task.run(Task.scala:56) 
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
      at java.lang.Thread.run(Thread.java:744) 
      15/05/28 10:35:50 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 25 
      15/05/28 10:35:50 INFO executor.Executor: Running task 2.1 in stage 6.0 (TID 25) 
      15/05/28 10:35:50 INFO rdd.NewHadoopRDD: Input split: hdfs:/[REDACTED] 
      15/05/28 10:35:50 ERROR executor.Executor: Exception in task 2.1 in stage 6.0 (TID 25) 
      java.lang.NullPointerException 
      at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178) 
      at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4.apply(SparkHadoopUtil.scala:178) 
      at scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
      

      Diving into the code here:

      The NPE occurs on this line of SparkHadoopUtil (in 1.2.1): https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L178

      Here's that block of code from 1.2.1 (it's the same in 1.2.2):

        private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
          val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
          val scheme = qualifiedPath.toUri().getScheme()
          val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))   // <--- exception occurs at this line
          stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
        }
      

      Since the top frame on the stack is org.apache.spark.deploy.SparkHadoopUtil$$anonfun$4, I'm assuming that the _.getScheme().equals(scheme) call here is failing either because FileSystem.getAllStatistics() is returning a collection that contains a null element or because _.getScheme() returns null for one of the entries.
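
      For illustration, a null-safe rewrite of that filter might look like the sketch below. This is only a minimal sketch, not the patch that was merged; the helper name matchingStatistics and the standalone imports are assumptions made to keep it self-contained.

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.{FileSystem, Path}
        import scala.collection.JavaConverters._

        // Hypothetical null-safe variant of the scheme filter: skip Statistics
        // entries that are null or whose scheme is null instead of throwing an NPE.
        def matchingStatistics(path: Path, conf: Configuration): Seq[FileSystem.Statistics] = {
          val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
          val scheme = qualifiedPath.toUri.getScheme
          FileSystem.getAllStatistics.asScala.filter { s =>
            s != null && s.getScheme != null && s.getScheme == scheme
          }
        }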

      Diving into the Hadoop source, it looks like FileSystem.getAllStatistics() accesses synchronized static state to return statistics for all Hadoop filesystems created within the JVM. I wonder whether some code is nondeterministically creating a new FileSystem instance for a FileSystem that lacks a scheme, causing entries to be stored in the statistics map that return null when we call getScheme() on them.
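
      As a quick way to check that theory on an affected executor, one could dump every registered Statistics entry and its scheme. The snippet below is just a diagnostic sketch; dumpFsStatistics is a made-up name, not part of Spark or Hadoop.

        import org.apache.hadoop.fs.FileSystem
        import scala.collection.JavaConverters._

        // Diagnostic sketch: print each registered Statistics entry and its scheme,
        // so an entry with a null scheme (or a null entry) would show up clearly.
        def dumpFsStatistics(): Unit = {
          FileSystem.getAllStatistics.asScala.foreach { s =>
            val scheme =
              if (s == null) "<null entry>"
              else Option(s.getScheme).getOrElse("<null scheme>")
            println(s"scheme=$scheme stats=$s")
          }
        }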

      I am unable to reproduce this issue myself, but I think that we can fix it for the user by adding try-catch blocks to prevent errors in metrics collection from leading to task failures.
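
      One way to express that mitigation is to wrap the metrics callback so that failures inside it are logged and swallowed rather than failing the task. The sketch below is only an illustration of the idea; safeBytesReadCallback is a hypothetical helper, not the actual patch.

        // Hypothetical wrapper: metrics collection is best-effort, so any exception
        // thrown by the underlying callback is logged and swallowed instead of
        // propagating and failing the task.
        def safeBytesReadCallback(underlying: () => Long): () => Long = { () =>
          try {
            underlying()
          } catch {
            case e: Exception =>
              System.err.println(s"Ignoring error while collecting filesystem read metrics: $e")
              0L
          }
        }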

          People

            Assignee: Josh Rosen
            Reporter: Josh Rosen