diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a226d6a..23a09ce69f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -73,28 +73,46 @@ protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException = true; + private static final String DEFAULT_DISPATCHER_THREAD_NAME = + "AsyncDispatcher event handler"; + private static final String DEFAULT_DISPATCHER_NAME = "AsyncDispatcher"; + /** * The thread name for dispatcher. */ - private String dispatcherThreadName = "AsyncDispatcher event handler"; + private final String dispatcherThreadName; public AsyncDispatcher() { - this(new LinkedBlockingQueue()); - } - - public AsyncDispatcher(BlockingQueue eventQueue) { - super("Dispatcher"); - this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap, EventHandler>(); + this(DEFAULT_DISPATCHER_THREAD_NAME); } /** * Set a name for this dispatcher thread. - * @param dispatcherName name of the dispatcher thread + * @param dispatcherThreadName name of the dispatcher thread */ - public AsyncDispatcher(String dispatcherName) { - this(); - dispatcherThreadName = dispatcherName; + public AsyncDispatcher(String dispatcherThreadName) { + this(dispatcherThreadName, DEFAULT_DISPATCHER_NAME); + } + + public AsyncDispatcher(String dispatcherThreadName, String dispatcherName) { + this(new LinkedBlockingQueue(), dispatcherThreadName, dispatcherName); + } + + public AsyncDispatcher(BlockingQueue eventQueue) { + this(eventQueue, DEFAULT_DISPATCHER_THREAD_NAME); + } + + public AsyncDispatcher(BlockingQueue eventQueue, + String dispatcherThreadName) { + this(eventQueue, dispatcherThreadName, DEFAULT_DISPATCHER_NAME); + } + + public AsyncDispatcher(BlockingQueue eventQueue, + String dispatcherThreadName, String dispatcherName) { + super(dispatcherName); + this.eventQueue = eventQueue; + this.eventDispatchers = new HashMap, EventHandler>(); + this.dispatcherThreadName = dispatcherThreadName; } Runnable createThread() { @@ -118,7 +136,8 @@ public void run() { event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + LOG.warn("AsyncDispatcher " + getName() + + " thread " + dispatcherThreadName + " interrupted", ie); } return; } @@ -152,7 +171,8 @@ public void setDrainEventsOnStop() { protected void serviceStop() throws Exception { if (drainEventsOnStop) { blockNewEvents = true; - LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + LOG.info("AsyncDispatcher " + getName() + + " is draining to stop, ignoring any new events."); long endTime = System.currentTimeMillis() + getConfig() .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); @@ -162,7 +182,8 @@ protected void serviceStop() throws Exception { && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { waitForDrained.wait(100); - LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + + LOG.info("Waiting for AsyncDispatcher " + getName() + + " to drain. Thread " + dispatcherThreadName + " state is :" + eventHandlingThread.getState()); } } @@ -185,7 +206,8 @@ protected void serviceStop() throws Exception { protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." + LOG.debug("AsyncDispatcher " + getName() + + " dispatching the event " + event.getClass().getName() + "." + event.toString()); } @@ -196,7 +218,8 @@ protected void dispatch(Event event) { if(handler != null) { handler.handle(event); } else { - throw new Exception("No handler for registered for " + type); + throw new Exception("AsyncDispatcher " + getName() + + " no handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue @@ -207,7 +230,8 @@ protected void dispatch(Event event) { && stopped == false) { stopped = true; Thread shutDownThread = new Thread(createShutDownThread()); - shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.setName( + "AsyncDispatcher " + getName() + " ShutDown handler"); shutDownThread.start(); } } @@ -220,7 +244,8 @@ public void register(Class eventType, /* check to see if we have a listener registered */ EventHandler registeredHandler = (EventHandler) eventDispatchers.get(eventType); - LOG.info("Registering " + eventType + " for " + handler.getClass()); + LOG.info("AsyncDispatcher " + getName() + + " registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ @@ -254,18 +279,20 @@ public void handle(Event event) { if (qSize != 0 && qSize % 1000 == 0 && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; - LOG.info("Size of event-queue is " + qSize); + LOG.info("AsyncDispatcher " + getName() + + " size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); + LOG.warn("AsyncDispatcher " + getName() + + " very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", e); + LOG.warn("AsyncDispatcher " + getName() + + " thread " + dispatcherThreadName + " interrupted", e); } // Need to reset drained flag to true if event queue is empty, // otherwise dispatcher will hang on stop. @@ -304,7 +331,7 @@ Runnable createShutDownThread() { return new Runnable() { @Override public void run() { - LOG.info("Exiting, bbye.."); + LOG.info("AsyncDispatcher " getName() + " exiting, bbye.."); System.exit(-1); } };