Uploaded image for project: 'Zeppelin'
  1. Zeppelin
  2. ZEPPELIN-4079

SparkShims.getNoteId throws NPE

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 0.8.1
    • None
    • zeppelin-interpreter
    • None

    Description

      When I run the following Scala program in a Zeppelin notebook, an NPE is shown in the logs/zeppelin-interpreter-spark-<name>.log file.

      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.SparkSession
      
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
      
      import org.apache.kafka.common.serialization.StringDeserializer
      
      import play.api.libs.json._
      
      // Marker prepended to the progress messages printed while the stream is set up,
      // so they are easy to find in the output.
      val PREFIX = "CK-LOG ====------> "
      
      // One sensor reading extracted from a Kafka message. In the stream.map step further
      // down, `hash` is filled from the JSON "deviceId", `value` from
      // "parameters" -> "temperature" and `updated` from "timestamp".
      case class SenseData(hash: String, value: Float, updated: String)
      
      /**
       * Holder for one shared SparkSession that is created on first use and
       * handed back unchanged on every later call.
       */
      object SparkSessionSingleton {

        // Cached session; remains null until getInstance has been called once.
        @transient private var instance: SparkSession = _

        /**
         * Returns the cached session, building it from `sparkConf` via
         * `SparkSession.builder.getOrCreate()` when no session has been cached yet.
         *
         * @param sparkConf configuration used only when the session has to be built
         * @return the cached (or newly built) SparkSession
         */
        def getInstance(sparkConf: SparkConf): SparkSession = {
          if (instance != null) {
            instance
          } else {
            val session = SparkSession.builder.config(sparkConf).getOrCreate()
            instance = session
            session
          }
        }
      }
      
      // Kafka consumer settings handed to Subscribe(...) when the direct stream is created:
      // a broker on localhost:9092, String deserializers for both key and value, a fixed
      // consumer group id, "latest" as the offset reset policy and auto-commit disabled.
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "sensor_data-2019",
        "auto.offset.reset" -> "latest",
        // Ascribed to java.lang.Boolean so the value fits the Map[String, Object] type.
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      
      // The single Kafka topic the stream subscribes to.
      val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")
      
      // Streaming context built on `sc` with a 60-second batch duration.
      // NOTE(review): `sc` is not defined in this snippet - presumably the SparkContext
      // that the Zeppelin Spark interpreter provides; confirm.
      val streamingContext = new StreamingContext(sc, Seconds(60))
      println(PREFIX + "streamContext created")
      
      // Direct Kafka stream over `topics`, using the PreferBrokers location strategy and
      // the consumer settings in `kafkaParams`; keys and values are Strings.
      val stream = KafkaUtils.createDirectStream(
        streamingContext,
        PreferBrokers,
        Subscribe[String, String](topics, kafkaParams)
      )
      println(PREFIX + "DStream created")
      
      // Pipeline: parse every Kafka record into a SenseData, then for each batch RDD print
      // it, expose it as the temp view "sensedata" and show the result of a SQL query on it.
      // val msgs = stream.window(Seconds(10))
      stream.map(
        record => {
          // Parse the record value as JSON and pick the fields out of b.notification.
          var json: JsValue = Json.parse(record.value)
          SenseData(json("b")("notification")("deviceId").as[String], json("b")("notification")("parameters")("temperature").as[Float], json("b")("notification")("timestamp").as[String])
        }
      ).foreachRDD( (rdd: RDD[SenseData], time: Time) => {
        // rdd.count is a Spark action, so this line already runs a job for the batch.
        println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
      
        // Get the singleton instance of SparkSession
        val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
        import spark.implicits._
      
        // This is how the Spark programming guide suggests printing the elements of an RDD:
        // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
        // rdd.take(100).foreach(println)
        rdd.collect().foreach(println)
      
        // rdd.toDF() won't work without spark session and imported implicits
        // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
        rdd.toDF().createOrReplaceTempView("sensedata")
      
        // Query the temp view registered above and print the result for this batch time.
        val senseDataDF =
          spark.sql("select value, updated from sensedata")
        println(s"========= $time =========")
        senseDataDF.show()
      }
      )
      
      // Start the streaming computation defined above.
      streamingContext.start()

      Error:

      ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
      java.lang.NullPointerException
      at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
      at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
      at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
      at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
      at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)

      It looks like this issue is related to this and fixed in 0.8.0.

      The NPE is thrown at line 96 of SparkShims.java and is caused by the job group id being null, which in turn is caused by a missing spark.jobGroup.id property. That property is supposed to be set/passed when the job starts; I am not sure why it is not passed or taken from the interpreter properties.

      Setting the spark.jobGroup.id property to an arbitrary string under Zeppelin Interpreters -> Spark -> Properties didn't resolve it either.

      Attachments

        1. spark-interpreter.log
          67 kB
          Chandana Kithalagama
        2. Screen Shot 2019-03-23 at 11.38.35 pm.png
          35 kB
          Chandana Kithalagama
        3. Screen Shot 2019-03-23 at 10.55.54 pm.png
          408 kB
          Chandana Kithalagama
        4. Screen Shot 2019-03-23 at 10.52.05 pm.png
          434 kB
          Chandana Kithalagama
        5. Screen Shot 2019-03-21 at 10.14.58 pm.png
          110 kB
          Chandana Kithalagama

        Issue Links

          Activity

            People

              Unassigned Unassigned
              ckit Chandana Kithalagama
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated: