SPARK-31969

StreamingJobProgressListener threw an exception java.util.NoSuchElementException for Long Running Streaming Job


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: None
    • Environment: Kubernetes, Spark 2.4.0

    Description

      We are running a long-running streaming job, and the exception below is seen continuously after some time. After the job starts, our Spark streaming application's batch durations suddenly begin to increase. At around the same time, an error log starts to appear that does not refer to the application code at all. We could not find any other significant errors in the driver logs.

       

      Our application runs fine for several hours (4 or so), then delay starts to build up, and finally the application restarts every 9-10 hours.

      Any suggestions on what the issue could be?
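
      For reference, a minimal sketch (illustrative names, not code from this application) of a custom StreamingListener that logs each batch's scheduling and processing delay, which is one way to watch the delay build-up described above:

      import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

      // Logs per-batch delays; schedulingDelay grows when batches queue up faster
      // than they are processed, which matches the symptom described above.
      class BatchDelayLogger extends StreamingListener {
        override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
          val info = batchCompleted.batchInfo
          println(s"batch=${info.batchTime} " +
            s"schedulingDelayMs=${info.schedulingDelay.getOrElse(-1L)} " +
            s"processingDelayMs=${info.processingDelay.getOrElse(-1L)}")
        }
      }

      // Registration, given an existing StreamingContext named ssc (hypothetical):
      // ssc.addStreamingListener(new BatchDelayLogger)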

       

      Referred to ticket https://issues.apache.org/jira/browse/SPARK-21065 for a similar issue; in our case we are not setting anything for spark.streaming.concurrentJobs, so the default value is used.
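
      For completeness, a minimal sketch of how that setting could be pinned explicitly on the SparkConf (the object and app names are illustrative; the value 1 is the default Spark Streaming uses when the key is unset):

      import org.apache.spark.SparkConf

      object ConcurrentJobsConfSketch {
        // Pin spark.streaming.concurrentJobs to its default (1) so the effective value
        // appears explicitly in the Spark UI Environment tab rather than being implied.
        val conf: SparkConf = new SparkConf()
          .setAppName("streaming-app") // illustrative app name
          .set("spark.streaming.concurrentJobs", "1")
      }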

       {"type":"log", "level":"ERROR", "name":"STREAMING_OTHERS", "time":"2020-06-09T04:31:43.918Z", "timezone":"UTC", "class":"spark-listener-group-appStatus", "method":"streaming.scheduler.StreamingListenerBus.logError(91)", "log":"Listener StreamingJobProgressListener threw an exception\u000Ajava.util.NoSuchElementException: key not found: 1591677100000 ms\u000A\u0009at scala.collection.MapLike$class.default(MapLike.scala:228)\u000A\u0009at scala.collection.AbstractMap.default(Map.scala:59)\u000A\u0009at scala.collection.mutable.HashMap.apply(HashMap.scala:65)\u000A\u0009at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)\u000A\u0009at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)\u000A\u0009at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)\u000A\u0009at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)\u000A\u0009at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)\u000A\u0009at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)\u000A\u0009at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:80)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)\u000A\u0009at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)\u000A\u0009at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)\u000A\u0009at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)\u000A\u0009at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)\u000A"}
      java.util.NoSuchElementException: key not found: 1591677100000 ms
      at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.11.12.jar:?]
      at scala.collection.AbstractMap.default(Map.scala:59) ~[scala-library-2.11.12.jar:?]
      at scala.collection.mutable.HashMap.apply(HashMap.scala:65) ~[scala-library-2.11.12.jar:?]
      at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134) ~[spark-streaming_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67) ~[spark-streaming_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29) [spark-streaming_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29) [spark-streaming_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43) [spark-streaming_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:80) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87) [spark-core_2.11-2.4.0.jar:2.4.0]
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) [scala-library-2.11.12.jar:?]
      at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302) [spark-core_2.11-2.4.0.jar:2.4.0]
      at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82) [spark-core_2.11-2.4.0.jar:2.4.0]
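
      For illustration, a minimal standalone sketch of the failure mode shown in the trace above (not Spark's actual StreamingJobProgressListener code): calling apply() on a mutable HashMap whose entry for a batch time has already been removed throws java.util.NoSuchElementException, while get() would return None.

      import scala.collection.mutable

      object KeyNotFoundSketch {
        def main(args: Array[String]): Unit = {
          // hypothetical map from batch time (ms) to some per-batch bookkeeping
          val batchTimeToOps = mutable.HashMap[Long, Int](1591677100000L -> 3)

          // simulate the batch entry being evicted before a late completion event arrives
          batchTimeToOps.remove(1591677100000L)

          // defensive lookup: returns None instead of throwing
          println(batchTimeToOps.get(1591677100000L))

          // unguarded lookup: throws java.util.NoSuchElementException("key not found: 1591677100000")
          println(batchTimeToOps(1591677100000L))
        }
      }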

      Attachments

        1. executor_log (0.7 kB, ThimmeGowda)
        2. driver_log (7 kB, ThimmeGowda)


          People

            Assignee: Unassigned
            Reporter: ThimmeGowda (gowdatp123)
            Votes: 0
            Watchers: 2
