Spark / SPARK-27804

LiveListenerBus#addToQueue : create multiple AsyncEventQueues under race condition

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      LiveListenerBus.scala

        private[spark] def addToQueue(
            listener: SparkListenerInterface,
            queue: String): Unit = synchronized {
          if (stopped.get()) {
            throw new IllegalStateException("LiveListenerBus is stopped.")
          }
      
          queues.asScala.find(_.name == queue) match {
            case Some(queue) =>
              queue.addListener(listener)
      
            case None =>
              // When called from multiple threads, this branch can create multiple
              // AsyncEventQueues with the same name, and all of them will be added to queues.
              val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
              newQueue.addListener(listener)
              if (started.get()) {
                newQueue.start(sparkContext)
              }
              queues.add(newQueue)
          }
        }
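
      One thing the snippet itself shows is that the whole body of addToQueue runs inside `synchronized`, so the find-then-create sequence is executed by one thread at a time. That may be why the issue was resolved as Not A Problem, although the page does not state the reason. The sketch below reproduces only the shape of that method so the behavior can be checked in isolation. `NamedQueue`, `MiniBus`, and `RaceCheck` are hypothetical stand-ins introduced for illustration; they are not Spark classes, and listeners are reduced to plain strings.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, Executors, TimeUnit}
// JavaConverters matches the Scala versions used by Spark 2.4 (2.11/2.12);
// on Scala 2.13+ it still works but is deprecated.
import scala.collection.JavaConverters._

// Hypothetical stand-in for AsyncEventQueue: only the name and listener count matter here.
final class NamedQueue(val name: String) {
  private val listeners = new CopyOnWriteArrayList[String]()
  def addListener(listener: String): Unit = listeners.add(listener)
  def listenerCount: Int = listeners.size()
}

// Hypothetical stand-in for LiveListenerBus with the same find-or-create shape.
final class MiniBus {
  private val queues = new CopyOnWriteArrayList[NamedQueue]()

  // The lookup and the creation happen under the same lock, so two threads
  // cannot both observe "no queue with this name" and both create one.
  def addToQueue(listener: String, queue: String): Unit = synchronized {
    queues.asScala.find(_.name == queue) match {
      case Some(existing) =>
        existing.addListener(listener)
      case None =>
        val newQueue = new NamedQueue(queue)
        newQueue.addListener(listener)
        queues.add(newQueue)
    }
  }

  def queueNames: Seq[String] = queues.asScala.map(_.name).toList

  def listenerCount(queue: String): Int =
    queues.asScala.filter(_.name == queue).map(_.listenerCount).sum
}

object RaceCheck {
  // Starts `threads` workers that all call addToQueue with the same queue name
  // at roughly the same moment, then waits for them to finish.
  def hammer(threads: Int): MiniBus = {
    val bus = new MiniBus
    val pool = Executors.newFixedThreadPool(threads)
    val startGate = new CountDownLatch(1)
    (1 to threads).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          startGate.await() // hold every worker until all are submitted
          bus.addToQueue(s"listener-$i", "shared")
        }
      })
    }
    startGate.countDown()
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    bus
  }
}
```

      Calling `RaceCheck.hammer(16)` and inspecting `queueNames` should show a single queue named `shared` holding all 16 listeners. Removing `synchronized` from `MiniBus.addToQueue` turns the method into the unguarded check-then-act sequence the report describes, in which duplicate queues become possible.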
      

            People

              Assignee: Unassigned
              Reporter: dydeve duyu
              Votes: 0
              Watchers: 2
