Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26555

Thread safety issue causes createDataset to fail with misleading errors

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.4, 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      This can be replicated (~2% of the time) with

      import java.sql.Timestamp
      import java.util.concurrent.{Executors, Future}
      
      import org.apache.spark.sql.SparkSession
      
      import scala.collection.mutable.ListBuffer
      import scala.concurrent.ExecutionContext
      import scala.util.Random
      
      // Minimal reproduction for the reported race: several tasks each derive an
      // encoder for the same case class (with Option fields) via createDataset.
      // NOTE(review): failure is intermittent (~2% of runs per the description),
      // which is consistent with a thread-safety issue in encoder derivation —
      // the exact shape/order of this snippet matters, so it is left unmodified.
      object Main {
        def main(args: Array[String]): Unit = {
          val sparkSession = SparkSession.builder
            .getOrCreate()
          // Brings in the implicit Encoder derivation used by createDataset below.
          import sparkSession.implicits._
      
          val executor = Executors.newFixedThreadPool(1)
          try {
            // NOTE(review): xc is declared implicit but tasks are submitted via
            // executor.submit directly below; this ExecutionContext appears unused.
            implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
            val futures = new ListBuffer[Future[_]]()
      
            for (i <- 1 to 3) {
              futures += executor.submit(new Runnable {
                override def run(): Unit = {
                  // Randomize which Option fields are Some vs None so different
                  // runs exercise different type shapes during encoder derivation.
                  val d = if (Random.nextInt(2) == 0) Some("d value") else None
                  val e = if (Random.nextInt(2) == 0) Some(5.0) else None
                  val f = if (Random.nextInt(2) == 0) Some(6.0) else None
                  println("DEBUG", d, e, f)
                  // Each call triggers reflection-based encoder derivation for
                  // MyClass; this is where the MatchError/UnsupportedOperationException
                  // errors quoted below originate.
                  sparkSession.createDataset(Seq(
                    MyClass(new Timestamp(1L), "b", "c", d, e, f)
                  ))
                }
              })
            }
      
            // Propagates any exception thrown inside a task (as ExecutionException).
            futures.foreach(_.get())
          } finally {
            println("SHUTDOWN")
            executor.shutdown()
            sparkSession.stop()
          }
        }
      
        // Mix of plain and Option-wrapped fields; the stack traces below show the
        // failures occurring on the Option[String]/Option[Double] members.
        case class MyClass(
          a: Timestamp,
          b: String,
          c: String,
          d: Option[String],
          e: Option[Double],
          f: Option[Double]
        )
      }
      

      Because the failure is intermittent, it will usually show up at least once when the job is run repeatedly, e.g.

      for i in $(seq 1 200); do
        echo $i
        spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
      done
      

      causing a variety of possible errors, such as

      Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
              at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)

      or

      Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)

      Attachments

        Issue Links

          Activity

            People

              mwlon Martin Loncaric
              mwlon Martin Loncaric
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: