Details

Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 0.8.1
Fix Version/s: None
Component/s: None

Environment
zeppelin-0.8.1-bin-netinst running on local machine,
spark interpreter,
2 external dependencies added to the spark interpreter configs:
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0
- com.typesafe.play:play-json_2.11:2.6.8
Connecting to Spark 2.4.0 running on local machine
$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
Description
When I run the following Scala program in a Zeppelin notebook, an NPE shows up in the logs/zeppelin-interpreter-spark-<name>.log file.
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json._

val PREFIX = "CK-LOG ====------> "

case class SenseData(hash: String, value: Float, updated: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sensor_data-2019",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")

val streamingContext = new StreamingContext(sc, Seconds(60))
println(PREFIX + "streamContext created")

val stream = KafkaUtils.createDirectStream(
  streamingContext,
  PreferBrokers,
  Subscribe[String, String](topics, kafkaParams)
)
println(PREFIX + "DStream created")

// val msgs = stream.window(Seconds(10))

stream.map(record => {
  val json: JsValue = Json.parse(record.value)
  SenseData(
    json("b")("notification")("deviceId").as[String],
    json("b")("notification")("parameters")("temperature").as[Float],
    json("b")("notification")("timestamp").as[String]
  )
}).foreachRDD((rdd: RDD[SenseData], time: Time) => {
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")

  // Get the singleton instance of SparkSession
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // This is how to print the RDD, per
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
  // rdd.take(100).foreach(println)
  rdd.collect().foreach(println)

  // rdd.toDF() won't work without a SparkSession and imported implicits, per
  // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
  rdd.toDF().createOrReplaceTempView("sensedata")
  val senseDataDF = spark.sql("select value, updated from sensedata")
  println(s"========= $time =========")
  senseDataDF.show()
})

streamingContext.start()
Error:
ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
java.lang.NullPointerException
        at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
        at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
        at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
        at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
This issue looks related to ZEPPELIN-3242 (linked below), which was fixed in 0.8.0.
The failure is at line 96 of SparkShims.java and is caused by jobgroupId being null, which in turn comes from a missing spark.jobGroup.id property. That property is supposed to be set/passed during job start; I am not sure why it is not passed, or why it is not taken from the interpreter properties.
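For illustration, here is a minimal sketch (in Scala) of the failing path as I read the stack trace. This is my own reconstruction, not the actual Zeppelin source, and the zeppelin-<noteId>-<paragraphId> job group format is an assumption:

// Hypothetical reconstruction, NOT the real Zeppelin code. Zeppelin tags
// paragraph-initiated jobs via SparkContext.setJobGroup, which stores the id
// in the thread-local property "spark.jobGroup.id"; the listener then parses
// the note id back out of that property on every SparkListenerJobStart.
def getNoteId(jobGroupId: String): String =
  // Assumed id format: "zeppelin-<noteId>-<paragraphId>".
  // Calling a method on a null String throws the NPE seen in the log.
  jobGroupId.split("-")(1)

If that reading is right, any job submitted from a thread Zeppelin never tagged, such as the scheduler thread Spark Streaming uses to run the foreachRDD output jobs, carries no spark.jobGroup.id, and the listener dereferences null.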
Setting the spark.jobGroup.id property to an arbitrary string under Interpreters -> Spark -> Properties in Zeppelin didn't sort it out either.
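That would make sense if spark.jobGroup.id is read from the submitting thread's local properties rather than from SparkConf, since an interpreter property only lands in SparkConf. As an untested experiment rather than a fix, tagging the batch jobs ourselves from inside foreachRDD (which runs on the driver thread that submits the output jobs) might avoid the null; the group id below is a made-up placeholder:

// Untested sketch; reuses `stream` from the program above. setJobGroup writes
// the thread-local "spark.jobGroup.id" for jobs submitted from this thread.
// "zeppelin-placeholder-id" is a made-up value, not anything Zeppelin documents.
stream.foreachRDD { rdd =>
  rdd.sparkContext.setJobGroup("zeppelin-placeholder-id", "streaming output job")
  rdd.collect().foreach(println) // actions submitted here now carry the tag
}

Even if this silences the NPE, the note id parsed from a made-up group id would be meaningless, so at best it would confirm the diagnosis.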
Issue Links

relates to: ZEPPELIN-3242 "Listener threw an exception java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)" (Resolved)