Zeppelin / ZEPPELIN-4179

SparkScala211Interpreter missing synchronization


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: Interpreters
    • Labels: None

    Description

      Hi,

      Using the new Spark interpreter (zeppelin.spark.useNew : true) with Scala 2.11 and Spark 2.2 sometimes produces very cryptic errors, while disabling it eliminates the problem.

      I believe this is connected to the fact that Scala interpretation is not synchronized in the new interpreter, whereas OldSparkInterpreter wraps its interpret method in a synchronized (this) {} block.
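The locking the old interpreter relies on can be sketched as follows. This is a minimal illustration, not Zeppelin's actual code: the class and field here are invented stand-ins showing how a synchronized interpret method protects shared, non-thread-safe REPL state.

```scala
// Minimal sketch, NOT Zeppelin's actual interpreter: a stand-in whose
// interpret method guards shared mutable state with synchronized,
// mirroring the synchronized (this) {} block in OldSparkInterpreter.
class SketchInterpreter {
  private var compiled = 0L // stands in for mutable REPL compiler state

  def interpret(code: String): Long = synchronized {
    // Only one paragraph drives the shared compiler at a time; without
    // this lock, concurrent compilations can interleave and emit wrapper
    // classes whose serialVersionUIDs no longer match on executors.
    compiled += 1
    compiled
  }
}

object SketchInterpreter {
  def main(args: Array[String]): Unit = {
    val interp = new SketchInterpreter
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit =
          (1 to 1000).foreach(_ => interp.interpret("df.count()"))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // With synchronized, no updates are lost: the next call is number 8001.
    assert(interp.interpret("done") == 8001L)
    println("all interpret calls serialized, no lost updates")
  }
}
```

Without the synchronized block, the same loop is a classic read-modify-write race and the final count can come up short, which is the analogue of two paragraphs compiling against the shared REPL at once.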

       

      The errors we get vary from:

      java.io.InvalidClassException: $line187494590728.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw;
      local class incompatible:
      stream classdesc serialVersionUID = 6658695490598467942,
      local class serialVersionUID = 5448801901630725440

       

      to 

      java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy
      to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq
      in instance of org.apache.spark.rdd.MapPartitionsRDD
          at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
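Both traces point at deserialization on the executor side: the class the executor loaded was compiled in a different REPL round than the one the driver serialized. The UID the JVM compares during deserialization can be inspected with the standard java.io.ObjectStreamClass API; a small sketch follows, where Wrapper is an invented stand-in for the REPL-generated $iw wrapper classes.

```scala
import java.io.ObjectStreamClass

// Wrapper is a made-up stand-in for the REPL's $iw wrapper classes. When
// a class declares no explicit serialVersionUID, the JVM computes one from
// the class's shape, so two slightly different compilations of "the same"
// wrapper get different UIDs and deserialization fails with
// java.io.InvalidClassException, as in the first trace above.
case class Wrapper(num: Long) extends Serializable

object UidSketch {
  def main(args: Array[String]): Unit = {
    val desc = ObjectStreamClass.lookup(classOf[Wrapper])
    // The computed UID for this exact compilation of Wrapper:
    println(s"local class serialVersionUID = ${desc.getSerialVersionUID}")
  }
}
```

The second trace is the same root cause surfacing later: a field of the deserialized RDD is rebuilt against a wrapper class layout that no longer matches, so the proxy object cannot be assigned to the expected field type.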

       

       

      To reproduce:
      1. Enable the new interpreter: zeppelin.spark.useNew : true

      2. Define several notebooks that differ only in the numeric suffix of the function name (1, 2, 3, ...), then run them sequentially.

      Each notebook has a Spark paragraph that defines a simple UDF:

      import spark.implicits._

      def num_to_ip3(num: Long) = {
        require(num >= 0 && num < (2L << 31), s"Invalid number to convert: ${num}")
        val buf: StringBuilder = new StringBuilder()
        var i = 24
        var ip = num
        while (i >= 0) {
          val a = ip >> i
          ip = a << i ^ ip
          buf.append(a)
          if (i > 0) {
            buf.append(".")
          }
          i = i - 8
        }
        buf.toString()
      }

      spark.udf.register("num_to_ip3", num_to_ip3 _)

      and in a 2nd paragraph, use the UDF:

      %sql
      select num_to_ip3(1342342)
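For reference, the UDF body is deterministic and can be checked in plain Scala without a Spark session, which helps confirm that the failures above come from the interpreter race rather than from the function logic (UdfCheck is just a harness name for this sketch):

```scala
// The reproduction UDF from the paragraph above, runnable without Spark.
object UdfCheck {
  def num_to_ip3(num: Long): String = {
    require(num >= 0 && num < (2L << 31), s"Invalid number to convert: ${num}")
    val buf = new StringBuilder()
    var i = 24
    var ip = num
    while (i >= 0) {
      val a = ip >> i  // current octet, high bits first
      ip = a << i ^ ip // clear the bits already emitted
      buf.append(a)
      if (i > 0) buf.append(".")
      i = i - 8
    }
    buf.toString()
  }

  def main(args: Array[String]): Unit = {
    // 1342342 = 0x00147B86 -> octets 0, 20, 123, 134
    assert(num_to_ip3(1342342L) == "0.20.123.134")
    println(num_to_ip3(1342342L)) // prints 0.20.123.134
  }
}
```

So the query above always has one well-defined answer; when the paragraphs are run with the unsynchronized new interpreter, the failure shows up in closure serialization, not in this arithmetic.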

      mixermt lior.c@taboola.com fyi

          People

            Assignee: Unassigned
            Reporter: Igor Berman (igor.berman)