Description
Reading a parquet-backed RDD in non-columnar mode (i.e. with list fields) fails if the Spark session was created in one thread and the RDD is read in another, so the InheritableThreadLocal holding the active session is not propagated to the reading thread. The code below worked perfectly in Spark 2.x but fails in Spark 3.
import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Main {

  final case class Data(list: List[Int])

  def main(args: Array[String]): Unit = {
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      // Thread 1: create the session, write the parquet file, and build the Dataset.
      val ds = Await.result(Future {
        val session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        import session.implicits._
        val path = "test.parquet"
        session.createDataset(Data(1 :: Nil) :: Nil).write.parquet(path)
        session.read.parquet(path).as[Data]
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)

      // Thread 2: materialize the RDD; this is where Spark 3 throws.
      Await.result(Future {
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }
}
This code fails with the following exception:
Exception in thread "main" java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
	at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
	at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
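The trace shows that FileSourceScanExec.needsUnsafeRowConversion (DataSourceScanExec.scala:178) calls .get on an empty Option while resolving the active session, which is None on a thread where no session was ever set active. A possible workaround, not verified as part of this report, is to explicitly register the session as active on the reading thread via the public SparkSession.setActiveSession API before materializing the RDD; the second block of the repro above would become:

      // Workaround sketch (assumption, not from the original report): register
      // the session created on executor1 as the active session for this thread
      // before the RDD is materialized, so getActiveSession no longer returns None.
      Await.result(Future {
        SparkSession.setActiveSession(ds.sparkSession)
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)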
Issue Links
- is duplicated by SPARK-32589 "NoSuchElementException: None.get for needsUnsafeRowConversion" (Resolved)