Spark / SPARK-13316

"SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Problem
    • Component/s: DStreams

    Description

      I ran into this issue today; it was already reported on Stack Overflow a couple of days ago. The cause is that a DStream is registered after the StreamingContext has been recreated from a checkpoint.

      It appears that no DStreams may be registered after a StreamingContext has been recreated from a checkpoint. This is not obvious at first.

      The code:

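      import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

      // Creates a fresh context; getOrCreate invokes it only when no checkpoint exists.
      def createStreamingContext(): StreamingContext = {
          val ssc = new StreamingContext(sparkConf, Duration(1000))
          ssc.checkpoint(checkpointDir)
          ssc
      }
      val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

      // Registered after getOrCreate: when the context is restored from a
      // checkpoint, these new DStreams are never initialized and start() throws.
      val socketStream = ssc.socketTextStream(...)
      socketStream.checkpoint(Seconds(1))
      socketStream.foreachRDD(...)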
      

      At the very least this should be described in the docs, and/or checked in the code when the streaming computation starts.
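
For comparison, here is a minimal sketch of an arrangement that should avoid the exception: every DStream and output operation is registered inside the function passed to StreamingContext.getOrCreate, so nothing is added after a context has been restored from a checkpoint. The object name, the local[2] master, the application name, the localhost:9999 socket source, the fallback checkpoint path and the foreachRDD body are all assumptions made for illustration; none of them come from the original report.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {

  // Builds the whole streaming setup. getOrCreate calls this only when
  // no usable checkpoint is found in checkpointDir.
  def createStreamingContext(checkpointDir: String): StreamingContext = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")                     // assumption: local run
      .setAppName("checkpoint-recovery-sketch")  // hypothetical name
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // Register every DStream and output operation here, before returning.
    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source
    lines.checkpoint(Seconds(1))
    lines.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc
  }

  // Restores the context from the checkpoint if one exists, otherwise
  // creates it; no DStream is registered after this call returns.
  def buildContext(checkpointDir: String): StreamingContext =
    StreamingContext.getOrCreate(
      checkpointDir,
      () => createStreamingContext(checkpointDir))

  def main(args: Array[String]): Unit = {
    // Hypothetical default path, used when no argument is given.
    val ssc = buildContext(args.headOption.getOrElse("/tmp/checkpoint-sketch"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout the code after getOrCreate only starts the context, which is the same whether the context was freshly created or recovered.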

      The exception is as follows:

      org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
        at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
        at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
        at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
        ... 43 elided
      

          People

            Assignee: Unassigned
            Reporter: Jacek Laskowski (jlaskowski)
            Votes: 1
            Watchers: 3
