Details
-
Improvement
-
Status: Resolved
-
Minor
-
Resolution: Not A Problem
-
1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
-
None
-
None
Description
First reported on Stack overflow.
I use multiple threads inside foreachPartition(), which works great for me except for when the underlying iterator is TungstenAggregationIterator. Here is a minimal code snippet to reproduce:
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext object Reproduce extends App { val sc = new SparkContext("local", "reproduce") val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count() df.foreachPartition { iterator => val f = Future(iterator.toVector) Await.result(f, Duration.Inf) } }
When I run this, I get:
java.lang.NullPointerException at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
I believe I actually understand why this happens - TungstenAggregationIterator uses a ThreadLocal variable that returns null when called from a thread other than the original thread that got the iterator from Spark. From examining the code, this does not appear to differ between recent Spark versions.
However, this limitation is specific to TungstenAggregationIterator, and not documented, as far as I'm aware.