ZEPPELIN-4079: SparkShims.getNoteId throws NPE


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: zeppelin-interpreter
    • Labels: None

    Description

      When I run the following Scala program in a Zeppelin notebook, an NPE is shown in the logs/zeppelin-interpreter-spark-<name>.log file.

      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.SparkSession
      
      import org.apache.spark.streaming.kafka010._
      import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
      import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
      
      import org.apache.kafka.common.serialization.StringDeserializer
      
      import play.api.libs.json._
      
      val PREFIX = "CK-LOG ====------> "
      
      case class SenseData(hash: String, value: Float, updated: String)
      
      /** Lazily instantiated singleton instance of SparkSession */
      object SparkSessionSingleton {
      
        @transient private var instance: SparkSession = _
      
        def getInstance(sparkConf: SparkConf): SparkSession = {
          if (instance == null) {
            instance = SparkSession
              .builder
              .config(sparkConf)
              .getOrCreate()
          }
          instance
        }
      }
      
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "sensor_data-2019",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      
      val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")
      
      val streamingContext = new StreamingContext(sc, Seconds(60))
      println(PREFIX + "streamContext created")
      
      val stream = KafkaUtils.createDirectStream(
        streamingContext,
        PreferBrokers,
        Subscribe[String, String](topics, kafkaParams)
      )
      println(PREFIX + "DStream created")
      
      // val msgs = stream.window(Seconds(10))
      stream.map(
        record => {
          val json: JsValue = Json.parse(record.value)
          SenseData(
            json("b")("notification")("deviceId").as[String],
            json("b")("notification")("parameters")("temperature").as[Float],
            json("b")("notification")("timestamp").as[String]
          )
        }
      ).foreachRDD( (rdd: RDD[SenseData], time: Time) => {
        println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
      
        // Get the singleton instance of SparkSession
        val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
        import spark.implicits._
      
        // This is how to print the elements of an RDD; see
        // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
        // rdd.take(100).foreach(println)
        rdd.collect().foreach(println)
      
        // rdd.toDF() won't work without a SparkSession and its imported implicits
        // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
        rdd.toDF().createOrReplaceTempView("sensedata")
      
        val senseDataDF =
          spark.sql("select value, updated from sensedata")
        println(s"========= $time =========")
        senseDataDF.show()
      })
      
      streamingContext.start()

      Error:

      ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
      java.lang.NullPointerException
      at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
      at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
      at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
      at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
      at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)

      It looks like this issue is related to a previously reported issue that was fixed in 0.8.0.

      The issue is at line 96 of SparkShims.java and is caused by the job group ID being null, which in turn comes down to a missing spark.jobGroup.id property. That property is supposed to be set/passed when a job starts; I am not sure why it is not being passed, or why it is not taken from the interpreter properties.
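
      For illustration, here is a rough sketch of the failing parsing step, written in Scala to match the notebook code above (the real implementation is Java, in SparkShims.java). The method name comes from the stack trace; the job group format and the exact string handling are my assumptions, not the actual Zeppelin source:

      // Hypothetical reconstruction, NOT the actual Zeppelin source.
      // Zeppelin builds a job group id along the lines of
      // "zeppelin-<noteId>-<paragraphId>" (format assumed) and getNoteId
      // parses the note id back out. If the listener is handed a null
      // job group id, the first method call on it throws the NPE above.
      def getNoteId(jobGroupId: String): String = {
        val first = jobGroupId.indexOf("-")             // NPE here when jobGroupId == null
        val second = jobGroupId.indexOf("-", first + 1)
        jobGroupId.substring(first + 1, second)
      }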

      Setting the spark.jobGroup.id property to an arbitrary string under Interpreters -> Spark -> Properties in Zeppelin didn't sort it out either.
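
      My assumption about why this workaround has no effect: spark.jobGroup.id is normally a thread-local property that SparkContext.setJobGroup attaches to the submitting thread, and the listener reads it from the job's local properties rather than from SparkConf, so an interpreter-level (SparkConf) setting never reaches SparkListenerJobStart. A minimal Scala sketch of that distinction (sc is the SparkContext Zeppelin provides; the group id string is a made-up placeholder):

      // Affects only the current thread: jobs submitted from here carry
      // spark.jobGroup.id in SparkListenerJobStart's local properties.
      sc.setJobGroup("zeppelin-noteid-paragraphid", "paragraph run")
      sc.parallelize(1 to 10).count()

      // Streaming output jobs are submitted by the StreamingContext's own
      // scheduler thread. If that thread never had setJobGroup called on it,
      // the listener sees no spark.jobGroup.id, so getNoteId receives null,
      // which would explain the NPE regardless of any SparkConf setting.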

      Attachments

        1. Screen Shot 2019-03-21 at 10.14.58 pm.png (110 kB, Chandana Kithalagama)
        2. Screen Shot 2019-03-23 at 10.52.05 pm.png (434 kB, Chandana Kithalagama)
        3. Screen Shot 2019-03-23 at 10.55.54 pm.png (408 kB, Chandana Kithalagama)
        4. Screen Shot 2019-03-23 at 11.38.35 pm.png (35 kB, Chandana Kithalagama)
        5. spark-interpreter.log (67 kB, Chandana Kithalagama)

      People

        Assignee: Unassigned
        Reporter: Chandana Kithalagama (ckit)
