SPARK-19476

Running threads in Spark DataFrame foreachPartition() causes NullPointerException

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Problem
    • Affects Version/s: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      First reported on Stack Overflow.

      I use multiple threads inside foreachPartition(), which works well except when the underlying iterator is a TungstenAggregationIterator. Here is a minimal code snippet to reproduce the problem:

      Reproduce.scala
          import scala.concurrent.ExecutionContext.Implicits.global
          import scala.concurrent.duration.Duration
          import scala.concurrent.{Await, Future}
      
          import org.apache.spark.SparkContext
          import org.apache.spark.sql.SQLContext
      
          object Reproduce extends App {
      
            val sc = new SparkContext("local", "reproduce")
            val sqlContext = new SQLContext(sc)
      
            import sqlContext.implicits._
      
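            // The groupBy().count() aggregation is what puts a TungstenAggregationIterator behind each partition
            val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
      
            df.foreachPartition { iterator =>
              // Consume the iterator on a thread from the global ExecutionContext, not on the task thread
              val f = Future(iterator.toVector)
              Await.result(f, Duration.Inf)
            }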
          }
      

      When I run this, I get:

          java.lang.NullPointerException
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
              at scala.collection.Iterator$class.foreach(Iterator.scala:893)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      

      I believe I understand why this happens: TungstenAggregationIterator uses a ThreadLocal variable, which returns null when read from any thread other than the one that originally obtained the iterator from Spark. From examining the code, this behavior does not appear to differ between recent Spark versions.
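      The ThreadLocal behavior described above can be shown without Spark. The following is a minimal sketch, not Spark's code: `ThreadLocalDemo`, `context`, and `readFromOtherThread` are hypothetical names, and the string value stands in for whatever per-thread state the iterator actually reads.

```scala
object ThreadLocalDemo {
  // Hypothetical stand-in for per-thread state; a plain ThreadLocal has no
  // initial value, so get() returns null on any thread that never called set().
  val context = new ThreadLocal[String]

  // Reads the thread-local from a freshly started thread and returns what it saw.
  def readFromOtherThread(): String = {
    var seen: String = "unset"
    val t = new Thread(new Runnable {
      def run(): Unit = { seen = context.get() }
    })
    t.start()
    t.join() // join() makes the write to `seen` visible to the caller
    seen
  }

  def main(args: Array[String]): Unit = {
    context.set("task-0")          // set on the owning thread only
    println(context.get())         // prints "task-0"
    println(readFromOtherThread()) // prints "null"
  }
}
```

      Any code that dereferences such a value on the second thread without a null check fails with a NullPointerException, which is consistent with the stack trace above.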

      However, this limitation appears to be specific to TungstenAggregationIterator and, as far as I am aware, is not documented.
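      One way to live with an iterator that is bound to its original thread is to drain it on that thread and pass only the materialized rows to other threads. The sketch below illustrates this under stated assumptions: it is not taken from the issue or from Spark, `ThreadBoundIterator`, `taskState`, and `sumOnWorker` are hypothetical names, and the iterator merely imitates the failure mode by dereferencing a ThreadLocal in next().

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object MaterializeFirst {
  // Hypothetical per-thread state, set only on the thread that owns the iterator.
  val taskState = new ThreadLocal[AtomicInteger]

  // Imitates a thread-bound iterator: next() dereferences the thread-local,
  // so calling it on a thread where taskState is unset throws a NullPointerException.
  final class ThreadBoundIterator(data: Seq[Int]) extends Iterator[Int] {
    private val underlying = data.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): Int = {
      val value = underlying.next()
      taskState.get().incrementAndGet() // counts rows read on the owning thread
      value
    }
  }

  // Drains the iterator on the calling thread, then hands the plain Vector
  // to a Future, so the worker thread never touches the iterator itself.
  def sumOnWorker(it: Iterator[Int]): Int = {
    val rows = it.toVector
    Await.result(Future(rows.sum), Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    taskState.set(new AtomicInteger(0))
    println(sumOnWorker(new ThreadBoundIterator(Seq(1, 2, 3)))) // prints "6"
  }
}
```

      The trade-off is that the whole partition is held in memory before any parallel work starts, so this only suits partitions that fit comfortably on the heap.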

          People

            Assignee: Unassigned
            Reporter: Gal Topper (gal.topper)
            Votes: 1
            Watchers: 4