  1. Spark
  2. SPARK-6770

DirectKafkaInputDStream has not been initialized when recovering from checkpoint


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      I am reading data from Kafka using the createDirectStream method and saving the received log lines to MySQL. The code snippet is as follows:

          def functionToCreateContext(): StreamingContext = {
            val sparkConf = new SparkConf()
            val sc = new SparkContext(sparkConf)
            val ssc = new StreamingContext(sc, Seconds(10))
            ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory
            ssc
          }
      
          val struct = StructType(StructField("log", StringType) ::Nil)
      
          // Get StreamingContext from checkpoint data or create a new one
          val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext)
      
          val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
          val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
          SDB.foreachRDD(rdd => {
            val result = rdd.map(item => {
              println(item)
              val result = item._2 match {
                case e: String => Row.apply(e)
                case _ => Row.apply("")
              }
              result
            })
      
            println(result.count())
            val df = sqlContext.createDataFrame(result, struct)
            df.insertIntoJDBC(url, "test", overwrite = false)
          })
          ssc.start()
          ssc.awaitTermination()
          ssc.stop()
      

      But when I restart the program and it recovers from the checkpoint, I encounter this exception:

      Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized
      	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
      	at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
      	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
      	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
      	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
      	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
      	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
      	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
      	at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
      	at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
      	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
      	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      

      I'm not sure whether this is a bug or intended behavior, but it is not obvious, so I created this JIRA to make sure the behavior gets documented. Can someone help me understand the cause? Thank you.
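
      A likely cause, consistent with the "Not A Problem" resolution: StreamingContext.getOrCreate only calls the creating function when no checkpoint exists, and on recovery the DStream graph is rebuilt from the checkpoint data. In the snippet above, createDirectStream and foreachRDD run outside functionToCreateContext, so on restart they attach a new, uninitialized DirectKafkaInputDStream to the recovered context. The sketch below moves all stream setup into the creating function. It is an illustration against the Spark 1.3 API, not a verified fix: the kafkaParams, topics, and url values are hypothetical placeholders (the original does not show them), and SQLContextSingleton is a helper introduced here so that the SQLContext is obtained lazily from the RDD rather than captured in the checkpointed closure.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Helper introduced for this sketch: lazily creates one SQLContext per JVM,
// so nothing tied to the old SparkContext is restored from the checkpoint.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sc: SparkContext): SQLContext = synchronized {
    if (instance == null) instance = new SQLContext(sc)
    instance
  }
}

object UserChannelTodb {
  val checkpointDir = "/tmp/kafka/channel/offset"

  // Hypothetical placeholders: the original report does not show these values.
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val topics = Set("test")
  val url = "jdbc:mysql://localhost:3306/test"

  // Everything that defines the streaming computation lives here, so a fresh
  // start and a recovery from checkpoint end up with the same DStream graph.
  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val struct = StructType(StructField("log", StringType) :: Nil)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Keep only the message value; substitute "" for a null value.
      val rows = rdd.map { case (_, value) => Row(Option(value).getOrElse("")) }
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      val df = sqlContext.createDataFrame(rows, struct)
      df.insertIntoJDBC(url, "test", overwrite = false)
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a fresh start createContext() runs; on restart the context and its
    // DStreams are restored from the checkpoint and createContext() is skipped.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

      With this layout, no DStream operations are applied to the context returned by getOrCreate; the caller only starts it and waits for termination.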


          People

            Assignee: Unassigned
            Reporter: iteblog
            Votes: 0
            Watchers: 5
