Spark / SPARK-26555

Thread safety issue causes createDataset to fail with misleading errors


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.4, 3.0.0
    • Component/s: SQL
    • Labels:
      None

      Description

      This can be replicated (~2% of the time) with:

      import java.sql.Timestamp
      import java.util.concurrent.{Executors, Future}
      
      import org.apache.spark.sql.SparkSession
      
      import scala.collection.mutable.ListBuffer
      import scala.util.Random
      
      object Main {
        def main(args: Array[String]): Unit = {
          val sparkSession = SparkSession.builder
            .getOrCreate()
          import sparkSession.implicits._
      
          val executor = Executors.newFixedThreadPool(1)
          try {
            val futures = new ListBuffer[Future[_]]()
      
            for (i <- 1 to 3) {
              futures += executor.submit(new Runnable {
                override def run(): Unit = {
                  val d = if (Random.nextInt(2) == 0) Some("d value") else None
                  val e = if (Random.nextInt(2) == 0) Some(5.0) else None
                  val f = if (Random.nextInt(2) == 0) Some(6.0) else None
                  println("DEBUG", d, e, f)
                  // The implicit Encoder[MyClass] is derived inside this call
                  sparkSession.createDataset(Seq(
                    MyClass(new Timestamp(1L), "b", "c", d, e, f)
                  ))
                }
              })
            }
      
            futures.foreach(_.get())
          } finally {
            println("SHUTDOWN")
            executor.shutdown()
            sparkSession.stop()
          }
        }
      
        case class MyClass(
          a: Timestamp,
          b: String,
          c: String,
          d: Option[String],
          e: Option[Double],
          f: Option[Double]
        )
      }
      

      So it will usually come up when running the job in a loop, e.g.

      for i in $(seq 1 200); do
        echo $i
        spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
      done
      

      causing a variety of possible errors, such as

      Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
              at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)

      or

      Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
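      One mitigation that appears to sidestep these failures, assuming the race is in concurrent implicit encoder derivation inside ScalaReflection, is to derive the Encoder[MyClass] once on the driver thread before any worker runs and pass it to createDataset explicitly. A hedged sketch (the object name WorkaroundSketch and its structure are illustrative, not part of this report):

      import java.sql.Timestamp
      import java.util.concurrent.Executors
      
      import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
      
      object WorkaroundSketch {
        case class MyClass(
          a: Timestamp, b: String, c: String,
          d: Option[String], e: Option[Double], f: Option[Double])
      
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder.getOrCreate()
          // Derive the encoder once, serially, so no two threads exercise
          // ScalaReflection at the same time.
          val enc: Encoder[MyClass] = Encoders.product[MyClass]
      
          val executor = Executors.newFixedThreadPool(1)
          try {
            val futures = (1 to 3).map { _ =>
              executor.submit(new Runnable {
                override def run(): Unit = {
                  // Pass the pre-built encoder explicitly instead of relying
                  // on implicit derivation inside the task.
                  spark.createDataset(Seq(
                    MyClass(new Timestamp(1L), "b", "c", Some("d"), None, None)
                  ))(enc)
                }
              })
            }
            futures.foreach(_.get())
          } finally {
            executor.shutdown()
            spark.stop()
          }
        }
      }

      This only avoids repeated concurrent derivation on affected versions; upgrading to a Fix Version (2.4.4 or 3.0.0) is the proper resolution.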

        Attachments

        Issue Links

          Activity


            People

            • Assignee: Martin Loncaric (mwlon)
            • Reporter: Martin Loncaric (mwlon)
