Spark / SPARK-8553

Resuming Checkpointed QueueStream Fails

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: DStreams, PySpark
    • Labels: None

    Description

      Resuming a checkpointed StreamingContext that contains a QueueStream fails when the context is started, with the following error:

      15/06/23 02:33:09 WARN QueueInputDStream: isTimeValid called with 1434987594000 ms where as last valid time is 1434987678000 ms
      15/06/23 02:33:09 ERROR StreamingContext: Error starting the context, marking it as stopped
      org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
      	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
      	at org.apache.spark.rdd.RDD.persist(RDD.scala:162)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:357)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$apply$8.apply(DStream.scala:354)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:354)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:195)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      	at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:242)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      	at org.apache.spark.streaming.api.python.PythonStateDStream.compute(PythonDStream.scala:241)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
      	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
      	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
      	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
      	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
      	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
      	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
      	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
      	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:606)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
      	at py4j.Gateway.invoke(Gateway.java:259)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:207)
      	at java.lang.Thread.run(Thread.java:745)
      Traceback (most recent call last):
        File "factor.py", line 74, in <module>
          main()
        File "factor.py", line 60, in main
          purged_filename = sieving.run(sc, parameters, poly_filename=poly_filename)
        File "/home/ubuntu/spark_apps/sieving.py", line 403, in run
          (rels, outfiles) = run_sieving(sc, parameters, poly, poly_filename, fb_paths, rels_found, rels_wanted)
        File "/home/ubuntu/spark_apps/sieving.py", line 250, in run_sieving
          ssc.start()
        File "/home/ubuntu/spark/python/pyspark/streaming/context.py", line 185, in start
          self._jssc.start()
        File "/usr/local/lib/python3.4/dist-packages/py4j/java_gateway.py", line 538, in __call__
          self.target_id, self.name)
        File "/usr/local/lib/python3.4/dist-packages/py4j/protocol.py", line 300, in 
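
      The WARN line at the top of the log reports two timestamps in epoch milliseconds. As a small aid for reading it, the sketch below converts both to UTC and computes the gap; the requested time is 84000 ms (84 seconds) earlier than the last valid time. The variable names and the helper to_utc are illustrative only, and how this gap relates to the failure is not established by the report.

```python
from datetime import datetime, timezone

# Values copied from the WARN line in the log above (milliseconds since the Unix epoch).
requested_ms = 1434987594000   # time passed to isTimeValid
last_valid_ms = 1434987678000  # "last valid time" reported by QueueInputDStream


def to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime (illustrative helper)."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


gap_ms = last_valid_ms - requested_ms

print("requested: ", to_utc(requested_ms).isoformat())
print("last valid:", to_utc(last_valid_ms).isoformat())
print("gap (ms):  ", gap_ms)
```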
      

      The code that triggers the error, run with Python 3 on Spark Standalone:

      ssc = StreamingContext.getOrCreate(s3n_path, make_ssc)
      ....
      p_batches = [ssc.sparkContext.parallelize(batch) for batch in task_batches]
      sieving_tasks = ssc.queueStream(p_batches)
      relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
      countsState = relations.updateStateByKey(update_state)
      countsState.foreachRDD(gen_finals)
      ssc.checkpoint(s3n_path)
      ....
      ssc.start()
      ....
      
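      def update_state(count, counts):
          # count: list of new values for the key in this batch
          # counts: state from earlier batches (None the first time the key is seen)
          if counts is None:
              counts = []
          print(count)
          counts.append(count)
          return counts
      
      def gen_finals(rdd):
          # Each record is (key, state), where state is a list of per-batch value lists
          for (link, rank) in rdd.collect():
              acc = 0
              for l in rank:
                  acc += sum(l)
              run_sieving.counts.append(acc)
              run_sieving.out_files.add(link)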
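
      To make the shape of the state easier to follow, here is a plain-Python sketch (no Spark required) of what update_state accumulates per key and why gen_finals sums over nested lists. The simulate function is a hypothetical stand-in for updateStateByKey, not the Spark implementation, and the sample batches are made up for illustration. It assumes, as updateStateByKey does, that the update function is also called with an empty list for keys that have state but no new values in a batch.

```python
def update_state(new_values, state):
    """Same logic as update_state in the report, without the print call."""
    if state is None:
        state = []
    state.append(new_values)
    return state


def simulate(batches):
    """Hypothetical stand-in for updateStateByKey: apply update_state per key, batch by batch."""
    states = {}
    for batch in batches:
        # Group this batch's values by key.
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # Keys with existing state but no new values receive an empty list.
        for key in set(states) | set(grouped):
            states[key] = update_state(grouped.get(key, []), states.get(key))
    return states


# Made-up sample data: two batches of (key, value) pairs.
batches = [[("a", 1), ("a", 2), ("b", 5)], [("a", 3)]]
states = simulate(batches)

# Mirrors the accumulation in gen_finals: sum every per-batch list for each key.
totals = {key: sum(sum(vals) for vals in history) for key, history in states.items()}

print(states)
print(totals)
```

      Each key's state is therefore a list with one entry per batch, which is why gen_finals iterates over rank and calls sum on each element.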
      

          People

            Assignee: Unassigned
            Reporter: Shaanan Cohney (shaananc)