Spark / SPARK-31923

Event log cannot be generated when some internal accumulators use unexpected types

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6
    • Fix Version/s: 2.4.7, 3.0.1, 3.1.0
    • Component/s: Spark Core
    • Labels:
      None

      Description

      A user can mark an accumulator as internal by prefixing its name with "internal.metrics.", which hides potentially sensitive values from the UI (all accumulators except internal ones are shown in the Spark UI).
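
The visibility rule described above amounts to a name-prefix check. The sketch below illustrates it under stated assumptions: `AccInfo`, `InternalAccSketch`, `isInternal`, and `visibleInUi` are hypothetical names, not Spark's classes, and the UI filter is a simplification.

```scala
// Hypothetical, simplified accumulator metadata (not Spark's AccumulableInfo).
final case class AccInfo(id: Long, name: Option[String], value: String)

object InternalAccSketch {
  // Name prefix that marks an accumulator as internal, as described in this issue.
  val MetricsPrefix = "internal.metrics."

  // Internal means "named, and the name starts with the prefix"; unnamed accumulators are external.
  def isInternal(acc: AccInfo): Boolean =
    acc.name.exists(_.startsWith(MetricsPrefix))

  // Hypothetical UI filter: only non-internal accumulators would be displayed.
  def visibleInUi(accs: Seq[AccInfo]): Seq[AccInfo] =
    accs.filterNot(isInternal)
}
```

Because the check looks only at the name, any user accumulator named `internal.metrics.*` goes through the internal code paths, including the JSON conversion in accumValueToJson.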

      However, org.apache.spark.util.JsonProtocol.accumValueToJson assumes that an internal accumulator has only three possible value types: Int, Long, and java.util.List[(BlockId, BlockStatus)]. When an internal accumulator uses any other type, the method throws an exception (a ClassCastException in the reproduction below).
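
The stack trace in the reproduction is consistent with a fallback branch that casts every internal value that is not an Int or Long straight to a list. The Spark-free sketch below models that assumption; `BlockId`, `BlockStatus`, `OldAccumJson`, and the string-based JSON output are hypothetical simplifications, not the actual JsonProtocol code.

```scala
// Hypothetical stand-ins for Spark's BlockId and BlockStatus.
final case class BlockId(name: String)
final case class BlockStatus(memSize: Long, diskSize: Long)

object OldAccumJson {
  val MetricsPrefix = "internal.metrics."

  // Models the three-type assumption: Int, Long, or else a list of (BlockId, BlockStatus).
  def accumValueToJson(name: Option[String], value: Any): String =
    if (name.exists(_.startsWith(MetricsPrefix))) {
      value match {
        case v: Int  => v.toString
        case v: Long => v.toString
        case v =>
          // Unchecked cast: a java.lang.Double fails here with ClassCastException.
          val blocks = v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]]
          val parts = scala.collection.mutable.ListBuffer.empty[String]
          val it = blocks.iterator()
          while (it.hasNext) {
            val (id, status) = it.next()
            parts += s"""{"Block ID":"${id.name}","Memory Size":${status.memSize}}"""
          }
          parts.mkString("[", ",", "]")
      }
    } else {
      // External accumulators are rendered as strings (no JSON escaping in this sketch).
      "\"" + value.toString + "\""
    }
}
```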

      Any event that contains such an accumulator is dropped from the event log because it cannot be converted to JSON, which causes confusing UI behavior when the Spark History Server renders the log. For example, if a `SparkListenerTaskEnd` event is dropped because of this issue, the user will see the task as still running even though it has finished.
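
The History Server rebuilds UI state by replaying the events that were written to the log, so an event that failed serialization never reaches it. The toy model below illustrates why a task then appears to be still running; the event types, `serialize`, `writeLog`, and `runningTasks` are all hypothetical and far simpler than Spark's.

```scala
// Hypothetical event model (not Spark's SparkListenerEvent hierarchy).
sealed trait Event
final case class TaskStart(taskId: Long) extends Event
final case class TaskEnd(taskId: Long, payload: Any) extends Event

object DroppedEventSketch {
  // Hypothetical serializer that, like the bug above, fails on payload types it does not expect.
  def serialize(e: Event): String = e match {
    case TaskStart(id)        => s"start:$id"
    case TaskEnd(id, p: Long) => s"end:$id:$p"
    case TaskEnd(_, p)        => throw new ClassCastException(s"${p.getClass.getName} not supported")
  }

  // Writes every event it can serialize; an event that throws is lost.
  def writeLog(events: Seq[Event]): Seq[String] =
    events.flatMap { e =>
      try Some(serialize(e))
      catch { case _: ClassCastException => None }
    }

  // Toy replay: a task counts as running until its end record is seen.
  def runningTasks(log: Seq[String]): Set[Long] =
    log.foldLeft(Set.empty[Long]) { (running, line) =>
      line.split(":") match {
        case Array("start", id)  => running + id.toLong
        case Array("end", id, _) => running - id.toLong
        case _                   => running
      }
    }
}
```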

      accumValueToJson should be made more robust so that an unexpected value type does not prevent the event from being logged.
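
One possible shape for a more robust accumValueToJson is to type-check every branch and skip values it does not understand instead of casting blindly. This is a sketch under assumptions, not necessarily the change that resolved this issue: it uses hypothetical `BlockId`/`BlockStatus` stand-ins, a hypothetical `RobustAccumJson` object, and returns `None` to mean "omit this value".

```scala
// Hypothetical stand-ins for Spark's BlockId and BlockStatus.
final case class BlockId(name: String)
final case class BlockStatus(memSize: Long, diskSize: Long)

object RobustAccumJson {
  val MetricsPrefix = "internal.metrics."

  // Returns None for unsupported internal values so the caller can omit them
  // instead of failing the whole event.
  def accumValueToJson(name: Option[String], value: Any): Option[String] =
    if (name.exists(_.startsWith(MetricsPrefix))) {
      value match {
        case v: Int  => Some(v.toString)
        case v: Long => Some(v.toString)
        case v: java.util.List[_] =>
          // Keep only (BlockId, BlockStatus) elements; silently skip anything else.
          val parts = scala.collection.mutable.ListBuffer.empty[String]
          val it = v.iterator()
          while (it.hasNext) {
            it.next() match {
              case (id: BlockId, st: BlockStatus) =>
                parts += s"""{"Block ID":"${id.name}","Memory Size":${st.memSize}}"""
              case _ => ()
            }
          }
          Some(parts.mkString("[", ",", "]"))
        case _ =>
          // e.g. a Double from sc.doubleAccumulator("internal.metrics.foo")
          None
      }
    } else {
      // External accumulators are rendered as strings (no JSON escaping in this sketch).
      Some("\"" + value.toString + "\"")
    }
}
```

Returning `None` rather than throwing means a single oddly typed accumulator costs only its own value, while the rest of the event (for example, a `SparkListenerTaskEnd`) can still be written.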


      How to reproduce it:

      • Enable the Spark event log (`spark.eventLog.enabled=true`).
      • Run the following commands in spark-shell:
      scala> val accu = sc.doubleAccumulator("internal.metrics.foo")
      accu: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 0, name: Some(internal.metrics.foo), value: 0.0)
      
      scala> sc.parallelize(1 to 1, 1).foreach { _ => accu.add(1.0) }
      20/06/06 16:11:27 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
      java.lang.ClassCastException: java.lang.Double cannot be cast to java.util.List
      	at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:330)
      	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
      	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$3.apply(JsonProtocol.scala:306)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:306)
      	at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
      	at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson$2.apply(JsonProtocol.scala:299)
      	at scala.collection.immutable.List.map(List.scala:284)
      	at org.apache.spark.util.JsonProtocol$.accumulablesToJson(JsonProtocol.scala:299)
      	at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:291)
      	at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
      	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
      	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
      	at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:158)
      	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
      	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
      	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
      	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
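
The `ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception` line above indicates that the exception is caught and logged per listener rather than stopping the application, which is why the event silently goes missing from the log instead of failing the job. A minimal sketch of that isolation pattern, with a hypothetical `Listener` trait and `postToAll` helper (not Spark's ListenerBus API):

```scala
import scala.util.control.NonFatal

// Hypothetical minimal listener interface (not Spark's SparkListener).
trait Listener {
  def name: String
  def onEvent(event: String): Unit
}

object ListenerBusSketch {
  // Deliver the event to each listener; a listener that throws a non-fatal
  // exception is logged and skipped, so the remaining listeners still run.
  def postToAll(listeners: Seq[Listener], event: String, log: String => Unit): Unit =
    listeners.foreach { l =>
      try l.onEvent(event)
      catch {
        case NonFatal(e) => log(s"Listener ${l.name} threw an exception: $e")
      }
    }
}
```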
      

            People

            • Assignee: Shixiong Zhu (zsxwing)
            • Reporter: Shixiong Zhu (zsxwing)
            • Votes: 0
            • Watchers: 2
