Spark / SPARK-27150

Scheduling Within an Application: Spark SQL randomly fails on UDF


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.3.1, 2.3.2, 2.3.3, 2.4.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL
    • Labels: None

    Description

      I run the following (reduced) code multiple times on exactly the same input files:

      def myUdf(input: java.lang.String): Option[Long] = {
        None // reduced to the minimum: always returns None
      }

      ...

      val sparkMain = ... .getOrCreate()
      val d = inputPaths.toList.par
      val p = new scala.concurrent.forkjoin.ForkJoinPool(12)

      try {
        d.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(p)
        // one isolated SQL session per input path, jobs submitted in parallel
        d.foreach {
          case (inputPath) => {
            val spark = sparkMain.newSession()

            spark.udf.register("myUdf", udf(myUdf _))

            val df = spark.read.format("csv").option("inferSchema", "false").option("mode", "DROPMALFORMED").schema(mySchema).load(inputPath)

            df.createOrReplaceTempView("mytable")

            val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)

            sql.write.parquet( ... )
          }
        }
      } finally {
        p.shutdown()
      }

      Roughly one run in ten (each run being one spark-submit of the application), the driver fails with an exception related to Spark SQL and the UDF. As you can see, I have reduced the UDF to the minimum: it now returns None every time, and the problem still occurs. So I think the problem is more likely related to the driver submitting multiple jobs in parallel, i.e. "scheduling within an application".
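      For reference, here is a minimal single-session sketch of what I expect the analyzer to do (the local session and the made-up one-row input are only for illustration; this is not the failing job): the Option[Long] UDF should resolve to a nullable bigint, so the CAST is legal.

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.udf

      object SingleSessionCheck {
        def myUdf(input: java.lang.String): Option[Long] = None

        def main(args: Array[String]): Unit = {
          // local session used only for this reference check
          val spark = SparkSession.builder().master("local[1]").appName("udf-schema-check").getOrCreate()
          import spark.implicits._

          spark.udf.register("myUdf", udf(myUdf _))

          // hypothetical one-column input standing in for the CSV files
          Seq("2019-03-13").toDF("updated_date").createOrReplaceTempView("mytable")

          val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)
          // expected: the UDF column is a nullable bigint, so the CAST resolves;
          // in the failing runs the same expression is reported as struct<>
          sql.printSchema()

          spark.stop()
        }
      }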

      In the failing runs, the exception is as follows:

      Exception in thread "main" java.lang.reflect.InvocationTargetException
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
      at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
      Caused by: org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(UDF(updated_date) AS BIGINT)' due to data type mismatch: cannot cast struct<> to bigint; line 5 pos 10;
      ...
      at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
      at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
      at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
      at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
      at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
      at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
      at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
      ...
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
      at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
      at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
      at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
      at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
      at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
      at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
      at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
      at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
      at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
      at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      So basically, with exactly the same inputs, nine runs out of ten succeed, but roughly one run in ten hits the exception above. This is very strange and tends to suggest some side effect inside Spark core code when scheduling jobs within an application in parallel ...
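      A possible workaround on my side (untested assumption: the race happens in the reflective return-type inference that udf(...) performs when each session registers the UDF) is to build the UserDefinedFunction once on the main thread and register that same instance in every session. Same skeleton as the snippet above, only the UDF construction moves out of the loop:

      import org.apache.spark.sql.functions.udf

      // build the UDF once: its return type (nullable long) is inferred here, on a single thread
      val myUdfFn = udf(myUdf _)

      d.foreach {
        case (inputPath) => {
          val spark = sparkMain.newSession()
          // register the pre-built instance: no per-session reflection
          spark.udf.register("myUdf", myUdfFn)
          // ... same read / SQL / write as above ...
        }
      }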

      Thanks for investigating

      EDIT:

      When I run:

      val sql = spark.sql(""" SELECT myUdf(updated_date) FROM mytable """)
      

      instead of:

      val sql = spark.sql(""" SELECT CAST( myUdf(updated_date) as long) FROM mytable """)
      

      I am no longer forcing the UDF result to be either a long or null, and the job then fails at the Parquet level instead:

      Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group updated_date {
      }
      

      So it seems that in this situation (when scheduling jobs within the application in parallel), the UDF randomly comes back typed as a struct/group instead of returning null/None ...
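      As a diagnostic (my addition, not a fix), a schema check just before the write makes a bad run fail fast with a clear message instead of failing inside the Parquet writer; this fragment would slot into the loop above, for the variant without the CAST:

      import org.apache.spark.sql.types.LongType

      // fail fast if the UDF column was mis-inferred for this input path
      val resultType = sql.schema.head.dataType
      require(resultType == LongType,
        s"myUdf column inferred as $resultType instead of LongType for input $inputPath")
      // then sql.write.parquet(...) as before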

      People

        Assignee: Unassigned
        Reporter: Josh Sean (joshsean)