SPARK-34731

ConcurrentModificationException in EventLoggingListener when redacting properties


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.1, 3.2.0
    • Fix Version/s: 3.1.2, 3.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

      Reproduction:

      The key elements of reproduction are enabling event logging, setting spark.executor.cores, and some bad luck:

      $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
      --conf spark.executor.cores=1 --driver-memory 4g \
      --conf spark.eventLog.enabled=true \
      --conf spark.eventLog.dir=/tmp/spark-events
      ...
      scala> (0 to 500).foreach { i =>
           |   val df = spark.range(0, 20000).toDF("a")
           |   df.filter("a > 12").count
           | }
      21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
      java.util.ConcurrentModificationException
      	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
      	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
      	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
      	at scala.collection.Iterator.foreach(Iterator.scala:941)
      	at scala.collection.Iterator.foreach$(Iterator.scala:941)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
      	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
      	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
      	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
      	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
      	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
      	at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
      	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
      	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
      	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
      	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
      	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
      	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
      	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
      	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
      	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
      	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
      	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
      	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
      	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
      

      Analysis from quick reading of the code:

      DAGScheduler posts a JobSubmitted event containing a clone of a properties object.

      This event is handled by DAGScheduler#handleJobSubmitted.

      DAGScheduler#handleJobSubmitted stores the properties object in a Job object, which in turn is saved in the jobIdToActiveJob map.

      DAGScheduler#handleJobSubmitted posts a SparkListenerJobStart event with a reference to the same properties object that was stored indirectly in the jobIdToActiveJob map.
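
      The aliasing described above can be illustrated with a minimal sketch. ActiveJob and JobStartEvent are hypothetical stand-ins, not Spark's real classes, and the property keys are made up; the point is only that a write made through the map entry is visible through the event, because both hold the same object.

```scala
import java.util.Properties
import scala.collection.mutable

// Hypothetical stand-ins for the scheduler's job record and the listener event.
final case class ActiveJob(jobId: Int, properties: Properties)
final case class JobStartEvent(jobId: Int, properties: Properties)

object SharedPropertiesSketch {
  /** Returns true if a write made via the job map is visible through the event. */
  def eventSeesLaterWrite(): Boolean = {
    val props = new Properties()
    props.setProperty("example.key", "demo")

    // The job record and the event both receive the same reference (no copy).
    val jobIdToActiveJob = mutable.HashMap.empty[Int, ActiveJob]
    jobIdToActiveJob(0) = ActiveJob(0, props)
    val event = JobStartEvent(0, props)

    // A later scheduler-side write, reached through the map.
    jobIdToActiveJob(0).properties.setProperty("example.added.later", "1")

    // The event observes the write because it shares the object.
    event.properties.getProperty("example.added.later") == "1"
  }

  def main(args: Array[String]): Unit =
    println(eventSeesLaterWrite())
}
```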

      When the EventLoggingListener handles the SparkListenerJobStart event, it iterates over that properties object in redactProperties.

      Meanwhile, the DAGScheduler#handleJobSubmitted method is not yet done. It calls submitStage, which calls submitMissingTasks, which retrieves the same properties object from jobIdToActiveJob and calls addPySparkConfigsToProperties, which will modify the properties if spark.executor.cores is set.

      If redactProperties happens to still be iterating over the properties object when the modification happens, Hashtable throws a ConcurrentModificationException.
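
      The fail-fast behavior can be shown deterministically in a single thread, without relying on the race. The sketch below is an illustration only: it uses java.util.Hashtable directly (the class named in the stack trace, and the superclass of java.util.Properties), and the object and method names are hypothetical. The second method iterates over a snapshot taken with clone() for contrast; it is not presented as the fix that was applied for this issue.

```scala
import java.util.{ConcurrentModificationException, Hashtable}

object FailFastSketch {
  private def sample(): Hashtable[String, String] = {
    val t = new Hashtable[String, String]()
    t.put("a", "1")
    t.put("b", "2")
    t.put("c", "3")
    t
  }

  /** Adds a new key while an iterator over the same table is open. */
  def throwsWhenMutatedDuringIteration(): Boolean = {
    val table = sample()
    val it = table.entrySet().iterator()
    it.next()
    table.put("d", "4") // structural change: a key that was not present before
    try { it.next(); false }
    catch { case _: ConcurrentModificationException => true }
  }

  /** Performs the same write, but iterates over a snapshot taken beforehand. */
  def snapshotIterationSucceeds(): Boolean = {
    val table = sample()
    val snapshot = table.clone().asInstanceOf[Hashtable[String, String]]
    val it = snapshot.entrySet().iterator()
    it.next()
    table.put("d", "4") // modifies the original only, not the snapshot
    try { while (it.hasNext) it.next(); true }
    catch { case _: ConcurrentModificationException => false }
  }

  def main(args: Array[String]): Unit = {
    println(throwsWhenMutatedDuringIteration())
    println(snapshotIterationSucceeds())
  }
}
```

      In the reported scenario the write comes from a different thread, so whether the iterator is still open at that moment is a matter of timing, which is why the failure is intermittent.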


          People

            Assignee: bersprockets Bruce Robbins
            Reporter: bersprockets Bruce Robbins
            Votes: 0
            Watchers: 7
