SPARK-44706

ProductEncoder not working as expected within User Defined Function (UDF)

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.4.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None
    • Environment: DBR (Databricks) 13.2, Spark 3.4.0 and Scala 2.12.

    Description

      Hi,

      When running the following code in a Databricks notebook:

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
      import org.apache.spark.sql.functions.udf

      case class DataRow(field1: String)
      val sparkSession = SparkSession.builder.getOrCreate()
      import sparkSession.implicits._
      val udf1 = udf((x: String) => {
        // Building a DataFrame inside the UDF body: this fails at runtime
        val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test")
        3
      })
      // The same conversion outside the UDF works
      val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test"))
      display(df1)
      

      I get a ScalaReflectionException at runtime:

      ScalaReflectionException: class $linedb3da7b2933d4b63a62b3d2a21c675f2141.$read in JavaMirror with com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader@717115ad of type class com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader with classpath [] and parent being com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader@1670897 of type class com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader with classpath [<unknown>] and parent being com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@1a42da0a of type class com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader with classpath [file:/local_disk0/tmp/repl/spark-4071811259476162981-8e526ae9-25fb-4545-8d3f-963a8661cd2b/] and parent being sun.misc.Launcher$AppClassLoader@43ee72e6 of type class sun.misc.Launcher$AppClassLoader with classpath [...] not found. 
      at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:141) 
      at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:29) 
      at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator6$1.apply(command-2013905963200886:11) 
      at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237) 
      at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237) 
      at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:848) 
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55) 
      at org.apache.spark.sql.Encoders$.product(Encoders.scala:312) 
      at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:302) 
      at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:302) 
      at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34) 
      at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$udf1$1(command-2013905963200886:11) 

      When I instead declare the case class inside the UDF:

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
      import org.apache.spark.sql.functions.udf

      case class DataRow(field1: String)
      val sparkSession = SparkSession.builder.getOrCreate()
      import sparkSession.implicits._
      val udf1 = udf((x: String) => {
        // Local case class, shadowing the outer DataRow
        case class DataRow(field1: String)
        // This line is the one reported in the error
        val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test")
        3
      })
      // The same conversion outside the UDF works
      val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test"))
      display(df1)

      I get a different error, this time at compile time:

      error: value toDF is not a member of Seq[DataRow]
      val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test")

      Questions:

      • Is this expected behaviour or a bug?

      Thanks.

          People

            Assignee: Unassigned
            Reporter: Juan Carlos Blanco Martinez (jcblancomartinez)
            Votes: 0
            Watchers: 1
