Spark / SPARK-11932

trackStateByKey throws java.lang.IllegalArgumentException: requirement failed on restarting from checkpoint


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.6.0
    • Component/s: DStreams
    • Labels: None

      Description

      The problem is that when recovering a streaming application that uses trackStateByKey from DStream checkpoints, the following exception is thrown.

      Code

        import org.apache.spark.SparkConf
        import org.apache.spark.streaming._

        StreamingContext.getOrCreate(".", () => createContext(args))
        ...

        def createContext(args: Array[String]): StreamingContext = {

          val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
          // Create the context with a 1 second batch size
          val ssc = new StreamingContext(sparkConf, Seconds(1))

          ssc.checkpoint(".")

          // Initial RDD input to trackStateByKey
          val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

          // Create a ReceiverInputDStream on the target ip:port and count the
          // words in the input stream of \n delimited text (e.g. generated by 'nc')
          val lines = ssc.socketTextStream(args(0), args(1).toInt)
          val words = lines.flatMap(_.split(" "))
          val wordDstream = words.map(x => (x, 1))

          // Track the cumulative count using trackStateByKey
          // This will give a DStream made of state (which is the cumulative count of the words)
          val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
            val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
            val output = (word, sum)
            state.update(sum)
            Some(output)
          }

          val stateDstream = wordDstream.trackStateByKey(
            StateSpec.function(trackStateFunc).initialState(initialRDD))
          stateDstream.print()

          ssc
        }
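
      For reference, the "..." above elides the driver code; a minimal sketch of it (assuming the standard StreamingContext API, not copied from the original report) is shown below. The exception is thrown from ssc.start() on the second run, once checkpoint data already exists in the checkpoint directory:

        val ssc = StreamingContext.getOrCreate(".", () => createContext(args))
        ssc.start()            // on restart, recovers the DStream graph from the checkpoint
        ssc.awaitTermination()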
      

      Error

      15/11/23 10:55:07 ERROR StreamingContext: Error starting the context, marking it as stopped
      java.lang.IllegalArgumentException: requirement failed
      at scala.Predef$.require(Predef.scala:221)
      at org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)
      at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)
      at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)
      at scala.Option.map(Option.scala:145)
      at org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      at scala.Option.orElse(Option.scala:257)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      at org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      at scala.Option.orElse(Option.scala:257)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
      at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
      at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
      at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
      at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)
      at org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)
      at org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:483)
      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      

      The reason is that the TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDD to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them, because RDD checkpointing does not preserve the partitioner (SPARK-12004).
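
      For illustration only, the failing check has roughly this shape (a simplified sketch with a hypothetical helper name, not the actual code in TrackStateRDD.scala):

        import org.apache.spark.rdd.RDD

        // Sketch of the invariant enforced when constructing a TrackStateRDD:
        // the previous batch's state RDD must carry a partitioner so the new
        // data can be co-partitioned with it.
        def validatePrevStateRDD[K, S](prevStateRDD: RDD[(K, S)]): Unit = {
          // An RDD recovered from RDD checkpoint files reports partitioner = None
          // (SPARK-12004), so this require throws the bare "requirement failed"
          // IllegalArgumentException seen in the stack trace above.
          require(prevStateRDD.partitioner.nonEmpty)
        }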

      People

      • Assignee: tdas (Tathagata Das)
      • Reporter: tdas (Tathagata Das)
      • Votes: 0
      • Watchers: 2
