diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index def9872..90b9ebc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -283,7 +283,7 @@ protected void serviceInit(final Configuration conf) throws Exception { initJobCredentialsAndUGI(conf); - dispatcher = createDispatcher(); + dispatcher = createDispatcher(conf); addIfService(dispatcher); taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler()); addIfService(taskAttemptFinishingMonitor); @@ -493,9 +493,10 @@ public Void call(Configuration conf) { } super.serviceInit(conf); } // end of init() - - protected Dispatcher createDispatcher() { - return new AsyncDispatcher(); + + protected Dispatcher createDispatcher(final Configuration conf) { + int queueSize = conf.getInt(MRJobConfig.MR_AM_DISPATCH_QUEUE_SIZE, 0); + return new AsyncDispatcher(queueSize); } private boolean isCommitJobRepeatable() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index f681cf8..eb5d3e5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -187,7 +187,7 @@ protected void dispatch(Event event) { }; MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; @@ -225,7 +225,7 @@ public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE); MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; @@ -273,7 +273,7 @@ public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception { final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL); MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index eaf1070..d878941 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -636,7 +636,7 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete, } @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 427e6ea..503ed39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1025,7 +1025,7 @@ public void testReportedAppProgress() throws Exception { MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } protected ContainerAllocator createContainerAllocator( @@ -1177,7 +1177,7 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception { MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } protected ContainerAllocator createContainerAllocator( @@ -2475,7 +2475,7 @@ public void testUnregistrationOnlyIfRegistered() throws Exception { new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ca18bfe..6366ea4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -1113,7 +1113,10 @@ public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts"; public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2; - + + /** The size of the dispatch queue. */ + public static final String MR_AM_DISPATCH_QUEUE_SIZE = "mapreduce.am.dispatch-queue-size"; + public static final String MR_APPLICATION_TYPE = "MAPREDUCE"; public static final String TASK_PREEMPTION = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a22..b111dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -18,112 +18,141 @@ package org.apache.hadoop.yarn.event; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; /** - * Dispatches {@link Event}s in a separate thread. Currently only single thread - * does that. Potentially there could be multiple channels for each event type - * class and a thread pool can be used to dispatch the events. + * Dispatches {@link Event}s in a single separate thread. */ @SuppressWarnings("rawtypes") @Public @Evolving public class AsyncDispatcher extends AbstractService implements Dispatcher { - private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); + /** Time in seconds */ + public static final long DEFAULT_WAIT_TIME = 1L; - private final BlockingQueue eventQueue; - private volatile int lastEventQueueSizeLogged = 0; - private volatile boolean stopped = false; + private static final Logger LOG = + LoggerFactory.getLogger(AsyncDispatcher.class); - // Configuration flag for enabling/disabling draining dispatcher's events on - // stop functionality. - private volatile boolean drainEventsOnStop = false; + private final EventHandler handlerInstance = new GenericEventHandler(); - // Indicates all the remaining dispatcher's events on stop have been drained - // and processed. - // Race condition happens if dispatcher thread sets drained to true between - // handler setting drained to false and enqueueing event. YARN-3878 decided - // to ignore it because of its tiny impact. Also see YARN-5436. - private volatile boolean drained = true; - private final Object waitForDrained = new Object(); + private BlockingQueue eventQueue; - // For drainEventsOnStop enabled only, block newly coming events into the - // queue while stopping. - private volatile boolean blockNewEvents = false; - private final EventHandler handlerInstance = new GenericEventHandler(); + private AtomicLong enqueueCounter = new AtomicLong(); + + private AtomicInteger waitCounter = new AtomicInteger(); + + private long dispatchCounter = 0L; + + private boolean stopped = false; - private Thread eventHandlingThread; - protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException = true; + private boolean drainEventsOnStop = false; + + private Thread eventHandlingThread; + + protected Multimap, EventHandler> eventDispatchers; + /** * The thread name for dispatcher. */ private String dispatcherThreadName = "AsyncDispatcher event handler"; + /** + * Creates an unbounded queue. + */ public AsyncDispatcher() { - this(new LinkedBlockingQueue()); + this(0); } - public AsyncDispatcher(BlockingQueue eventQueue) { - super("Dispatcher"); + /** + * Creates a bounded queue of a specified size + * + * @param queueSize The size of the queue. If size is less than or equal to + * zero, queue is unbounded + */ + public AsyncDispatcher(final int queueSize) { + this(null, queueSize); + } + + /** + * Set a name for this dispatcher thread. Use an unbounded queue. + * + * @param dispatcherName name of the dispatcher thread + */ + public AsyncDispatcher(final String dispatcherName) { + this(dispatcherName, 0); + } + + @VisibleForTesting + protected AsyncDispatcher(final String dispatcherName, + final BlockingQueue eventQueue) { + this(dispatcherName, 0); this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap, EventHandler>(); } /** - * Set a name for this dispatcher thread. + * Set the name of this dispatcher thread. Use a bounded queue. + * * @param dispatcherName name of the dispatcher thread + * @param queueSize The size of the queue. If size is less than or equal to + * zero, queue is unbounded */ - public AsyncDispatcher(String dispatcherName) { - this(); - dispatcherThreadName = dispatcherName; + public AsyncDispatcher(final String dispatcherName, final int queueSize) { + super("Dispatcher"); + if (queueSize > 0) { + this.eventQueue = new ArrayBlockingQueue(queueSize); + } else { + this.eventQueue = new LinkedBlockingQueue(); + } + if (dispatcherName != null) { + this.dispatcherThreadName = dispatcherName; + } + this.eventDispatchers = ArrayListMultimap.create(); } Runnable createThread() { return new Runnable() { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - drained = eventQueue.isEmpty(); - // blockNewEvents is only set when dispatcher is draining to stop, - // adding this check is to avoid the overhead of acquiring the lock - // and calling notify every time in the normal run of the loop. - if (blockNewEvents) { - synchronized (waitForDrained) { - if (drained) { - waitForDrained.notify(); - } - } + for (;;) { + if (stopped && (!drainEventsOnStop || eventQueue.isEmpty())) { + return; } - Event event; try { - event = eventQueue.take(); - } catch(InterruptedException ie) { + final Event event = + eventQueue.poll(DEFAULT_WAIT_TIME, TimeUnit.SECONDS); + if (event == null) { + LOG.debug("No events submitted in last {}s", DEFAULT_WAIT_TIME); + } else { + dispatch(event); + dispatchCounter++; + } + } catch (InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); + return; } - return; - } - if (event != null) { - dispatch(event); } } } @@ -135,106 +164,77 @@ public void disableExitOnDispatchException() { exitOnDispatchException = false; } + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + @Override protected void serviceStart() throws Exception { - //start all the components super.serviceStart(); eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName(dispatcherThreadName); eventHandlingThread.start(); } - public void setDrainEventsOnStop() { - drainEventsOnStop = true; - } - @Override protected void serviceStop() throws Exception { + long wait = 0L; + stopped = true; + if (drainEventsOnStop) { - blockNewEvents = true; - LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); - long endTime = System.currentTimeMillis() + getConfig() - .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, + wait = getConfig().getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); - synchronized (waitForDrained) { - while (!isDrained() && eventHandlingThread != null - && eventHandlingThread.isAlive() - && System.currentTimeMillis() < endTime) { - waitForDrained.wait(100); - LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + - eventHandlingThread.getState()); - } - } + LOG.info( + "AsyncDispatcher is draining on stop and ignoring any new events. Will wait a maximum of {}ms", + wait); } - stopped = true; + if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); try { - eventHandlingThread.join(); + eventHandlingThread.join(wait); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); } + if (eventHandlingThread.isAlive()) { + LOG.warn("Thread {} has not yet stopped running", eventHandlingThread); + } } - // stop all the components super.serviceStop(); } @SuppressWarnings("unchecked") protected void dispatch(Event event) { - //all events go thru this loop - if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); - } + LOG.debug("Dispatching the event {}: {}", event.getClass().getName(), + event); Class type = event.getType().getDeclaringClass(); - try{ - EventHandler handler = eventDispatchers.get(type); - if(handler != null) { - handler.handle(event); - } else { + try { + Collection> handlers = eventDispatchers.get(type); + if (handlers.isEmpty()) { throw new Exception("No handler for registered for " + type); } + for (final EventHandler handler : handlers) { + handler.handle(event); + } } catch (Throwable t) { - //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException - && (ShutdownHookManager.get().isShutdownInProgress()) == false - && stopped == false) { - stopped = true; - Thread shutDownThread = new Thread(createShutDownThread()); - shutDownThread.setName("AsyncDispatcher ShutDown handler"); - shutDownThread.start(); + && !ShutdownHookManager.get().isShutdownInProgress() && !stopped) { + LOG.error("Error in dispatcher thread. Fail-Fast.", t); + System.exit(-1); } + LOG.error("Error in dispatcher thread", t); } } @SuppressWarnings("unchecked") @Override - public void register(Class eventType, - EventHandler handler) { - /* check to see if we have a listener registered */ - EventHandler registeredHandler = (EventHandler) - eventDispatchers.get(eventType); - LOG.info("Registering " + eventType + " for " + handler.getClass()); - if (registeredHandler == null) { - eventDispatchers.put(eventType, handler); - } else if (!(registeredHandler instanceof MultiListenerHandler)){ - /* for multiple listeners of an event add the multiple listener handler */ - MultiListenerHandler multiHandler = new MultiListenerHandler(); - multiHandler.addHandler(registeredHandler); - multiHandler.addHandler(handler); - eventDispatchers.put(eventType, multiHandler); - } else { - /* already a multilistener, just add to it */ - MultiListenerHandler multiHandler - = (MultiListenerHandler) registeredHandler; - multiHandler.addHandler(handler); - } + public void register(Class eventType, EventHandler handler) { + LOG.info("Registering {} for {}", eventType, handler.getClass()); + eventDispatchers.put(eventType, handler); } @Override @@ -244,82 +244,43 @@ public void register(Class eventType, class GenericEventHandler implements EventHandler { public void handle(Event event) { - if (blockNewEvents) { + if (stopped) { + LOG.debug("Event was ignored because service is stopped: {}", event); return; } - drained = false; - - /* all this method does is enqueue all the events onto the queue */ - int qSize = eventQueue.size(); - if (qSize != 0 && qSize % 1000 == 0 - && lastEventQueueSizeLogged != qSize) { - lastEventQueueSizeLogged = qSize; - LOG.info("Size of event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); + final long count = enqueueCounter.incrementAndGet(); + if ((count % 1000L) == 0L) { + LOG.info("Size of event-queue is {}", eventQueue.size()); } try { - eventQueue.put(event); + while (!eventQueue.offer(event, DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { + LOG.warn("Unable to insert into the event-queue within {}s.", + DEFAULT_WAIT_TIME); + waitCounter.incrementAndGet(); + } } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } - // Need to reset drained flag to true if event queue is empty, - // otherwise dispatcher will hang on stop. - drained = eventQueue.isEmpty(); throw new YarnRuntimeException(e); } }; } - /** - * Multiplexing an event. Sending it to different handlers that - * are interested in the event. - * @param the type of event these multiple handlers are interested in. - */ - static class MultiListenerHandler implements EventHandler { - List> listofHandlers; - - public MultiListenerHandler() { - listofHandlers = new ArrayList>(); - } - - @Override - public void handle(Event event) { - for (EventHandler handler: listofHandlers) { - handler.handle(event); - } - } - - void addHandler(EventHandler handler) { - listofHandlers.add(handler); - } + public long getEnqueuedCount() { + return enqueueCounter.get(); + } + public long getDispatchCount() { + return dispatchCounter; } - Runnable createShutDownThread() { - return new Runnable() { - @Override - public void run() { - LOG.info("Exiting, bbye.."); - System.exit(-1); - } - }; + public int getWaitCount() { + return waitCounter.get(); } @VisibleForTesting protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } - - protected boolean isDrained() { - return drained; - } - - protected boolean isStopped() { - return stopped; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 2045eb6..8d06929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -22,18 +22,13 @@ @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { - private volatile boolean drained = false; - private final BlockingQueue queue; - private final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); } public DrainDispatcher(BlockingQueue eventQueue) { - super(eventQueue); - this.queue = eventQueue; - this.mutex = this; + super(null, eventQueue); // Disable system exit since this class is only for unit tests. disableExitOnDispatchException(); } @@ -51,54 +46,8 @@ public void waitForEventThreadToWait() { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + while (getEnqueuedCount() != getDispatchCount()) { Thread.yield(); } } - - @Override - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!isStopped() && !Thread.currentThread().isInterrupted()) { - synchronized (mutex) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); - } - Event event; - try { - event = queue.take(); - } catch (InterruptedException ie) { - return; - } - if (event != null) { - dispatch(event); - } - } - } - }; - } - - @SuppressWarnings("unchecked") - @Override - public EventHandler getEventHandler() { - final EventHandler actual = super.getEventHandler(); - return new EventHandler() { - @Override - public void handle(Event event) { - synchronized (mutex) { - actual.handle(event); - drained = false; - } - } - }; - } - - @Override - protected boolean isDrained() { - synchronized (mutex) { - return drained; - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java index 6aa56d8..635035f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java @@ -18,13 +18,15 @@ package org.apache.hadoop.yarn.event; +import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({ "unchecked", "rawtypes" }) public class InlineDispatcher extends AsyncDispatcher { private static final Log LOG = LogFactory.getLog(InlineDispatcher.class); @@ -34,25 +36,29 @@ public void handle(Event event) { dispatch(event); } } + @Override protected void dispatch(Event event) { - LOG.info("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); + LOG.info( + "Dispatching the event " + event.getClass().getName() + "." + event); Class type = event.getType().getDeclaringClass(); - if (eventDispatchers.get(type) != null) { - eventDispatchers.get(type).handle(event); + Collection> eventHandlers = + (Collection>) eventDispatchers.get(type); + for (EventHandler eventHandler : eventHandlers) { + eventHandler.handle(event); } } + @Override public EventHandler getEventHandler() { return new TestEventHandler(); } - + public static class EmptyEventHandler implements EventHandler { @Override public void handle(Event event) { - //do nothing - } + // do nothing + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 2b9d745..cc65240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -32,17 +33,19 @@ public class TestAsyncDispatcher { - /* This test checks whether dispatcher hangs on close if following two things - * happen : + /** + * This test checks whether dispatcher hangs on close if following two + * things happen: * 1. A thread which was putting event to event queue is interrupted. * 2. Event queue is empty on close. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test(timeout=10000) + @SuppressWarnings("rawtypes") + @Test(timeout = 60000) public void testDispatcherOnCloseIfQueueEmpty() throws Exception { BlockingQueue eventQueue = spy(new LinkedBlockingQueue()); Event event = mock(Event.class); - doThrow(new InterruptedException()).when(eventQueue).put(event); + doThrow(new InterruptedException()).when(eventQueue).offer(event, + AsyncDispatcher.DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS); DrainDispatcher disp = new DrainDispatcher(eventQueue); disp.init(new Configuration()); disp.setDrainEventsOnStop(); @@ -61,8 +64,11 @@ public void testDispatcherOnCloseIfQueueEmpty() throws Exception { disp.close(); } - // Test dispatcher should timeout on draining events. - @Test(timeout=10000) + /** + * Test dispatcher should timeout on draining events. + */ + @Test(timeout = 10000) + @SuppressWarnings("rawtypes") public void testDispatchStopOnTimeout() throws Exception { BlockingQueue eventQueue = new LinkedBlockingQueue(); eventQueue = spy(eventQueue); @@ -79,21 +85,70 @@ public void testDispatchStopOnTimeout() throws Exception { disp.close(); } + /** + * Test if drain dispatcher drains events on stop. + */ + @SuppressWarnings({ "rawtypes" }) + @Test(timeout = 10000) + public void testDrainDispatcherDrainEventsOnStop() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 4000); + BlockingQueue queue = new LinkedBlockingQueue(); + DrainDispatcher disp = new DrainDispatcher(queue); + disp.init(conf); + disp.register(DummyType.class, new DummyHandler()); + disp.setDrainEventsOnStop(); + disp.start(); + disp.waitForEventThreadToWait(); + dispatchDummyEvents(disp, 6); + disp.close(); + assertEquals(0, queue.size()); + } + + /** + * Test if dispatcher pauses when queue is bounded. + */ + @Test(timeout = 10000) + public void testDispatcherWaitsWhenQueueFull() throws Exception { + final AsyncDispatcher dispatcher = new AsyncDispatcher(1); + YarnConfiguration conf = new YarnConfiguration(); + dispatcher.init(conf); + dispatcher.setDrainEventsOnStop(); + dispatcher.register(DummyType.class, new DummyHandler(3000L)); + dispatcher.start(); + // Insert 2 events, the second one will sit in the queue for a time + dispatchDummyEvents(dispatcher, 3); + dispatcher.close(); + assertEquals(3L, dispatcher.getDispatchCount()); + assertEquals(1, dispatcher.getWaitCount()); + } + + private enum DummyType { + DUMMY + } + @SuppressWarnings("rawtypes") private static class DummyHandler implements EventHandler { + private final long sleepTime; + + DummyHandler() { + this(500L); + } + + DummyHandler(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override public void handle(Event event) { try { - Thread.sleep(500); - } catch (InterruptedException e) {} + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } } } - private enum DummyType { - DUMMY - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("rawtypes") private void dispatchDummyEvents(Dispatcher disp, int count) { for (int i = 0; i < count; i++) { Event event = mock(Event.class); @@ -101,23 +156,5 @@ private void dispatchDummyEvents(Dispatcher disp, int count) { disp.getEventHandler().handle(event); } } - - // Test if drain dispatcher drains events on stop. - @SuppressWarnings({ "rawtypes" }) - @Test(timeout=10000) - public void testDrainDispatcherDrainEventsOnStop() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000); - BlockingQueue queue = new LinkedBlockingQueue(); - DrainDispatcher disp = new DrainDispatcher(queue); - disp.init(conf); - disp.register(DummyType.class, new DummyHandler()); - disp.setDrainEventsOnStop(); - disp.start(); - disp.waitForEventThreadToWait(); - dispatchDummyEvents(disp, 2); - disp.close(); - assertEquals(0, queue.size()); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index f313d70..0c72ead 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -158,7 +158,7 @@ protected SpyDispatcher dispatcher; private static EventHandler rmAppEventEventHandler; - public static class SpyDispatcher extends AsyncDispatcher { + public static class SpyDispatcher implements Dispatcher { public static BlockingQueue eventQueue = new LinkedBlockingQueue<>(); @@ -169,11 +169,6 @@ public void handle(Event event) { } @Override - protected void dispatch(Event event) { - eventQueue.add(event); - } - - @Override public EventHandler getEventHandler() { return rmAppEventEventHandler; } @@ -185,6 +180,12 @@ void spyOnNextEvent(Event expectedEvent, long timeout) assertEquals(expectedEvent.getType(), event.getType()); assertEquals(expectedEvent.getClass(), event.getClass()); } + + @Override + public void register(Class eventType, + EventHandler handler) { + // NO-OP + } } @Before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b3..09feade 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -77,8 +77,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; @@ -120,7 +121,7 @@ private static BlockingQueue eventQueue; private static volatile AtomicInteger counter; - private static AsyncDispatcher dispatcher; + private static Dispatcher dispatcher; public static class Renewer extends TokenRenewer { private static int counter = 0; private static Token lastRenewed = null; @@ -203,7 +204,22 @@ public void setUp() throws Exception { "kerberos"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); - dispatcher = new AsyncDispatcher(eventQueue); + dispatcher = new Dispatcher() { + @Override + public void register(Class eventType, + EventHandler handler) { + // NO-OP + } + @Override + public EventHandler getEventHandler() { + return new EventHandler() { + @Override + public void handle(Event event) { + eventQueue.add(event); + } + }; + } + }; Renewer.reset(); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class);