Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Not A Problem
- Affects Version/s: 2.4.3
- None
- None
Description
LiveListenerBus.scala
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      // When run from multiple threads, this will create multiple AsyncEventQueues
      // with the same name, and all of them will be added to queues.
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}
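The snippet above wraps its whole body in synchronized, so the lookup and the creation of a new queue run as one critical section per bus instance. That may be what the "Not A Problem" resolution refers to, although the ticket itself does not say so. The following is a minimal sketch of that behaviour, not Spark's implementation: NamedQueue, MiniBus, SynchronizedAddDemo and registerConcurrently are hypothetical stand-ins invented for illustration, and listeners are plain strings. It uses scala.collection.JavaConverters to match the Scala versions Spark 2.4.x builds against; newer Scala versions mark that import as deprecated.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, Executors, TimeUnit}
import scala.collection.JavaConverters._

// Hypothetical stand-in for AsyncEventQueue: just a name and a listener list.
final class NamedQueue(val name: String) {
  private val listeners = new CopyOnWriteArrayList[String]()
  def addListener(listener: String): Unit = listeners.add(listener)
  def listenerCount: Int = listeners.size()
}

// Hypothetical stand-in for LiveListenerBus, keeping only the find-or-create logic.
final class MiniBus {
  private val queues = new CopyOnWriteArrayList[NamedQueue]()

  // synchronized makes "find, then create if missing" atomic for this instance,
  // so two threads cannot both take the None branch for the same name.
  def addToQueue(listener: String, queue: String): Unit = synchronized {
    queues.asScala.find(_.name == queue) match {
      case Some(existing) =>
        existing.addListener(listener)
      case None =>
        val newQueue = new NamedQueue(queue)
        newQueue.addListener(listener)
        queues.add(newQueue)
    }
  }

  def queueNames: Seq[String] = queues.asScala.map(_.name).toList
  def listenerCounts: Seq[Int] = queues.asScala.map(_.listenerCount).toList
}

object SynchronizedAddDemo {
  // Registers one listener per thread on the same queue name and returns the bus.
  def registerConcurrently(threads: Int): MiniBus = {
    val bus = new MiniBus
    val pool = Executors.newFixedThreadPool(threads)
    val startGate = new CountDownLatch(1) // release all workers at once

    (1 to threads).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          startGate.await()
          bus.addToQueue(s"listener-$i", "shared")
        }
      })
    }

    startGate.countDown()
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    bus
  }
}
```

Calling SynchronizedAddDemo.registerConcurrently(8) should leave the bus with a single queue named "shared" that holds all eight listeners. Removing synchronized from MiniBus.addToQueue would reintroduce the race the description worries about, since two threads could then both miss the lookup and each add a queue.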