SPARK-20525: ClassCast exception when interpreting UDFs from a String in spark-shell

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core, Spark Shell
    • Environment: OS X 10.11.6, spark-2.1.0-bin-hadoop2.7, Scala 2.11.8 (bundled with Spark), Java 1.8.0_121

Description

      I'm trying to interpret a string containing Scala code from inside a Spark session. Everything works fine except for user-defined-function-like operations (UDFs, map, flatMap, etc.). This is a blocker for the production launch of a large number of Spark jobs.

      I've been able to boil the problem down to a number of spark-shell examples, shown below. Because it's reproducible in the spark-shell, these related issues *don't apply*:

      https://issues.apache.org/jira/browse/SPARK-9219
      https://issues.apache.org/jira/browse/SPARK-18075
      https://issues.apache.org/jira/browse/SPARK-19938
      http://apache-spark-developers-list.1001551.n3.nabble.com/This-Exception-has-been-really-hard-to-trace-td19362.html
      https://community.mapr.com/thread/21488-spark-error-scalacollectionseq-in-instance-of-orgapachesparkrddmappartitionsrdd
      https://github.com/scala/bug/issues/9237

      Any help is appreciated!

      ========
      Repro:
      Run each of the snippets below in a spark-shell.

      Preamble:

      import scala.tools.nsc.GenericRunnerSettings
      import scala.tools.nsc.interpreter.IMain
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
      interpreter.bind("spark", spark)
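
      A variant of the preamble that is sometimes suggested for embedded interpreters is to hand the nested IMain the enclosing environment's class loader via embeddedDefaults. Sketch only; I haven't confirmed it changes the behavior here:

      import scala.tools.nsc.GenericRunnerSettings
      import scala.tools.nsc.interpreter.IMain

      // Sketch: embeddedDefaults points the nested interpreter's classpath and
      // parent class loader at the enclosing environment (here, the spark-shell
      // REPL). Whether this affects the failure below is unverified.
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      settings.embeddedDefaults(getClass.getClassLoader)
      val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
      interpreter.bind("spark", spark)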

      These work:

      // works:
      interpreter.interpret("val x = 5")

      // works:
      interpreter.interpret("import spark.implicits._\nval df = spark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.show")

      These do not work:

      // doesn't work, fails with seq/RDD serialization error:
      interpreter.interpret("import org.apache.spark.sql.functions.\nimport spark.implicits.\nval upper: String => String = _.toUpperCase\nval upperUDF = udf(upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", upperUDF($\"value\")).show")

      // doesn't work, fails with seq/RDD serialization error:
      interpreter.interpret("import org.apache.spark.sql.functions.\nimport spark.implicits.\nval upper: String => String = _.toUpperCase\nspark.udf.register(\"myUpper\", upper)\nspark.sparkContext.parallelize(Seq(\"foo\",\"bar\")).toDF.withColumn(\"UPPER\", callUDF(\"myUpper\", ($\"value\"))).show")

      Both failing examples throw this exception:

      Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
      at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
      at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
      at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
      at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
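
      The cast error is thrown while an executor deserializes the task, which points at a class loading mismatch: classes compiled by the nested interpreter live in its private in-memory directory, so executors cannot resolve them the way they resolve classes compiled by the outer spark-shell REPL. A workaround sketch, assuming the internal spark.repl.class.outputDir property that spark-shell sets is present (an assumption, not a confirmed fix), is to have the nested interpreter compile into the directory the shell already serves to executors:

      import scala.tools.nsc.GenericRunnerSettings
      import scala.tools.nsc.interpreter.IMain

      // Sketch: write the nested interpreter's classfiles into the directory
      // that spark-shell already serves to executors, so they can fetch the
      // generated closure classes. "spark.repl.class.outputDir" is internal
      // and undocumented; treat this as an assumption, not a verified fix.
      val outputDir = spark.sparkContext.getConf.get("spark.repl.class.outputDir")
      val settings = new GenericRunnerSettings(println _)
      settings.usejavacp.value = true
      settings.Yreploutdir.value = outputDir
      val interpreter = new IMain(settings, new java.io.PrintWriter(System.out))
      interpreter.bind("spark", spark)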

Attachments

    1. UdfTest.scala (3 kB, Dave Knoester)

People

    Assignee: Unassigned
    Reporter: Dave Knoester (dknoester)
    Votes: 0
    Watchers: 4
