Spark / SPARK-21065

Spark Streaming concurrentJobs + StreamingJobProgressListener conflict


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Labels: None

      Description

      My streaming application has 200+ output operations, many of them stateful and several of them windowed. In an attempt to reduce the processing times, I set "spark.streaming.concurrentJobs" to 2+. Initial results are very positive, cutting our processing time from ~3 minutes to ~1 minute, but eventually we encounter an exception as follows:
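
      For reference, here is a minimal sketch of how I apply the setting (the property name is as given above; the app name and batch interval are illustrative assumptions):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // "spark.streaming.concurrentJobs" is the undocumented property referenced in this report.
      val conf = new SparkConf()
        .setAppName("concurrent-jobs-repro")            // hypothetical app name
        .set("spark.streaming.concurrentJobs", "2")     // allow 2 output jobs to run concurrently
      val ssc = new StreamingContext(conf, Seconds(60)) // illustrative batch interval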

      (Note that 1496977560000 ms is 2017-06-09 03:06:00, so it's trying to get a batch from 45 minutes before the exception is thrown.)

      2017-06-09 03:50:28,259 [Spark Listener Bus] ERROR org.apache.spark.streaming.scheduler.StreamingListenerBus - Listener StreamingJobProgressListener threw an exception
      java.util.NoSuchElementException: key not found: 1496977560000 ms
      at scala.collection.MapLike$class.default(MapLike.scala:228)
      at scala.collection.AbstractMap.default(Map.scala:59)
      at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
      at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:128)
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
      ...

      The Spark code causing the exception is here:

      https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC125
      override def onOutputOperationCompleted(
          outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
        // This method is called before onBatchCompleted
        runningBatchUIData(outputOperationCompleted.outputOperationInfo.batchTime).
          updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
      }

      It seems to me that this may be caused by the batch having already been removed in onBatchCompleted:
      https://github.com/apache/spark/blob/branch-2.1/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala#LC102

      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        synchronized {
          waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
          runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)

          val batchUIData = BatchUIData(batchCompleted.batchInfo)
          completedBatchUIData.enqueue(batchUIData)
          if (completedBatchUIData.size > batchUIDataLimit) {
            val removedBatch = completedBatchUIData.dequeue()
            batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
          }
          totalCompletedBatches += 1L

          totalProcessedRecords += batchUIData.numRecords
        }
      }

      What is the solution here? Should I make my Spark streaming context's remember duration much longer, e.g. ssc.remember(batchDuration * rememberMultiple)? For example, see the sketch below.
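
      (A sketch, reusing the ssc from the earlier snippet; rememberMultiple is a placeholder knob of mine, not an existing Spark setting.)

      import org.apache.spark.streaming.Seconds

      val batchDuration = Seconds(60)                 // illustrative batch interval
      val rememberMultiple = 60                       // e.g. retain ~1 hour of batch metadata
      ssc.remember(batchDuration * rememberMultiple)  // Duration * Int yields a longer Duration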

      Otherwise, it seems like there should be some kind of existence check on runningBatchUIData before dereferencing it.
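
      Something like the following sketch (my suggestion, not an actual Spark patch; logWarning assumes the listener mixes in Spark's Logging trait):

      override def onOutputOperationCompleted(
          outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = synchronized {
        val batchTime = outputOperationCompleted.outputOperationInfo.batchTime
        // Use get(...) instead of apply(...) so a batch that was already removed by
        // onBatchCompleted is skipped instead of throwing NoSuchElementException.
        runningBatchUIData.get(batchTime) match {
          case Some(batchUIData) =>
            batchUIData.updateOutputOperationInfo(outputOperationCompleted.outputOperationInfo)
          case None =>
            logWarning(s"Ignoring output operation completion for removed batch $batchTime")
        }
      }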


            People

            • Assignee: Unassigned
            • Reporter: dutrow Dan Dutrow
            • Votes: 0
            • Watchers: 5
