diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a226d6a..9775913f6af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -73,28 +73,34 @@ protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException = true; + private static final String DEFAULT_DISPATCHER_NAME = "AsyncDispatcher"; + /** - * The thread name for dispatcher. + * The name of the dispatcher. */ - private String dispatcherThreadName = "AsyncDispatcher event handler"; + private final String dispatcherName; public AsyncDispatcher() { - this(new LinkedBlockingQueue()); - } - - public AsyncDispatcher(BlockingQueue eventQueue) { - super("Dispatcher"); - this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap, EventHandler>(); + this(DEFAULT_DISPATCHER_NAME); } /** - * Set a name for this dispatcher thread. - * @param dispatcherName name of the dispatcher thread + * Set a name for this dispatcher. + * @param dispatcherName name of the dispatcher */ public AsyncDispatcher(String dispatcherName) { - this(); - dispatcherThreadName = dispatcherName; + this(new LinkedBlockingQueue(), dispatcherName); + } + + public AsyncDispatcher(BlockingQueue eventQueue) { + this(eventQueue, DEFAULT_DISPATCHER_NAME); + } + + public AsyncDispatcher(BlockingQueue eventQueue, String dispatcherName) { + super(dispatcherName); + this.eventQueue = eventQueue; + this.eventDispatchers = new HashMap, EventHandler>(); + this.dispatcherName = dispatcherName; } Runnable createThread() { @@ -118,7 +124,8 @@ public void run() { event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + LOG.warn("AsyncDispatcher " + getName() + + " thread " + dispatcherName + " interrupted", ie); } return; } @@ -140,7 +147,7 @@ protected void serviceStart() throws Exception { //start all the components super.serviceStart(); eventHandlingThread = new Thread(createThread()); - eventHandlingThread.setName(dispatcherThreadName); + eventHandlingThread.setName(dispatcherName); eventHandlingThread.start(); } @@ -152,7 +159,8 @@ public void setDrainEventsOnStop() { protected void serviceStop() throws Exception { if (drainEventsOnStop) { blockNewEvents = true; - LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + LOG.info("AsyncDispatcher " + getName() + + " is draining to stop, ignoring any new events."); long endTime = System.currentTimeMillis() + getConfig() .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); @@ -162,7 +170,8 @@ protected void serviceStop() throws Exception { && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { waitForDrained.wait(100); - LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + + LOG.info("Waiting for AsyncDispatcher " + getName() + + " to drain. Thread " + dispatcherName + " state is :" + eventHandlingThread.getState()); } } @@ -185,7 +194,8 @@ protected void serviceStop() throws Exception { protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." + LOG.debug("AsyncDispatcher " + getName() + + " dispatching the event " + event.getClass().getName() + "." + event.toString()); } @@ -196,7 +206,8 @@ protected void dispatch(Event event) { if(handler != null) { handler.handle(event); } else { - throw new Exception("No handler for registered for " + type); + throw new Exception("AsyncDispatcher " + getName() + + " no handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue @@ -207,7 +218,8 @@ protected void dispatch(Event event) { && stopped == false) { stopped = true; Thread shutDownThread = new Thread(createShutDownThread()); - shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.setName( + "AsyncDispatcher " + getName() + " ShutDown handler"); shutDownThread.start(); } } @@ -220,7 +232,8 @@ public void register(Class eventType, /* check to see if we have a listener registered */ EventHandler registeredHandler = (EventHandler) eventDispatchers.get(eventType); - LOG.info("Registering " + eventType + " for " + handler.getClass()); + LOG.info("AsyncDispatcher " + getName() + + " registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ @@ -254,18 +267,20 @@ public void handle(Event event) { if (qSize != 0 && qSize % 1000 == 0 && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; - LOG.info("Size of event-queue is " + qSize); + LOG.info("AsyncDispatcher " + getName() + + " size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); + LOG.warn("AsyncDispatcher " + getName() + + " very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", e); + LOG.warn("AsyncDispatcher " + getName() + + " thread " + dispatcherName + " interrupted", e); } // Need to reset drained flag to true if event queue is empty, // otherwise dispatcher will hang on stop. @@ -304,7 +319,7 @@ Runnable createShutDownThread() { return new Runnable() { @Override public void run() { - LOG.info("Exiting, bbye.."); + LOG.info("AsyncDispatcher " + getName() + " exiting, bbye.."); System.exit(-1); } };