Spark / SPARK-32813

Reading a Parquet RDD in non-columnar mode fails in a multithreaded environment


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.1.0
    • Fix Version/s: 3.0.2, 3.1.0
    • Component/s: SQL
    • Labels: None
    • Environment: Spark 3.0.0, Scala 2.12.12

    Description

      Reading a Parquet RDD in non-columnar mode (i.e. with list fields) fails if the Spark session was created in one thread and the RDD is read in another: the InheritableThreadLocal holding the active session is not propagated to the reading thread. The code below worked perfectly in Spark 2.x, but fails in Spark 3.

      import java.util.concurrent.Executors
      
      import org.apache.spark.sql.SparkSession
      
      import scala.concurrent.{Await, ExecutionContext, Future}
      import scala.concurrent.duration._
      
      object Main {
      
        final case class Data(list: List[Int])
      
        def main(args: Array[String]): Unit = {
      
          val executor1 = Executors.newSingleThreadExecutor()
          val executor2 = Executors.newSingleThreadExecutor()
          try {
            // Create the session and the dataset on executor1's thread; getOrCreate()
            // registers the new session as active only on that thread.
            val ds = Await.result(Future {
              val session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
              import session.implicits._
      
              val path = "test.parquet"
              session.createDataset(Data(1 :: Nil) :: Nil).write.parquet(path)
              session.read.parquet(path).as[Data]
            }(ExecutionContext.fromExecutorService(executor1)), 1.minute)
      
            // Collect the RDD on executor2's thread, where no active session is set.
            Await.result(Future {
              ds.rdd.collect().foreach(println(_))
            }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
      
          } finally {
            executor1.shutdown()
            executor2.shutdown()
          }
        }
      }
      

      This code fails with the following exception:

      Exception in thread "main" java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:529)
        at scala.None$.get(Option.scala:527)
        at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
        at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
        at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
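
      The trace shows FileSourceScanExec.needsUnsafeRowConversion failing on SparkSession.getActiveSession.get. The active session is tracked in an InheritableThreadLocal, and such values are only copied from a parent thread to a child thread when the child thread is created; they are never shared between sibling threads. Here the session becomes active only on executor1's thread, while executor2's thread is spawned from the main thread, which has no active session, so the lookup returns None. A minimal Spark-free sketch of that semantics (the object name and the "session" value are illustrative only):

      import java.util.concurrent.Executors

      object InheritableThreadLocalDemo {

        def main(args: Array[String]): Unit = {
          val active = new InheritableThreadLocal[String]
          val executor1 = Executors.newSingleThreadExecutor()
          val executor2 = Executors.newSingleThreadExecutor()
          try {
            // Set the value on executor1's thread, as getOrCreate() does for
            // the active SparkSession in the reproduction above.
            executor1.submit(new Runnable {
              def run(): Unit = active.set("session")
            }).get()

            // executor2's thread is created from the main thread, which never
            // had the value, so nothing is inherited: this prints "null".
            executor2.submit(new Runnable {
              def run(): Unit = println(active.get)
            }).get()
          } finally {
            executor1.shutdown()
            executor2.shutdown()
          }
        }
      }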
      
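      Until the fix is available, a possible workaround (an untested sketch, not taken from this ticket) is to re-register the session as active on the reading thread before materializing the RDD, via the public SparkSession.getDefaultSession and SparkSession.setActiveSession API. Replacing the second Future of the reproduction above:

      // Workaround sketch: make the session created on executor1's thread
      // active on executor2's thread before touching the RDD.
      Await.result(Future {
        SparkSession.getDefaultSession.foreach(SparkSession.setActiveSession)
        ds.rdd.collect().foreach(println(_))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
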


            People

              Assignee: L. C. Hsieh (viirya)
              Reporter: Vladimir Klyushnikov (vklyushnikov)
