Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-17397

Show example of what to do when awaitTermination() throws an Exception

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Minor
    • Resolution: Incomplete
    • 2.0.0
    • None
    • Documentation, DStreams
    • Linux, Scala but probably general

    Description

      When awaitTermination propagates an exception that was thrown in processing a batch, the StreamingContext keeps running. Perhaps this is by design, but I don't see any mention of it in the API docs or the streaming programming guide. It's not clear what idiom should be used to block the thread until the context HAS been stopped in a situation where stream processing is throwing lots of exceptions.

      For example, in the following, streaming takes the full 30 seconds to terminate. My hope in asking this is to improve my own understanding and perhaps inspire documentation improvements. I'm not filing a bug because it's not clear to me whether this is working as intended.

          val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
          val sc = new SparkContext(conf)
      
          // streams will produce data every second
          val ssc = new StreamingContext(sc, Seconds(1))
          val qm = new QueueMaker(sc, ssc)
      
          // create the stream
          val stream = // create some stream
      
          // register for data
          stream
            .map(x => { throw new SomeException("something"); x} )
            .foreachRDD(r => println("*** count = " + r.count()))
      
          // start streaming
          ssc.start()
      
          new Thread("Delayed Termination") {
            override def run() {
              Thread.sleep(30000)
              ssc.stop()
            }
          }.start()
      
          println("*** producing data")
          // start producing data
          qm.populateQueue()
      
          try {
            ssc.awaitTermination()
            println("*** streaming terminated")
          } catch {
            case e: Exception => {
              println("*** streaming exception caught in monitor thread")
            }
          }
      
          // if the above goes down the exception path, there seems no 
          // good way to block here until the streaming context is stopped 
      
          println("*** done")
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            spirom Spiro Michaylov
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: