diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index def9872..25f0262 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -283,7 +283,7 @@ protected void serviceInit(final Configuration conf) throws Exception { initJobCredentialsAndUGI(conf); - dispatcher = createDispatcher(); + dispatcher = createDispatcher(conf); addIfService(dispatcher); taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler()); addIfService(taskAttemptFinishingMonitor); @@ -493,9 +493,10 @@ public Void call(Configuration conf) { } super.serviceInit(conf); } // end of init() - - protected Dispatcher createDispatcher() { - return new AsyncDispatcher(); + + protected Dispatcher createDispatcher(final Configuration conf) { + int queueSize = conf.getInt(MRJobConfig.MR_AM_DISPATCH_QUEUE_SIZE, 0); + return new AsyncDispatcher(queueSize); } private boolean isCommitJobRepeatable() throws IOException { @@ -1863,11 +1864,6 @@ private void createJobClassLoader(Configuration conf) throws IOException { T call(Configuration conf) throws Exception; } - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - public ClientService getClientService() { return clientService; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index d2e2492..d172004 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1017,8 +1017,8 @@ public void handle(JobEvent event) { } //notify the eventhandler of state change if (oldState != getInternalState()) { - LOG.info(jobId + "Job Transitioned from " + oldState + " to " - + getInternalState()); + LOG.info(" Job [" + jobId + "] transitioned from " + oldState + " to " + + getInternalState()); rememberLastNonFinalState(oldState); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 096d13e..409bf31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -591,7 +591,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent( t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1), currentTime - 10)); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -608,7 +608,7 @@ public void testTimelineEventHandling() throws Exception { "user", 200, "/foo/job.xml", new HashMap(), "default"), currentTime + 10)); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -627,7 +627,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"), currentTime - 20)); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -650,7 +650,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, new Counters(), new Counters(), new Counters()), currentTime)); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -678,7 +678,7 @@ public void testTimelineEventHandling() throws Exception { new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20)); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -708,7 +708,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(t.taskID, 0, TaskType.MAP, ""))); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); @@ -722,7 +722,7 @@ public void testTimelineEventHandling() throws Exception { handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, ""))); - jheh.getDispatcher().await(); + jheh.getDispatcher().drain(); entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, null, null, null, null, null, null); Assert.assertEquals(1, entities.getEntities().size()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index f681cf8..16817c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -54,7 +54,7 @@ * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios. * */ -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings("rawtypes") public class TestKill { @Test @@ -187,7 +187,7 @@ protected void dispatch(Event event) { }; MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; @@ -225,7 +225,7 @@ public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE); MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; @@ -273,7 +273,7 @@ public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception { final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL); MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { @Override - public Dispatcher createDispatcher() { + public Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index eaf1070..6599100 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -74,7 +74,6 @@ /** * Tests the state machine of MR App. */ -@SuppressWarnings("unchecked") public class TestMRApp { @Test @@ -636,7 +635,7 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete, } @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return dispatcher; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 8592b20..33254ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -441,7 +441,7 @@ public void testFailAbortDoesntHang() throws IOException { } } - dispatcher.await(); + dispatcher.drain(); //Verify abortJob is called once and the job failed Mockito.verify(committer, Mockito.timeout(2000).times(1)) .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); @@ -595,7 +595,7 @@ public void handle(TaskAttemptEvent event) { NodeReport secondMapperNodeReport = nodeReports.get(1); job.handle(new JobUpdatedNodesEvent(job.getID(), Collections.singletonList(firstMapperNodeReport))); - dispatcher.await(); + dispatcher.drain(); // complete the reducer for (TaskId taskId: job.tasks.keySet()) { if (taskId.getTaskType() == TaskType.REDUCE) { @@ -678,7 +678,7 @@ public void handle(TaskEvent event) { } } - dispatcher.await(); + dispatcher.drain(); /* * StubbedJob cannot finish in this test - we'd have to generate the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 427e6ea..32956c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1025,7 +1025,7 @@ public void testReportedAppProgress() throws Exception { MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } protected ContainerAllocator createContainerAllocator( @@ -1047,7 +1047,7 @@ protected ContainerAllocator createContainerAllocator( mrApp.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING); - amDispatcher.await(); + amDispatcher.drain(); // Wait till all map-attempts request for containers for (Task t : job.getTasks().values()) { if (t.getType() == TaskType.MAP) { @@ -1055,7 +1055,7 @@ protected ContainerAllocator createContainerAllocator( .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); } } - amDispatcher.await(); + amDispatcher.drain(); allocator.schedule(); rm.drainEvents(); @@ -1144,7 +1144,7 @@ private void finishTask(DrainDispatcher rmDispatcher, MockNM node, new HashMap>(1); statusUpdate.put(mrApp.getAppID(), contStatus); node.nodeHeartbeat(statusUpdate, true); - rmDispatcher.await(); + rmDispatcher.drain(); mrApp.getContext().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); mrApp.waitForState(task, TaskState.SUCCEEDED); @@ -1177,7 +1177,7 @@ public void testReportedAppProgressWithOnlyMaps() throws Exception { MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } protected ContainerAllocator createContainerAllocator( @@ -1199,13 +1199,13 @@ protected ContainerAllocator createContainerAllocator( mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING); - amDispatcher.await(); + amDispatcher.drain(); // Wait till all map-attempts request for containers for (Task t : job.getTasks().values()) { mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); } - amDispatcher.await(); + amDispatcher.drain(); allocator.schedule(); rm.drainEvents(); @@ -2475,7 +2475,7 @@ public void testUnregistrationOnlyIfRegistered() throws Exception { new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @Override - protected Dispatcher createDispatcher() { + protected Dispatcher createDispatcher(final Configuration conf) { return new DrainDispatcher(); } @@ -2489,7 +2489,7 @@ protected ContainerAllocator createContainerAllocator( DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); MyContainerAllocator allocator = (MyContainerAllocator) mrApp.getContainerAllocator(); - amDispatcher.await(); + amDispatcher.drain(); Assert.assertTrue(allocator.isApplicationMasterRegistered()); mrApp.stop(); Assert.assertTrue(allocator.isUnregistered()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ca18bfe..6366ea4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -1113,7 +1113,10 @@ public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts"; public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2; - + + /** The size of the dispatch queue. */ + public static final String MR_AM_DISPATCH_QUEUE_SIZE = "mapreduce.am.dispatch-queue-size"; + public static final String MR_APPLICATION_TYPE = "MAPREDUCE"; public static final String TASK_PREEMPTION = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 4a78a22..b1fe9f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -18,112 +18,146 @@ package org.apache.hadoop.yarn.event; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; /** - * Dispatches {@link Event}s in a separate thread. Currently only single thread - * does that. Potentially there could be multiple channels for each event type - * class and a thread pool can be used to dispatch the events. + * Dispatches {@link Event}s in a single separate thread. */ @SuppressWarnings("rawtypes") @Public @Evolving public class AsyncDispatcher extends AbstractService implements Dispatcher { - private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); + /** Time in seconds. */ + public static final long DEFAULT_WAIT_TIME = 1L; - private final BlockingQueue eventQueue; - private volatile int lastEventQueueSizeLogged = 0; - private volatile boolean stopped = false; + private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class); - // Configuration flag for enabling/disabling draining dispatcher's events on - // stop functionality. - private volatile boolean drainEventsOnStop = false; + private final EventHandler handlerInstance = new GenericEventHandler(); - // Indicates all the remaining dispatcher's events on stop have been drained - // and processed. - // Race condition happens if dispatcher thread sets drained to true between - // handler setting drained to false and enqueueing event. YARN-3878 decided - // to ignore it because of its tiny impact. Also see YARN-5436. - private volatile boolean drained = true; - private final Object waitForDrained = new Object(); + @VisibleForTesting + BlockingQueue eventQueue; - // For drainEventsOnStop enabled only, block newly coming events into the - // queue while stopping. - private volatile boolean blockNewEvents = false; - private final EventHandler handlerInstance = new GenericEventHandler(); + private AtomicLong enqueueCounter = new AtomicLong(); + + private AtomicInteger waitCounter = new AtomicInteger(); + + private long dispatchCounter = 0L; + + private boolean stopped = true; - private Thread eventHandlingThread; - protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException = true; + private boolean drainEventsOnStop = false; + + private Thread eventHandlingThread; + + private final Multimap, EventHandler> eventDispatchers; + /** * The thread name for dispatcher. */ private String dispatcherThreadName = "AsyncDispatcher event handler"; + /** + * Creates an unbounded queue. + */ public AsyncDispatcher() { - this(new LinkedBlockingQueue()); + this(0); } - public AsyncDispatcher(BlockingQueue eventQueue) { - super("Dispatcher"); + /** + * Creates a bounded queue of a specified size. + * + * @param queueSize The size of the queue. If size is less than or equal to + * zero, queue is unbounded + */ + public AsyncDispatcher(final int queueSize) { + this(null, queueSize); + } + + /** + * Set a name for this dispatcher thread. Use an unbounded queue. + * + * @param dispatcherName name of the dispatcher thread + */ + public AsyncDispatcher(final String dispatcherName) { + this(dispatcherName, 0); + } + + @VisibleForTesting + protected AsyncDispatcher(final String dispatcherName, + final BlockingQueue eventQueue) { + this(dispatcherName, 0); this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap, EventHandler>(); } /** - * Set a name for this dispatcher thread. + * Set the name of this dispatcher thread. Use a bounded queue. + * * @param dispatcherName name of the dispatcher thread + * @param queueSize The size of the queue. If size is less than or equal to + * zero, queue is unbounded */ - public AsyncDispatcher(String dispatcherName) { - this(); - dispatcherThreadName = dispatcherName; + public AsyncDispatcher(final String dispatcherName, final int queueSize) { + super("Dispatcher"); + if (queueSize > 0) { + this.eventQueue = new ArrayBlockingQueue(queueSize); + } else { + this.eventQueue = new LinkedBlockingQueue(); + } + if (dispatcherName != null) { + this.dispatcherThreadName = dispatcherName; + } + this.eventDispatchers = ArrayListMultimap.create(); } Runnable createThread() { return new Runnable() { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - drained = eventQueue.isEmpty(); - // blockNewEvents is only set when dispatcher is draining to stop, - // adding this check is to avoid the overhead of acquiring the lock - // and calling notify every time in the normal run of the loop. - if (blockNewEvents) { - synchronized (waitForDrained) { - if (drained) { - waitForDrained.notify(); - } - } + for (;;) { + if (stopped && (!drainEventsOnStop || eventQueue.isEmpty())) { + return; } - Event event; try { - event = eventQueue.take(); - } catch(InterruptedException ie) { + final Event event = + eventQueue.poll(DEFAULT_WAIT_TIME, TimeUnit.SECONDS); + if (event == null) { + LOG.debug("No events submitted in last {}s", DEFAULT_WAIT_TIME); + } else { + dispatch(event); + dispatchCounter++; + } + } catch (InterruptedException ie) { + LOG.warn("AsyncDispatcher thread interrupted", ie); if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + return; } - return; - } - if (event != null) { - dispatch(event); + } catch (Exception e) { + LOG.warn("Dispatch threw an error", e); + // Event was dispatched so increment counter + dispatchCounter++; } } } @@ -135,106 +169,117 @@ public void disableExitOnDispatchException() { exitOnDispatchException = false; } + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + @Override protected void serviceStart() throws Exception { - //start all the components - super.serviceStart(); + LOG.debug("Starting service"); + if (!stopped) { + return; + } + stopped = false; eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName(dispatcherThreadName); eventHandlingThread.start(); } - public void setDrainEventsOnStop() { - drainEventsOnStop = true; - } - @Override protected void serviceStop() throws Exception { - if (drainEventsOnStop) { - blockNewEvents = true; - LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); - long endTime = System.currentTimeMillis() + getConfig() - .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, - YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); - - synchronized (waitForDrained) { - while (!isDrained() && eventHandlingThread != null - && eventHandlingThread.isAlive() - && System.currentTimeMillis() < endTime) { - waitForDrained.wait(100); - LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + - eventHandlingThread.getState()); - } - } + LOG.debug("Stopping service"); + long wait = 0L; + + if (stopped) { + return; } + stopped = true; - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); + + if (eventHandlingThread == null) { + LOG.warn("Service stopped before it was able to start"); + return; + } + + if (Thread.currentThread() == eventHandlingThread) { + /* + * There are times when the handler for a dispatched event triggers this + * service to be stopped using the event handling thread itself. It is + * therefore possible to end up in a dead-lock state. In particular, + * if dispatching an event triggers a service shutdown, there is no way to + * drain all remaining events because the event handling thread is itself + * responsible for the draining, but becomes responsible for shutting down + * the service too. It may wait on itself forever to drain the remaining + * events in the event queue. + */ + LOG.info("Closing service with event thread"); + if (drainEventsOnStop) { + LOG.warn("Cannot drain events when closing service with event thread"); + } + } else { + LOG.info("Waiting for event thread to stop"); + if (drainEventsOnStop) { + wait = getConfig().getLong( + YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, + YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); + LOG.info( + "AsyncDispatcher is draining on stop and ignoring any new events." + + " Will wait a maximum of {}ms", + wait); + } try { - eventHandlingThread.join(); + eventHandlingThread.join(wait); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); } + if (eventHandlingThread.isAlive()) { + LOG.warn("Thread {} has not yet stopped running after {}ms", + eventHandlingThread, wait); + } } - - // stop all the components - super.serviceStop(); + LOG.info( + "Service stopped. Total dispatched events: {}," + + " total remaining events: {}", + getDispatchCount(), eventQueue.size()); } @SuppressWarnings("unchecked") protected void dispatch(Event event) { - //all events go thru this loop - if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); - } + LOG.debug("Dispatching the event [{}] - [{}]", event.getClass().getName(), + event); Class type = event.getType().getDeclaringClass(); - try{ - EventHandler handler = eventDispatchers.get(type); - if(handler != null) { + Collection> handlers = eventDispatchers.get(type); + if (handlers.isEmpty()) { + throw new YarnRuntimeException("No handler for registered for " + type); + } + + try { + for (final EventHandler handler : handlers) { handler.handle(event); - } else { - throw new Exception("No handler for registered for " + type); } - } catch (Throwable t) { - //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread", t); + } catch (Exception e) { // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException - && (ShutdownHookManager.get().isShutdownInProgress()) == false - && stopped == false) { - stopped = true; - Thread shutDownThread = new Thread(createShutDownThread()); - shutDownThread.setName("AsyncDispatcher ShutDown handler"); - shutDownThread.start(); + && !ShutdownHookManager.get().isShutdownInProgress() && !stopped) { + LOG.error("Error in dispatcher thread. Fail-Fast.", e); + System.exit(-1); } + LOG.error("Error in dispatcher thread", e); } } + /** + * Register an EventHandler to received events from the queue. Service does + * not need to be started to register an EventHandler. + */ @SuppressWarnings("unchecked") @Override - public void register(Class eventType, - EventHandler handler) { - /* check to see if we have a listener registered */ - EventHandler registeredHandler = (EventHandler) - eventDispatchers.get(eventType); - LOG.info("Registering " + eventType + " for " + handler.getClass()); - if (registeredHandler == null) { - eventDispatchers.put(eventType, handler); - } else if (!(registeredHandler instanceof MultiListenerHandler)){ - /* for multiple listeners of an event add the multiple listener handler */ - MultiListenerHandler multiHandler = new MultiListenerHandler(); - multiHandler.addHandler(registeredHandler); - multiHandler.addHandler(handler); - eventDispatchers.put(eventType, multiHandler); - } else { - /* already a multilistener, just add to it */ - MultiListenerHandler multiHandler - = (MultiListenerHandler) registeredHandler; - multiHandler.addHandler(handler); - } + public void register(Class eventType, EventHandler handler) { + LOG.info("Registering handler[{}] for event[{}]", handler.getClass(), + eventType); + eventDispatchers.put(eventType, handler); } @Override @@ -243,83 +288,45 @@ public void register(Class eventType, } class GenericEventHandler implements EventHandler { + /** + * Load an event into the queue so that it is processed by a separate + * thread. Callers may add events to the queue even if the dispatcher is not + * currently running. + */ public void handle(Event event) { - if (blockNewEvents) { - return; - } - drained = false; - - /* all this method does is enqueue all the events onto the queue */ - int qSize = eventQueue.size(); - if (qSize != 0 && qSize % 1000 == 0 - && lastEventQueueSizeLogged != qSize) { - lastEventQueueSizeLogged = qSize; - LOG.info("Size of event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); - } try { - eventQueue.put(event); + LOG.debug("Adding event [{}] to the queue", event); + while (!eventQueue.offer(event, DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { + LOG.warn("Unable to insert into the event-queue within {}s: {}", + DEFAULT_WAIT_TIME, event.getType()); + waitCounter.incrementAndGet(); + } + final long count = enqueueCounter.incrementAndGet(); + if ((count % 1000L) == 0L) { + LOG.info("Size of event-queue is {}", eventQueue.size()); + } } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } - // Need to reset drained flag to true if event queue is empty, - // otherwise dispatcher will hang on stop. - drained = eventQueue.isEmpty(); throw new YarnRuntimeException(e); } }; } - /** - * Multiplexing an event. Sending it to different handlers that - * are interested in the event. - * @param the type of event these multiple handlers are interested in. - */ - static class MultiListenerHandler implements EventHandler { - List> listofHandlers; - - public MultiListenerHandler() { - listofHandlers = new ArrayList>(); - } - - @Override - public void handle(Event event) { - for (EventHandler handler: listofHandlers) { - handler.handle(event); - } - } - - void addHandler(EventHandler handler) { - listofHandlers.add(handler); - } - - } - - Runnable createShutDownThread() { - return new Runnable() { - @Override - public void run() { - LOG.info("Exiting, bbye.."); - System.exit(-1); - } - }; + public long getEnqueuedCount() { + return enqueueCounter.get(); } - @VisibleForTesting - protected boolean isEventThreadWaiting() { - return eventHandlingThread.getState() == Thread.State.WAITING; + public long getDispatchCount() { + return dispatchCounter; } - protected boolean isDrained() { - return drained; + public int getWaitCount() { + return waitCounter.get(); } - protected boolean isStopped() { - return stopped; + public Multimap, EventHandler> getEventDispatchers() { + return Multimaps.unmodifiableMultimap(eventDispatchers); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 2045eb6..a1a6507 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -22,83 +22,28 @@ @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { - private volatile boolean drained = false; - private final BlockingQueue queue; - private final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); } public DrainDispatcher(BlockingQueue eventQueue) { - super(eventQueue); - this.queue = eventQueue; - this.mutex = this; + super(null, eventQueue); // Disable system exit since this class is only for unit tests. disableExitOnDispatchException(); } /** - * Wait till event thread enters WAITING state (i.e. waiting for new events). + * Drain all the events. */ - public void waitForEventThreadToWait() { - while (!isEventThreadWaiting()) { + public void drain() { + boolean drained = false; + do { Thread.yield(); - } - } - - /** - * Busy loop waiting for all queued events to drain. - */ - public void await() { - while (!isDrained()) { - Thread.yield(); - } - } - - @Override - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!isStopped() && !Thread.currentThread().isInterrupted()) { - synchronized (mutex) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); - } - Event event; - try { - event = queue.take(); - } catch (InterruptedException ie) { - return; - } - if (event != null) { - dispatch(event); - } - } - } - }; - } - - @SuppressWarnings("unchecked") - @Override - public EventHandler getEventHandler() { - final EventHandler actual = super.getEventHandler(); - return new EventHandler() { - @Override - public void handle(Event event) { - synchronized (mutex) { - actual.handle(event); - drained = false; - } + // lock any events from coming into or leaving the event queue + synchronized (eventQueue) { + drained = (getEnqueuedCount() == getDispatchCount()); } - }; - } - - @Override - protected boolean isDrained() { - synchronized (mutex) { - return drained; - } + } while (!drained); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java index 6aa56d8..2046ddf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java @@ -18,13 +18,18 @@ package org.apache.hadoop.yarn.event; +import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({"unchecked", "rawtypes"}) +/** + * Dispatcher class which is not asynchronous. Used for testing. + */ +@SuppressWarnings({ "unchecked", "rawtypes" }) public class InlineDispatcher extends AsyncDispatcher { private static final Log LOG = LogFactory.getLog(InlineDispatcher.class); @@ -34,25 +39,29 @@ public void handle(Event event) { dispatch(event); } } + @Override protected void dispatch(Event event) { - LOG.info("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); + LOG.info( + "Dispatching the event " + event.getClass().getName() + "." + event); Class type = event.getType().getDeclaringClass(); - if (eventDispatchers.get(type) != null) { - eventDispatchers.get(type).handle(event); + Collection> eventHandlers = + (Collection>) getEventDispatchers().get(type); + for (EventHandler eventHandler : eventHandlers) { + eventHandler.handle(event); } } + @Override public EventHandler getEventHandler() { return new TestEventHandler(); } - + public static class EmptyEventHandler implements EventHandler { @Override public void handle(Event event) { - //do nothing - } + // do nothing + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 2b9d745..f35ad83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -18,8 +18,17 @@ package org.apache.hadoop.yarn.event; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -27,28 +36,24 @@ import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; - public class TestAsyncDispatcher { - /* This test checks whether dispatcher hangs on close if following two things - * happen : - * 1. A thread which was putting event to event queue is interrupted. + /** + * This test checks whether dispatcher hangs on close if following two things + * happen: 1. A thread which was putting event to event queue is interrupted. * 2. Event queue is empty on close. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test(timeout=10000) + @SuppressWarnings("rawtypes") + @Test(timeout = 60000) public void testDispatcherOnCloseIfQueueEmpty() throws Exception { BlockingQueue eventQueue = spy(new LinkedBlockingQueue()); Event event = mock(Event.class); - doThrow(new InterruptedException()).when(eventQueue).put(event); + doThrow(new InterruptedException()).when(eventQueue).offer(event, + AsyncDispatcher.DEFAULT_WAIT_TIME, TimeUnit.SECONDS); DrainDispatcher disp = new DrainDispatcher(eventQueue); disp.init(new Configuration()); disp.setDrainEventsOnStop(); disp.start(); - // Wait for event handler thread to start and begin waiting for events. - disp.waitForEventThreadToWait(); try { disp.getEventHandler().handle(event); Assert.fail("Expected YarnRuntimeException"); @@ -61,11 +66,15 @@ public void testDispatcherOnCloseIfQueueEmpty() throws Exception { disp.close(); } - // Test dispatcher should timeout on draining events. - @Test(timeout=10000) + /** + * Test dispatcher should timeout on draining events. + */ + @Test(timeout = 10000) + @SuppressWarnings("rawtypes") public void testDispatchStopOnTimeout() throws Exception { BlockingQueue eventQueue = new LinkedBlockingQueue(); eventQueue = spy(eventQueue); + // simulate dispatcher is not drained. when(eventQueue.isEmpty()).thenReturn(false); @@ -75,25 +84,165 @@ public void testDispatchStopOnTimeout() throws Exception { disp.init(conf); disp.setDrainEventsOnStop(); disp.start(); - disp.waitForEventThreadToWait(); disp.close(); } - @SuppressWarnings("rawtypes") - private static class DummyHandler implements EventHandler { + /** + * Test if drain dispatcher drains events on stop. + */ + @SuppressWarnings({ "rawtypes" }) + @Test(timeout = 10000) + public void testDrainDispatcherDrainEventsOnStop() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 4000); + BlockingQueue queue = new LinkedBlockingQueue(); + DrainDispatcher disp = new DrainDispatcher(queue); + disp.init(conf); + disp.register(DummyType.class, new SleepingDummyHandler()); + disp.setDrainEventsOnStop(); + disp.start(); + dispatchDummyEvents(disp, 6); + disp.close(); + assertEquals(0, queue.size()); + } + + /** + * Test if dispatcher pauses when queue is bounded. + */ + @Test(timeout = 10000) + public void testDispatcherWaitsWhenQueueFull() throws Exception { + final AsyncDispatcher dispatcher = new AsyncDispatcher(1); + long stallTime = + TimeUnit.SECONDS.toMillis(AsyncDispatcher.DEFAULT_WAIT_TIME) + 100L; + YarnConfiguration conf = new YarnConfiguration(); + dispatcher.init(conf); + dispatcher.setDrainEventsOnStop(); + dispatcher.register(DummyType.class, new SleepingDummyHandler(stallTime)); + dispatcher.start(); + // Insert 2 events, the second one will sit in the queue for a time + dispatchDummyEvents(dispatcher, 3); + dispatcher.close(); + assertEquals(3L, dispatcher.getDispatchCount()); + assertEquals(1, dispatcher.getWaitCount()); + } + + /** + * Test that the dispatch does not deadlock if it closes itself. + */ + @Test(timeout = 10000) + public void testDispatcherStopsItself() throws Exception { + final AsyncDispatcher dispatcher = new AsyncDispatcher(1); + YarnConfiguration conf = new YarnConfiguration(); + dispatcher.init(conf); + dispatcher.setDrainEventsOnStop(); + dispatcher.register(DummyType.class, new EventHandler>() { + @Override + public void handle(Event event) { + try { + dispatcher.close(); + } catch (IOException e) { + Assert.fail(); + } + } + }); + dispatcher.start(); + // Insert 1 event which will trigger a service stop + dispatchDummyEvents(dispatcher, 1); + // Also test closing twice + dispatcher.close(); + } + + /** + * Test that the counters increment correctly. + */ + @Test(timeout = 10000) + public void testDispatcherQueuedDispatchedCounters() throws Exception { + final AsyncDispatcher dispatcher = new AsyncDispatcher(); + BarrierDummyHandler handler = new BarrierDummyHandler(); + YarnConfiguration conf = new YarnConfiguration(); + dispatcher.init(conf); + dispatcher.setDrainEventsOnStop(); + dispatcher.register(DummyType.class, handler); + + dispatcher.start(); + dispatchDummyEvents(dispatcher, 3); + + // Event is in dispatching state, dispatched counter only increments after + // completion + Thread.sleep(1000L); + assertEquals(0L, dispatcher.getDispatchCount()); + assertEquals(3L, dispatcher.getEnqueuedCount()); + handler.await(); + + Thread.sleep(1000L); + assertEquals(1L, dispatcher.getDispatchCount()); + assertEquals(3L, dispatcher.getEnqueuedCount()); + handler.await(); + + Thread.sleep(1000L); + assertEquals(2L, dispatcher.getDispatchCount()); + assertEquals(3L, dispatcher.getEnqueuedCount()); + handler.await(); + + Thread.sleep(1000L); + assertEquals(3L, dispatcher.getDispatchCount()); + assertEquals(3L, dispatcher.getEnqueuedCount()); + + dispatcher.close(); + + assertEquals(3L, dispatcher.getDispatchCount()); + assertEquals(3L, dispatcher.getEnqueuedCount()); + } + + private enum DummyType { + DUMMY + } + + private static class SleepingDummyHandler + implements EventHandler> { + private final long sleepTime; + + SleepingDummyHandler() { + this(500L); + } + + SleepingDummyHandler(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override - public void handle(Event event) { + public void handle(Event event) { try { - Thread.sleep(500); - } catch (InterruptedException e) {} + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } } } - private enum DummyType { - DUMMY + private static class BarrierDummyHandler + implements EventHandler> { + + private final CyclicBarrier barrier; + + BarrierDummyHandler() { + this.barrier = new CyclicBarrier(2); + } + + @Override + public void handle(Event event) { + try { + this.barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void await() { + handle(null); + } } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("rawtypes") private void dispatchDummyEvents(Dispatcher disp, int count) { for (int i = 0; i < count; i++) { Event event = mock(Event.class); @@ -101,23 +250,4 @@ private void dispatchDummyEvents(Dispatcher disp, int count) { disp.getEventHandler().handle(event); } } - - // Test if drain dispatcher drains events on stop. - @SuppressWarnings({ "rawtypes" }) - @Test(timeout=10000) - public void testDrainDispatcherDrainEventsOnStop() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000); - BlockingQueue queue = new LinkedBlockingQueue(); - DrainDispatcher disp = new DrainDispatcher(queue); - disp.init(conf); - disp.register(DummyType.class, new DummyHandler()); - disp.setDrainEventsOnStop(); - disp.start(); - disp.waitForEventThreadToWait(); - dispatchDummyEvents(disp, 2); - disp.close(); - assertEquals(0, queue.size()); - } } - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index b3f4e1b..c99f4fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -207,7 +207,7 @@ public void testNMMultipleResyncEvent() } DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher(); - dispatcher.await(); + dispatcher.drain(); LOG.info("NM dispatcher drained"); // Wait for the resync thread to finish diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index cbe19ff..919b8d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -609,7 +609,7 @@ public boolean matches(Object argument) { } private void drainDispatcherEvents() { - dispatcher.await(); + dispatcher.drain(); } public void finished() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 71cabdd..cad2d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -1474,7 +1474,7 @@ public void handle(ContainerEvent event) { } private void drainDispatcherEvents() { - dispatcher.await(); + dispatcher.drain(); } public void finished() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 6cab593..3b38220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -131,7 +131,7 @@ public void test() { // Localize R2 for C1 tracker.handle(req21Event); - dispatcher.await(); + dispatcher.drain(); verify(localizerEventHandler, times(3)).handle( any(LocalizerResourceRequestEvent.class)); // Verify refCount for R1 is 2 @@ -142,7 +142,7 @@ public void test() { // Release R2 for C1 tracker.handle(rel21Event); - dispatcher.await(); + dispatcher.drain(); verifyTrackedResourceCount(tracker, 2); // Verify resource with non zero ref count is not removed. @@ -203,12 +203,12 @@ public void testConsistency() { // Localize R1 for C1 tracker.handle(req11Event); - dispatcher.await(); + dispatcher.drain(); // Verify refCount for R1 is 1 Assert.assertEquals(1, lr1.getRefCount()); - dispatcher.await(); + dispatcher.drain(); verifyTrackedResourceCount(tracker, 1); // Localize resource1 @@ -226,7 +226,7 @@ public void testConsistency() { // Localize R1 for C1 tracker.handle(req11Event); - dispatcher.await(); + dispatcher.drain(); lr1.handle(rle); Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED)); LocalizedResource rsrcafter = tracker.iterator().next(); @@ -282,7 +282,7 @@ public void testLocalResourceCache() { // Container-1 requesting local resource. tracker.handle(reqEvent1); - dispatcher.await(); + dispatcher.drain(); // New localized Resource should have been added to local resource map // and the requesting container will be added to its waiting queue. @@ -299,7 +299,7 @@ public void testLocalResourceCache() { ResourceEvent reqEvent2 = new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2); tracker.handle(reqEvent2); - dispatcher.await(); + dispatcher.drain(); // Container 2 should have been added to the waiting queue of the local // resource @@ -315,7 +315,7 @@ public void testLocalResourceCache() { LocalizedResource localizedResource = localrsrc.get(lr); tracker.handle(resourceFailedEvent); - dispatcher.await(); + dispatcher.drain(); // After receiving failed resource event; all waiting containers will be // notified with Container Resource Failed Event. @@ -329,7 +329,7 @@ public void testLocalResourceCache() { // exception. ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1); tracker.handle(relEvent1); - dispatcher.await(); + dispatcher.drain(); // Container-3 now requests for the same resource. This request call // is coming prior to Container-2's release call. @@ -338,7 +338,7 @@ public void testLocalResourceCache() { ResourceEvent reqEvent3 = new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3); tracker.handle(reqEvent3); - dispatcher.await(); + dispatcher.drain(); // Local resource cache now should have the requested resource and the // number of waiting containers should be 1. @@ -350,7 +350,7 @@ public void testLocalResourceCache() { // Container-2 Releases the resource ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2); tracker.handle(relEvent2); - dispatcher.await(); + dispatcher.drain(); // Making sure that there is no change in the cache after the release. Assert.assertEquals(1, localrsrc.size()); @@ -364,7 +364,7 @@ public void testLocalResourceCache() { ResourceLocalizedEvent localizedEvent = new ResourceLocalizedEvent(lr, localizedPath, 123L); tracker.handle(localizedEvent); - dispatcher.await(); + dispatcher.drain(); // Verifying ContainerResourceLocalizedEvent . verify(containerEventHandler, timeout(1000).times(1)).handle( @@ -376,7 +376,7 @@ public void testLocalResourceCache() { // Container-3 releasing the resource. ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3); tracker.handle(relEvent3); - dispatcher.await(); + dispatcher.drain(); Assert.assertEquals(0, localrsrc.get(lr).getRefCount()); @@ -542,7 +542,7 @@ public void testStateStoreSuccessfulLocalization() throws Exception { ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); - dispatcher.await(); + dispatcher.drain(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, @@ -563,7 +563,7 @@ public void testStateStoreSuccessfulLocalization() throws Exception { ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120); tracker.handle(rle1); - dispatcher.await(); + dispatcher.drain(); ArgumentCaptor localizedProtoCaptor = ArgumentCaptor.forClass(LocalizedResourceProto.class); @@ -579,7 +579,7 @@ public void testStateStoreSuccessfulLocalization() throws Exception { // simulate release and retention processing tracker.handle(new ResourceReleaseEvent(lr1, cId1)); - dispatcher.await(); + dispatcher.drain(); boolean removeResult = tracker.remove(localizedRsrc1, mockDelService); Assert.assertTrue(removeResult); @@ -623,7 +623,7 @@ public void testStateStoreFailedLocalization() throws Exception { ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); - dispatcher.await(); + dispatcher.drain(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, @@ -644,7 +644,7 @@ public void testStateStoreFailedLocalization() throws Exception { new ResourceFailedLocalizationEvent( lr1, new Exception("Test").toString()); tracker.handle(rfe1); - dispatcher.await(); + dispatcher.drain(); verify(stateStore).removeLocalizedResource(eq(user), eq(appId), eq(localizedPath1)); } finally { @@ -685,7 +685,7 @@ public void testRecoveredResource() throws Exception { Long.toString(localizedId1)); Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar"); tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120)); - dispatcher.await(); + dispatcher.drain(); Assert.assertNotNull(tracker.getLocalizedResource(lr1)); // verify new paths reflect recovery of previous resources @@ -695,7 +695,7 @@ public void testRecoveredResource() throws Exception { ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2, LocalResourceVisibility.APPLICATION, lc2); tracker.handle(reqEvent2); - dispatcher.await(); + dispatcher.drain(); Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir, null); long localizedId2 = Long.parseLong(hierarchicalPath2.getName()); @@ -736,7 +736,7 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { Long.toString(localizedId1)); Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar"); tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120)); - dispatcher.await(); + dispatcher.drain(); Assert.assertNotNull(tracker.getLocalizedResource(lr1)); LocalCacheDirectoryManager dirMgrRoot = tracker.getDirectoryManager(localDirRoot); @@ -751,7 +751,7 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { Long.toString(localizedId2)); Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar"); tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120)); - dispatcher.await(); + dispatcher.drain(); Assert.assertNotNull(tracker.getLocalizedResource(lr2)); Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount()); @@ -764,7 +764,7 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { Long.toString(localizedId3)); Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar"); tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120)); - dispatcher.await(); + dispatcher.drain(); Assert.assertNotNull(tracker.getLocalizedResource(lr3)); Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount()); @@ -778,7 +778,7 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { Long.toString(localizedId4)); Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar"); tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120)); - dispatcher.await(); + dispatcher.drain(); Assert.assertNotNull(tracker.getLocalizedResource(lr4)); Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount()); Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount()); @@ -880,7 +880,7 @@ public void testResourcePresentInGoodDir() throws IOException { tracker.handle(req11Event); // Localize R2 for C1 tracker.handle(req21Event); - dispatcher.await(); + dispatcher.drain(); // Localize resource1 Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"), null); @@ -890,7 +890,7 @@ public void testResourcePresentInGoodDir() throws IOException { tracker.handle(rle1); ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); tracker.handle(rle2); - dispatcher.await(); + dispatcher.drain(); // Remove somedir2 from gooddirs Assert.assertTrue(tracker.checkLocalResource(lr2)); goodDirs.remove(1); @@ -944,7 +944,7 @@ public void testReleaseWhileDownloading() throws Exception { new ResourceLocalizedEvent(req, new Path("file:///tmp/r1"), 1); tracker.handle(rle); - dispatcher.await(); + dispatcher.drain(); } finally { if (dispatcher != null) { dispatcher.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java index cd85d92..25ffb69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java @@ -85,7 +85,7 @@ public void testNotification() throws Exception { LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc); LocalizedResource local = new LocalizedResource(rsrcA, dispatcher); local.handle(new ResourceRequestEvent(rsrcA, vis0, ctxt0)); - dispatcher.await(); + dispatcher.drain(); // Register C0, verify request event LocalizerEventMatcher matchesL0Req = @@ -101,7 +101,7 @@ public void testNotification() throws Exception { new LocalizerContext("yak", container1, creds1); final LocalResourceVisibility vis1 = LocalResourceVisibility.PUBLIC; local.handle(new ResourceRequestEvent(rsrcA, vis1, ctxt1)); - dispatcher.await(); + dispatcher.drain(); LocalizerEventMatcher matchesL1Req = new LocalizerEventMatcher(container1, creds1, vis1, LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION); @@ -109,13 +109,13 @@ public void testNotification() throws Exception { // Release C0 container localization, verify no notification local.handle(new ResourceReleaseEvent(rsrcA, container0)); - dispatcher.await(); + dispatcher.drain(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); assertEquals(ResourceState.DOWNLOADING, local.getState()); // Release C1 container localization, verify no notification local.handle(new ResourceReleaseEvent(rsrcA, container1)); - dispatcher.await(); + dispatcher.drain(); verify(containerBus, never()).handle(isA(ContainerEvent.class)); assertEquals(ResourceState.DOWNLOADING, local.getState()); @@ -134,7 +134,7 @@ public void testNotification() throws Exception { local.handle(new ResourceRequestEvent(rsrcA, vis2, ctxt2)); local.handle(new ResourceRequestEvent(rsrcA, vis3, ctxt3)); - dispatcher.await(); + dispatcher.drain(); LocalizerEventMatcher matchesL2Req = new LocalizerEventMatcher(container2, creds2, vis2, LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION); @@ -147,7 +147,7 @@ public void testNotification() throws Exception { // Successful localization. verify notification C2, C3 Path locA = new Path("file:///cache/rsrcA"); local.handle(new ResourceLocalizedEvent(rsrcA, locA, 10)); - dispatcher.await(); + dispatcher.drain(); ContainerEventMatcher matchesC2Localized = new ContainerEventMatcher(container2, ContainerEventType.RESOURCE_LOCALIZED); @@ -165,7 +165,7 @@ public void testNotification() throws Exception { new LocalizerContext("yak", container4, creds4); final LocalResourceVisibility vis4 = LocalResourceVisibility.PRIVATE; local.handle(new ResourceRequestEvent(rsrcA, vis4, ctxt4)); - dispatcher.await(); + dispatcher.drain(); ContainerEventMatcher matchesC4Localized = new ContainerEventMatcher(container4, ContainerEventType.RESOURCE_LOCALIZED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 21896ca..d29b7d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -379,7 +379,7 @@ public void testResourceRelease() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES LocalResourcesTracker appTracker = @@ -440,7 +440,7 @@ public void testResourceRelease() throws Exception { // Send Request event spyService.handle(new ContainerLocalizationRequestEvent(c, req)); spyService.handle(new ContainerLocalizationRequestEvent(c, req2)); - dispatcher.await(); + dispatcher.drain(); int privRsrcCount = 0; for (LocalizedResource lr : privTracker) { @@ -473,7 +473,7 @@ public void testResourceRelease() throws Exception { .cleanupPrivLocalizers("container_314159265358979_0003_01_000042"); req2.remove(LocalResourceVisibility.PRIVATE); spyService.handle(new ContainerLocalizationCleanupEvent(c, req2)); - dispatcher.await(); + dispatcher.drain(); pubRsrcs.add(pubReq); pubRsrcs.add(pubReq2); @@ -556,7 +556,7 @@ public void testRecovery() throws Exception { LocalizationEventType.INIT_APPLICATION_RESOURCES, app1)); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app2)); - dispatcher.await(); + dispatcher.drain(); //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES LocalResourcesTracker appTracker1 = @@ -628,7 +628,7 @@ public void testRecovery() throws Exception { // Send Request event spyService.handle(new ContainerLocalizationRequestEvent(c1, req1)); spyService.handle(new ContainerLocalizationRequestEvent(c2, req2)); - dispatcher.await(); + dispatcher.drain(); // Simulate start of localization for all resources privTracker1.getPathForLocalization(privReq1, @@ -681,7 +681,7 @@ public void testRecovery() throws Exception { pubTracker.handle(new ResourceLocalizedEvent(pubReq2, pubLr2.getLocalPath(), pubLr2.getSize() + 99999)); - dispatcher.await(); + dispatcher.drain(); assertEquals(ResourceState.LOCALIZED, privLr1.getState()); assertEquals(ResourceState.LOCALIZED, privLr2.getState()); assertEquals(ResourceState.LOCALIZED, appLr1.getState()); @@ -695,7 +695,7 @@ public void testRecovery() throws Exception { spyService.init(conf); spyService.recoverLocalizedResources( stateStore.loadLocalizationState()); - dispatcher.await(); + dispatcher.drain(); appTracker1 = spyService.getLocalResourcesTracker( LocalResourceVisibility.APPLICATION, user1, appId1); @@ -774,7 +774,7 @@ public void testLocalizerRunnerException() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); Random r = new Random(); long seed = r.nextLong(); @@ -805,7 +805,7 @@ public void testLocalizerRunnerException() throws Exception { .getLocalPathForWrite(isA(String.class)); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); Thread.sleep(1000); - dispatcher.await(); + dispatcher.drain(); // Verify if ContainerResourceFailedEvent is invoked on FSError verify(containerBus).handle(isA(ContainerResourceFailedEvent.class)); } finally { @@ -907,7 +907,7 @@ public boolean matches(Object o) { && appId == evt.getApplicationID(); } }; - dispatcher.await(); + dispatcher.drain(); verify(applicationBus).handle(argThat(matchesAppInit)); // init container rsrc, localizer @@ -948,7 +948,7 @@ public boolean matches(Object o) { spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); // Sigh. Thread init of private localizer not accessible Thread.sleep(1000); - dispatcher.await(); + dispatcher.drain(); String appStr = appId.toString(); String ctnrStr = c.getContainerId().toString(); ArgumentCaptor contextCaptor = ArgumentCaptor @@ -1052,7 +1052,7 @@ public boolean matches(Object o) { response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); - dispatcher.await(); + dispatcher.drain(); // verify container notification ArgumentMatcher matchesContainerLoc = new ArgumentMatcher() { @@ -1234,7 +1234,7 @@ public boolean matches(Object o) { && appId == evt.getApplicationID(); } }; - dispatcher.await(); + dispatcher.drain(); verify(applicationBus).handle(argThat(matchesAppInit)); } @@ -1285,7 +1285,7 @@ private void doLocalization(ResourceLocalizationService spyService, rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1); spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1)); - dispatcher.await(); + dispatcher.drain(); // Wait for localizers of both container c1 and c2 to begin. exec.waitForLocalizers(2); LocalizerRunner locC1 = @@ -1320,7 +1320,7 @@ private void doLocalization(ResourceLocalizationService spyService, assertEquals(LocalizerAction.DIE, response.getLocalizerAction()); exec.setStopLocalization(); - dispatcher.await(); + dispatcher.drain(); // verify container notification ArgumentMatcher successContainerLoc = new ArgumentMatcher() { @@ -1485,7 +1485,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // init container. final Container c = getMockContainer(appId, 42, user); @@ -1519,7 +1519,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception { req.put(LocalResourceVisibility.PUBLIC, pubRsrcs); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); - dispatcher.await(); + dispatcher.drain(); verify(spyService, times(1)).checkAndInitializeLocalDirs(); @@ -1593,7 +1593,7 @@ public void testPublicCacheDirPermission() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // init container. final Container c = getMockContainer(appId, 42, user); @@ -1618,7 +1618,7 @@ public void testPublicCacheDirPermission() throws Exception { req.put(LocalResourceVisibility.PUBLIC, pubRsrcs); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); - dispatcher.await(); + dispatcher.drain(); // verify directory creation @@ -1666,7 +1666,7 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception { when(app.toString()).thenReturn(appId.toString()); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // Initialize localizer. Random r = new Random(); @@ -1693,7 +1693,7 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception { List appResourceList = Arrays.asList(req1, req2); rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList); spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs)); - dispatcher.await(); + dispatcher.drain(); // Wait for localization to begin. exec.waitForLocalizers(1); @@ -1719,7 +1719,7 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception { // Cleanup container. spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs)); - dispatcher.await(); + dispatcher.drain(); try { /*Directly send heartbeat to introduce race as container is being cleaned up.*/ @@ -1731,7 +1731,7 @@ public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception { // Cleanup application. spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); try { // Directly send heartbeat to introduce race as app is being cleaned up. locRunnerForContainer.processHeartbeat( @@ -1796,7 +1796,7 @@ public void testFailedPublicResource() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // init container. final Container c = getMockContainer(appId, 42, user); @@ -1836,7 +1836,7 @@ public Void answer(InvocationOnMock invocation) throws IOException { spyService.handle(new ContainerLocalizationRequestEvent(c, req)); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); - dispatcher.await(); + dispatcher.drain(); // allow the chmod to fail now that both requests have been queued barrier.await(); @@ -1904,7 +1904,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { when(app.getAppId()).thenReturn(appId); spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // init resources Random r = new Random(); @@ -1929,7 +1929,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { Mockito.anyBoolean()); // send request spyService.handle(new ContainerLocalizationRequestEvent(c, req)); - dispatcher.await(); + dispatcher.drain(); LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user, appId); @@ -1955,7 +1955,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { Mockito.anyBoolean()); // send request spyService.handle(new ContainerLocalizationRequestEvent(c, req1)); - dispatcher.await(); + dispatcher.drain(); tracker = spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user, appId); @@ -1966,7 +1966,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { publicLocalizer.threadPool.shutdown(); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); - dispatcher.await(); + dispatcher.drain(); tracker = spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user, appId); @@ -1975,7 +1975,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { } finally { // if we call stop with events in the queue, an InterruptedException gets // thrown resulting in the dispatcher thread causing a system exit - dispatcher.await(); + dispatcher.drain(); dispatcher.stop(); } } @@ -2794,7 +2794,7 @@ public void testFailedDirsResourceRelease() throws Exception { spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); // Get a handle on the trackers after they're setup with // INIT_APP_RESOURCES @@ -2841,7 +2841,7 @@ public void testFailedDirsResourceRelease() throws Exception { // Send Request event spyService.handle(new ContainerLocalizationRequestEvent(c, req)); spyService.handle(new ContainerLocalizationRequestEvent(c, req2)); - dispatcher.await(); + dispatcher.drain(); int privRsrcCount = 0; for (LocalizedResource lr : privTracker) { @@ -2918,7 +2918,7 @@ public boolean matches(Object o) { } }; - dispatcher.await(); + dispatcher.drain(); // setup mocks again, this time throw UnsupportedFileSystemException and // IOExceptions @@ -2938,7 +2938,7 @@ public boolean matches(Object o) { LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app); spyService.handle(destroyApp); // Waits for APPLICATION_RESOURCES_CLEANEDUP event to be handled. - dispatcher.await(); + dispatcher.drain(); verify(applicationBus).handle(argThat(matchesAppDestroy)); // verify we got the right delete calls @@ -3016,7 +3016,7 @@ public void testDirHandler() throws Exception { spyService.handle(new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); - dispatcher.await(); + dispatcher.drain(); LocalResourcesTracker appTracker = spyService.getLocalResourcesTracker( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 8b2e3cc..fa941cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -187,7 +187,7 @@ public void tearDown() throws IOException, InterruptedException { .setBasedirs(new Path[] {}) .build()); - dispatcher.await(); + dispatcher.drain(); dispatcher.stop(); dispatcher.close(); } @@ -258,7 +258,7 @@ private void verifyLocalFileDeletion( Assert.assertTrue("Log file [" + logFilePath + "] not found", new File( logFilePath.toUri().getPath()).exists()); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent( @@ -380,7 +380,7 @@ public void testNoContainerOnNode() throws Exception { application1, this.user, this.nodeId).toUri().getPath()) .exists()); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent( @@ -474,7 +474,7 @@ public void testMultipleAppsLogAggregation() throws Exception { logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{ new ApplicationEvent( application1, @@ -532,7 +532,7 @@ public void testMultipleAppsLogAggregation() throws Exception { verifyContainerLogs(logAggregationService, application3, new ContainerId[] { container31, container32 }, fileNames, 3, false); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{ new ApplicationEvent( @@ -585,7 +585,7 @@ public LogAggregationFileController getLogAggregationFileController( logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); // Verify that it failed ApplicationEvent[] expectedEvents = new ApplicationEvent[] { @@ -606,7 +606,7 @@ public LogAggregationFileController getLogAggregationFileController( appLogDir.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); // Verify that it worked expectedEvents = new ApplicationEvent[] { @@ -645,7 +645,7 @@ public void testVerifyAndCreateRemoteDirNonExistence() AMOrFailedContainerLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); boolean existsAfter = aNewFile.exists(); assertTrue("The new aggregate file is not successfully created", existsAfter); @@ -673,7 +673,7 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() AMOrFailedContainerLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); String targetGroup = UserGroupInformation.getLoginUser().getPrimaryGroupName(); @@ -787,7 +787,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) @@ -803,10 +803,10 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { logAggregationService.handle(new LogHandlerContainerFinishedEvent( BuilderUtils.newContainerId(4, 1, 1, 1), ContainerType.APPLICATION_MASTER, 0)); - dispatcher.await(); + dispatcher.drain(); logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); - dispatcher.await(); + dispatcher.drain(); } @Test @@ -858,7 +858,7 @@ public LogAggregationFileController getLogAggregationFileController( logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, contextWithAMAndFailed)); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) @@ -871,7 +871,7 @@ public LogAggregationFileController getLogAggregationFileController( logAggregationService.handle(new LogHandlerContainerFinishedEvent( BuilderUtils.newContainerId(4, 1, 1, 1), ContainerType.APPLICATION_MASTER, 0)); - dispatcher.await(); + dispatcher.drain(); AppLogAggregator appAgg = logAggregationService.getAppLogAggregators().get(appId); @@ -880,7 +880,7 @@ public LogAggregationFileController getLogAggregationFileController( // Enabled aggregation logAggregationService.handle(new LogHandlerTokenUpdatedEvent()); - dispatcher.await(); + dispatcher.drain(); appAgg = logAggregationService.getAppLogAggregators().get(appId); @@ -892,7 +892,7 @@ public LogAggregationFileController getLogAggregationFileController( logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); - dispatcher.await(); + dispatcher.drain(); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); @@ -1341,7 +1341,7 @@ public void testLogAggregatorCleanup() throws Exception { application1, this.user, null, this.acls)); logAggregationService.handle(new LogHandlerAppFinishedEvent(application1)); - dispatcher.await(); + dispatcher.drain(); int timeToWait = 20 * 1000; while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) { Thread.sleep(100); @@ -1622,7 +1622,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { new LogHandlerContainerFinishedEvent(container4, ContainerType.APPLICATION_MASTER, 0)); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedInitEvents[] = new ApplicationEvent[] { new ApplicationEvent(application1, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), @@ -1659,7 +1659,7 @@ public void testLogAggregationServiceWithPatterns() throws Exception { verifyContainerLogs(logAggregationService, application4, new ContainerId[] { container4 }, logFiles, 1, false); - dispatcher.await(); + dispatcher.drain(); ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[] { new ApplicationEvent(application1, @@ -1750,7 +1750,7 @@ public void testLogAggregationServiceWithPatternsAndIntervals() new LogHandlerContainerFinishedEvent(container, ContainerType.APPLICATION_MASTER, 0)); - dispatcher.await(); + dispatcher.drain(); // Do the log aggregation after ContainerFinishedEvent but before // AppFinishedEvent. The std_final is expected to be aggregated this time @@ -2173,7 +2173,7 @@ private LogAggregationService createLogAggregationService( } logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, this.acls, logAggContext)); - dispatcher.await(); + dispatcher.drain(); return logAggregationService; } @@ -2225,7 +2225,7 @@ private ContainerId finishContainer(ApplicationId application1, private void finishApplication(ApplicationId appId, LogAggregationService logAggregationService) throws Exception { - dispatcher.await(); + dispatcher.drain(); ApplicationEvent expectedInitEvents[] = new ApplicationEvent[] { new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) }; @@ -2239,7 +2239,7 @@ private void finishApplication(ApplicationId appId, } private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception { - dispatcher.await(); + dispatcher.drain(); ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[] { new ApplicationEvent(appId, @@ -2399,7 +2399,7 @@ private void testLogAggregationService(boolean retentionSizeLimitation) new LogHandlerContainerFinishedEvent(container, ContainerType.APPLICATION_MASTER, 0)); - dispatcher.await(); + dispatcher.drain(); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); if (retentionSizeLimitation) { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index dead603..e248802 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -117,7 +117,7 @@ public void setup() { public void tearDown() throws IOException { dirsHandler.stop(); dirsHandler.close(); - dispatcher.await(); + dispatcher.drain(); dispatcher.stop(); dispatcher.close(); } @@ -393,14 +393,14 @@ public void testRecovery() throws Exception { anyLong(), any(TimeUnit.class)); // wait events get drained. - this.dispatcher.await(); + this.dispatcher.drain(); assertTrue(appEventHandler.receiveLogHandlingFinishEvent()); appEventHandler.resetLogHandlingEvent(); assertFalse(appEventHandler.receiveLogHandlingFailedEvent()); // send an app finish event against a removed app logHandler.handle(new LogHandlerAppFinishedEvent(appId)); - this.dispatcher.await(); + this.dispatcher.drain(); // verify to receive a log failed event. assertTrue(appEventHandler.receiveLogHandlingFailedEvent()); assertFalse(appEventHandler.receiveLogHandlingFinishEvent()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index 2585262..dff4ee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -123,7 +123,7 @@ private Context createMockContext() { publisher.createTimelineClient(appId); publisher.publishApplicationEvent(finishedEvent); publisher.stopTimelineClient(appId); - dispatcher.await(); + dispatcher.drain(); ContainerEntity cEntity = new ContainerEntity(); cEntity.setId(cId.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index df86f28..ddf5d40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -124,7 +124,7 @@ private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND; private static final int TIMEOUT_MS_FOR_APP_REMOVED = 40 * SECOND; private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 20 * SECOND; - private static final int WAIT_MS_PER_LOOP = 10; + private static final int WAIT_MS_PER_LOOP = 50; private final boolean useNullRMNodeLabelsManager; private boolean disableDrainEventsImplicitly; @@ -228,7 +228,7 @@ public void handle(SchedulerEvent event) { public void drainEvents() { Dispatcher rmDispatcher = getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { - ((DrainDispatcher) rmDispatcher).await(); + ((DrainDispatcher) rmDispatcher).drain(); } else { throw new UnsupportedOperationException("Not a Drain Dispatcher!"); } @@ -269,7 +269,7 @@ public void waitForState(ApplicationId appId, RMAppState finalState) throws InterruptedException { drainEventsImplicitly(); RMApp app = getRMContext().getRMApps().get(appId); - Assert.assertNotNull("app shouldn't be null", app); + Assert.assertNotNull("app should not be null", app); final int timeoutMsecs = 80 * SECOND; int timeWaiting = 0; while (!finalState.equals(app.getState())) { @@ -315,7 +315,7 @@ public void waitForState(ApplicationAttemptId attemptId, throws InterruptedException { drainEventsImplicitly(); RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); - Assert.assertNotNull("app shouldn't be null", app); + Assert.assertNotNull("app should not be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); MockRM.waitForState(attempt, finalState, timeoutMsecs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 5542157..9d531f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -247,8 +247,6 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); - // Wait for scheduler to process all events - dispatcher.waitForEventThreadToWait(); rm.drainEvents(); // Verify Metrics After OPP allocation (Nothing should change again) verifyMetrics(metrics, 15360, 15, 1024, 1, 1); @@ -324,7 +322,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception Assert.assertEquals(uc.getVersion(), container.getVersion() + 2); // Wait for scheduler to finish processing events - dispatcher.waitForEventThreadToWait(); + dispatcher.drain(); rm.drainEvents(); // Verify Metrics After OPP allocation : // Everything should have reverted to what it was diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c17dee8..107fe9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -653,7 +653,7 @@ protected Dispatcher createDispatcher() { } // Since refreshAll failed we are expecting fatal event to be send // Then fatal event is send RM will shutdown - dispatcher.await(); + dispatcher.drain(); assertEquals("Fatal Event to be received", 1, dispatcher.getEventCount()); // Check of refreshAll success HA can be active rm.adminService.transitionToActive(requestInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e40b3c0..e9d4671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -2292,9 +2292,9 @@ protected Dispatcher createDispatcher() { // trying to register a invalid node. RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req); - dispatcher.await(); + dispatcher.drain(); Thread.sleep(2000); - dispatcher.await(); + dispatcher.drain(); Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); Collection liveContainers = applicationAttempt diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java index 5b0c34f..c130a0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java @@ -313,7 +313,7 @@ private void publishEvents(boolean v1Enabled, boolean v2Enabled) { metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app, Integer.MAX_VALUE + 2L); if (v2Enabled) { - dispatcher.await(); + dispatcher.drain(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 3b503e5..d5b022f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -205,7 +205,7 @@ public void testPublishApplicationMetrics() throws Exception { metricsPublisher.appCreated(app, app.getStartTime()); metricsPublisher.appACLsUpdated(app, "user1,user2", 4L); metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); - dispatcher.await(); + dispatcher.drain(); String outputDirApp = getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION @@ -240,7 +240,7 @@ public void testPublishAppAttemptMetrics() throws Exception { metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app, Integer.MAX_VALUE + 2L); - dispatcher.await(); + dispatcher.drain(); String outputDirApp = getTimelineEntityDir(app) + "/" @@ -272,7 +272,7 @@ public void testPublishContainerMetrics() throws Exception { RMContainer container = createRMContainer(containerId); metricsPublisher.containerCreated(container, container.getCreationTime()); metricsPublisher.containerFinished(container, container.getFinishTime()); - dispatcher.await(); + dispatcher.drain(); String outputDirApp = getTimelineEntityDir(app) + "/" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 3454d72..c3a12a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.security.MasterKeyData; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; +@SuppressWarnings("rawtypes") public class RMStateStoreTestBase { public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); @@ -98,29 +98,37 @@ static class TestDispatcher implements Dispatcher, EventHandler { - ApplicationAttemptId attemptId; + ApplicationAttemptId attemptId = null; boolean notified = false; - @SuppressWarnings("rawtypes") @Override public void register(Class eventType, - EventHandler handler) { + EventHandler handler) { } @Override public void handle(Event event) { + if (attemptId == null) { + notified = true; + synchronized (this) { + notifyAll(); + } + return; + } if (event instanceof RMAppAttemptEvent) { RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event; - assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId()); - } - notified = true; - synchronized (this) { - notifyAll(); + if (attemptId.equals(rmAppAttemptEvent.getApplicationAttemptId())) { + notified = true; + synchronized (this) { + notifyAll(); + } + attemptId = null; + } + return; } } - @SuppressWarnings("rawtypes") @Override public EventHandler getEventHandler() { return this; @@ -129,17 +137,25 @@ public void handle(Event event) { } public static class StoreStateVerifier { - void afterStoreApp(RMStateStore store, ApplicationId appId) {} - void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId - appAttId) {} + void afterStoreApp(RMStateStore store, ApplicationId appId) { + } + + void afterStoreAppAttempt(RMStateStore store, + ApplicationAttemptId appAttId) { + } } interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; + boolean isFinalStateValid() throws Exception; + void writeVersion(Version version) throws Exception; + Version getCurrentVersion() throws Exception; + boolean appExists(RMApp app) throws Exception; + boolean attemptExists(RMAppAttempt attempt) throws Exception; } @@ -149,16 +165,16 @@ public long getEpochRange() { void waitNotify(TestDispatcher dispatcher) { long startTime = System.currentTimeMillis(); - while(!dispatcher.notified) { - synchronized (dispatcher) { + synchronized (dispatcher) { + while (!dispatcher.notified) { try { dispatcher.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } - } - if(System.currentTimeMillis() - startTime > 1000*60) { - fail("Timed out attempt store notification"); + if (System.currentTimeMillis() - startTime > 1000 * 60) { + fail("Timed out attempt store notification"); + } } } dispatcher.notified = false; @@ -184,12 +200,11 @@ protected RMApp storeApp(RMStateStore store, ApplicationId appId, } protected RMAppAttempt storeAttempt(RMStateStore store, - ApplicationAttemptId attemptId, - String containerIdStr, Token appToken, - SecretKey clientTokenMasterKey, TestDispatcher dispatcher) - throws Exception { + ApplicationAttemptId attemptId, String containerIdStr, + Token appToken, SecretKey clientTokenMasterKey, + TestDispatcher dispatcher) throws Exception { - RMAppAttemptMetrics mockRmAppAttemptMetrics = + RMAppAttemptMetrics mockRmAppAttemptMetrics = mock(RMAppAttemptMetrics.class); Container container = new ContainerPBImpl(); container.setId(ContainerId.fromString(containerIdStr)); @@ -217,13 +232,12 @@ protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, } void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) - throws Exception { + throws Exception { testRMAppStateStore(stateStoreHelper, new StoreStateVerifier()); } void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, - StoreStateVerifier verifier) - throws Exception { + StoreStateVerifier verifier) throws Exception { long submitTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis() + 1234; Configuration conf = new YarnConfiguration(); @@ -243,8 +257,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); - ApplicationAttemptId attemptId1 = ApplicationAttemptId.fromString( - "appattempt_1352994193343_0001_000001"); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.fromString("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime, startTime); verifier.afterStoreApp(store, appId1); @@ -252,31 +266,27 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, // create application token and client token key for attempt1 Token appAttemptToken1 = generateAMRMToken(attemptId1, appTokenMgr); - SecretKey clientTokenKey1 = - clientToAMTokenMgr.createMasterKey(attemptId1); + SecretKey clientTokenKey1 = clientToAMTokenMgr.createMasterKey(attemptId1); ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", - appAttemptToken1, clientTokenKey1, dispatcher) - .getMasterContainer().getId(); + "container_1352994193343_0001_01_000001", appAttemptToken1, + clientTokenKey1, dispatcher).getMasterContainer().getId(); String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; - ApplicationAttemptId attemptId2 = ApplicationAttemptId.fromString( - appAttemptIdStr2); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.fromString(appAttemptIdStr2); // create application token and client token key for attempt2 Token appAttemptToken2 = generateAMRMToken(attemptId2, appTokenMgr); - SecretKey clientTokenKey2 = - clientToAMTokenMgr.createMasterKey(attemptId2); + SecretKey clientTokenKey2 = clientToAMTokenMgr.createMasterKey(attemptId2); ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", - appAttemptToken2, clientTokenKey2, dispatcher) - .getMasterContainer().getId(); + "container_1352994193343_0001_02_000001", appAttemptToken2, + clientTokenKey2, dispatcher).getMasterContainer().getId(); - ApplicationAttemptId attemptIdRemoved = ApplicationAttemptId.fromString( - "appattempt_1352994193343_0002_000001"); + ApplicationAttemptId attemptIdRemoved = + ApplicationAttemptId.fromString("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime, startTime); storeAttempt(store, attemptIdRemoved, @@ -284,10 +294,10 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, verifier.afterStoreAppAttempt(store, attemptIdRemoved); RMApp mockRemovedApp = mock(RMApp.class); - RMAppAttemptMetrics mockRmAppAttemptMetrics = + RMAppAttemptMetrics mockRmAppAttemptMetrics = mock(RMAppAttemptMetrics.class); HashMap attempts = - new HashMap(); + new HashMap(); ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); context.setApplicationId(appIdRemoved); @@ -332,7 +342,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, assertEquals(startTime, appState.getStartTime()); // submission context is loaded correctly assertEquals(appId1, - appState.getApplicationSubmissionContext().getApplicationId()); + appState.getApplicationSubmissionContext().getApplicationId()); ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); // attempt1 is loaded correctly assertNotNull(attemptState); @@ -341,8 +351,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); // attempt1 client token master key is loaded correctly - assertArrayEquals( - clientTokenKey1.getEncoded(), + assertArrayEquals(clientTokenKey1.getEncoded(), attemptState.getAppAttemptTokens() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); assertEquals("context", appState.getCallerContext().getContext()); @@ -354,24 +363,21 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); // attempt2 client token master key is loaded correctly - assertArrayEquals( - clientTokenKey2.getEncoded(), + assertArrayEquals(clientTokenKey2.getEncoded(), attemptState.getAppAttemptTokens() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); - //******* update application/attempt state *******// - ApplicationStateData appState2 = - ApplicationStateData.newInstance(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), - appState.getApplicationSubmissionContext(), RMAppState.FINISHED, - "appDiagnostics", 123, 1234, appState.getCallerContext()); + // ******* update application/attempt state *******// + ApplicationStateData appState2 = ApplicationStateData.newInstance( + appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 123, 1234, appState.getCallerContext()); appState2.attempts.putAll(appState.attempts); store.updateApplicationState(appState2); ApplicationAttemptStateData oldAttemptState = attemptState; ApplicationAttemptStateData newAttemptState = - ApplicationAttemptStateData.newInstance( - oldAttemptState.getAttemptId(), + ApplicationAttemptStateData.newInstance(oldAttemptState.getAttemptId(), oldAttemptState.getMasterContainer(), oldAttemptState.getAppAttemptTokens(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, @@ -387,17 +393,15 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, new ApplicationSubmissionContextPBImpl(); dummyContext.setApplicationId(dummyAppId); dummyContext.setAMContainerSpec(new ContainerLaunchContextPBImpl()); - ApplicationStateData dummyApp = - ApplicationStateData.newInstance(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), dummyContext, - RMAppState.FINISHED, "appDiagnostics", 123, 1234, null); + ApplicationStateData dummyApp = ApplicationStateData.newInstance( + appState.getSubmitTime(), appState.getStartTime(), appState.getUser(), + dummyContext, RMAppState.FINISHED, "appDiagnostics", 123, 1234, null); store.updateApplicationState(dummyApp); ApplicationAttemptId dummyAttemptId = ApplicationAttemptId.newInstance(dummyAppId, 6); - ApplicationAttemptStateData dummyAttempt = - ApplicationAttemptStateData.newInstance(dummyAttemptId, - oldAttemptState.getMasterContainer(), + ApplicationAttemptStateData dummyAttempt = ApplicationAttemptStateData + .newInstance(dummyAttemptId, oldAttemptState.getMasterContainer(), oldAttemptState.getAppAttemptTokens(), oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, "myTrackingUrl", "attemptDiagnostics", @@ -415,8 +419,8 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, RMState newRMState = store.loadState(); Map newRMAppState = newRMState.getApplicationState(); - assertNotNull(newRMAppState.get( - dummyApp.getApplicationSubmissionContext().getApplicationId())); + assertNotNull(newRMAppState + .get(dummyApp.getApplicationSubmissionContext().getApplicationId())); ApplicationStateData updatedAppState = newRMAppState.get(appId1); assertEquals(appState.getApplicationSubmissionContext().getApplicationId(), updatedAppState.getApplicationSubmissionContext().getApplicationId()); @@ -424,20 +428,21 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, assertEquals(appState.getStartTime(), updatedAppState.getStartTime()); assertEquals(appState.getUser(), updatedAppState.getUser()); // new app state fields - assertEquals( RMAppState.FINISHED, updatedAppState.getState()); + assertEquals(RMAppState.FINISHED, updatedAppState.getState()); assertEquals("appDiagnostics", updatedAppState.getDiagnostics()); assertEquals(1234, updatedAppState.getFinishTime()); // check updated attempt state - assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext - ().getApplicationId()).getAttempt(dummyAttemptId)); + assertNotNull(newRMAppState + .get(dummyApp.getApplicationSubmissionContext().getApplicationId()) + .getAttempt(dummyAttemptId)); ApplicationAttemptStateData updatedAttemptState = updatedAppState.getAttempt(newAttemptState.getAttemptId()); assertEquals(oldAttemptState.getAttemptId(), - updatedAttemptState.getAttemptId()); - assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId()); - assertArrayEquals( - clientTokenKey2.getEncoded(), + updatedAttemptState.getAttemptId()); + assertEquals(containerId2, + updatedAttemptState.getMasterContainer().getId()); + assertArrayEquals(clientTokenKey2.getEncoded(), attemptState.getAppAttemptTokens() .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); // new attempt state fields @@ -446,7 +451,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper, assertEquals("attemptDiagnostics", updatedAttemptState.getDiagnostics()); assertEquals(100, updatedAttemptState.getAMContainerExitStatus()); assertEquals(FinalApplicationStatus.SUCCEEDED, - updatedAttemptState.getFinalApplicationStatus()); + updatedAttemptState.getFinalApplicationStatus()); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); @@ -461,9 +466,8 @@ public void testRMDTSecretManagerStateStore( store.setRMDispatcher(dispatcher); // store RM delegation token; - RMDelegationTokenIdentifier dtId1 = - new RMDelegationTokenIdentifier(new Text("owner1"), - new Text("renewer1"), new Text("realuser1")); + RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier( + new Text("owner1"), new Text("renewer1"), new Text("realuser1")); int sequenceNumber = 1111; dtId1.setSequenceNumber(sequenceNumber); byte[] tokenBeforeStore = dtId1.getBytes(); @@ -474,7 +478,7 @@ public void testRMDTSecretManagerStateStore( new HashMap(); token1.put(dtId1, renewDate1); // store delegation key; - DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes()); + DelegationKey key = new DelegationKey(1234, 4321, "keyBytes".getBytes()); HashSet keySet = new HashSet(); keySet.add(key); store.storeRMDTMasterKey(key); @@ -487,8 +491,8 @@ public void testRMDTSecretManagerStateStore( secretManagerState.getDTSequenceNumber()); RMDelegationTokenIdentifier tokenAfterStore = secretManagerState.getTokenState().keySet().iterator().next(); - Assert.assertTrue(Arrays.equals(tokenBeforeStore, - tokenAfterStore.getBytes())); + Assert.assertTrue( + Arrays.equals(tokenBeforeStore, tokenAfterStore.getBytes())); // update RM delegation token; renewDate1 = new Long(System.currentTimeMillis()); @@ -528,8 +532,7 @@ public void testRMDTSecretManagerStateStore( } protected Token generateAMRMToken( - ApplicationAttemptId attemptId, - AMRMTokenSecretManager appTokenMgr) { + ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { Token appToken = appTokenMgr.createAndGetAMRMToken(attemptId); appToken.setService(new Text("appToken service")); @@ -547,9 +550,8 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertEquals(defaultVersion, store.loadVersion()); // compatible version - Version compatibleVersion = - Version.newInstance(defaultVersion.getMajorVersion(), - defaultVersion.getMinorVersion() + 2); + Version compatibleVersion = Version.newInstance( + defaultVersion.getMajorVersion(), defaultVersion.getMinorVersion() + 2); stateStoreHelper.writeVersion(compatibleVersion); Assert.assertEquals(compatibleVersion, store.loadVersion()); store.checkVersion(); @@ -557,9 +559,8 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertEquals(defaultVersion, store.loadVersion()); // incompatible version - Version incompatibleVersion = - Version.newInstance(defaultVersion.getMajorVersion() + 2, - defaultVersion.getMinorVersion()); + Version incompatibleVersion = Version.newInstance( + defaultVersion.getMajorVersion() + 2, defaultVersion.getMinorVersion()); stateStoreHelper.writeVersion(incompatibleVersion); try { store.checkVersion(); @@ -568,18 +569,17 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } - - public void testEpoch(RMStateStoreHelper stateStoreHelper) - throws Exception { + + public void testEpoch(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(new TestDispatcher()); - + long firstTimeEpoch = store.getAndIncrementEpoch(); Assert.assertEquals(epoch, firstTimeEpoch); - + long secondTimeEpoch = store.getAndIncrementEpoch(); Assert.assertEquals(epoch + 1, secondTimeEpoch); - + long thirdTimeEpoch = store.getAndIncrementEpoch(); Assert.assertEquals(epoch + 2, thirdTimeEpoch); @@ -663,7 +663,7 @@ public void testRemoveApplication(RMStateStoreHelper stateStoreHelper) } public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper) - throws Exception { + throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); @@ -674,13 +674,13 @@ public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper) ApplicationAttemptId attemptId1 = ApplicationAttemptId.newInstance(appId, 1); RMAppAttempt attempt1 = storeAttempt(store, attemptId1, - ContainerId.newContainerId(attemptId1, 1).toString(), - null, null, dispatcher); + ContainerId.newContainerId(attemptId1, 1).toString(), null, null, + dispatcher); ApplicationAttemptId attemptId2 = ApplicationAttemptId.newInstance(appId, 2); RMAppAttempt attempt2 = storeAttempt(store, attemptId2, - ContainerId.newContainerId(attemptId2, 1).toString(), - null, null, dispatcher); + ContainerId.newContainerId(attemptId2, 1).toString(), null, null, + dispatcher); store.removeApplicationAttemptInternal(attemptId1); Assert.assertFalse(stateStoreHelper.attemptExists(attempt1)); Assert.assertTrue(stateStoreHelper.attemptExists(attempt2)); @@ -724,15 +724,13 @@ public void testAMRMTokenSecretManagerStateStore( AMRMTokenSecretManager appTokenMgr = new AMRMTokenSecretManager(conf, rmContext); - //create and save the first masterkey + // create and save the first masterkey MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey(); - AMRMTokenSecretManagerState state1 = - AMRMTokenSecretManagerState.newInstance( - firstMasterKeyData.getMasterKey(), null); - rmContext.getStateStore() - .storeOrUpdateAMRMTokenSecretManager(state1, - false); + AMRMTokenSecretManagerState state1 = AMRMTokenSecretManagerState + .newInstance(firstMasterKeyData.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state1, + false); // load state store = stateStoreHelper.getRMStateStore(); @@ -740,19 +738,17 @@ public void testAMRMTokenSecretManagerStateStore( store.setRMDispatcher(dispatcher); RMState state = store.loadState(); Assert.assertNotNull(state.getAMRMTokenSecretManagerState()); - Assert.assertEquals(firstMasterKeyData.getMasterKey(), state - .getAMRMTokenSecretManagerState().getCurrentMasterKey()); - Assert.assertNull(state - .getAMRMTokenSecretManagerState().getNextMasterKey()); + Assert.assertEquals(firstMasterKeyData.getMasterKey(), + state.getAMRMTokenSecretManagerState().getCurrentMasterKey()); + Assert + .assertNull(state.getAMRMTokenSecretManagerState().getNextMasterKey()); - //create and save the second masterkey + // create and save the second masterkey MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey(); - AMRMTokenSecretManagerState state2 = - AMRMTokenSecretManagerState - .newInstance(firstMasterKeyData.getMasterKey(), + AMRMTokenSecretManagerState state2 = AMRMTokenSecretManagerState + .newInstance(firstMasterKeyData.getMasterKey(), secondMasterKeyData.getMasterKey()); - rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state2, - true); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state2, true); // load state store = stateStoreHelper.getRMStateStore(); @@ -760,24 +756,24 @@ public void testAMRMTokenSecretManagerStateStore( store.setRMDispatcher(dispatcher); RMState state_2 = store.loadState(); Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState()); - Assert.assertEquals(firstMasterKeyData.getMasterKey(), state_2 - .getAMRMTokenSecretManagerState().getCurrentMasterKey()); - Assert.assertEquals(secondMasterKeyData.getMasterKey(), state_2 - .getAMRMTokenSecretManagerState().getNextMasterKey()); + Assert.assertEquals(firstMasterKeyData.getMasterKey(), + state_2.getAMRMTokenSecretManagerState().getCurrentMasterKey()); + Assert.assertEquals(secondMasterKeyData.getMasterKey(), + state_2.getAMRMTokenSecretManagerState().getNextMasterKey()); // re-create the masterKeyData based on the recovered masterkey // should have the same secretKey appTokenMgr.recover(state_2); Assert.assertEquals(appTokenMgr.getCurrnetMasterKeyData().getSecretKey(), - firstMasterKeyData.getSecretKey()); + firstMasterKeyData.getSecretKey()); Assert.assertEquals(appTokenMgr.getNextMasterKeyData().getSecretKey(), - secondMasterKeyData.getSecretKey()); + secondMasterKeyData.getSecretKey()); store.close(); } - public void testReservationStateStore( - RMStateStoreHelper stateStoreHelper) throws Exception { + public void testReservationStateStore(RMStateStoreHelper stateStoreHelper) + throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); @@ -794,10 +790,10 @@ public void testReservationStateStore( boolean hasGang = true; String planName = "dedicated"; ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - start, start + alloc.length + 1, alloc.length); - ReservationAllocation allocation = new InMemoryReservationAllocation( - r1, rDef, "u3", planName, 0, 0 + alloc.length, + ReservationSystemTestUtil.createSimpleReservationDefinition(start, + start + alloc.length + 1, alloc.length); + ReservationAllocation allocation = new InMemoryReservationAllocation(r1, + rDef, "u3", planName, 0, 0 + alloc.length, ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res, minAlloc, hasGang); ReservationAllocationStateProto allocationStateProto = @@ -809,51 +805,46 @@ public void testReservationStateStore( when(rmContext.getStateStore()).thenReturn(store); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); - Map> - reservationState = state.getReservationState(); + Map> reservationState = + state.getReservationState(); Assert.assertNotNull(reservationState); // 2. Store single reservation and verify String reservationIdName = r1.toString(); - rmContext.getStateStore().storeNewReservation( - allocationStateProto, + rmContext.getStateStore().storeNewReservation(allocationStateProto, planName, reservationIdName); - // load state and verify new state - validateStoredReservation( - stateStoreHelper, dispatcher, rmContext, r1, planName, allocation, - allocationStateProto); + validateStoredReservation(stateStoreHelper, dispatcher, rmContext, r1, + planName, allocation, allocationStateProto); // 3. update state test - alloc = new int[]{6, 6, 6}; + alloc = new int[] { 6, 6, 6 }; hasGang = false; - allocation = new InMemoryReservationAllocation( - r1, rDef, "u3", planName, 2, 2 + alloc.length, + allocation = new InMemoryReservationAllocation(r1, rDef, "u3", planName, 2, + 2 + alloc.length, ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res, minAlloc, hasGang); - allocationStateProto = - ReservationSystemUtil.buildStateProto(allocation); + allocationStateProto = ReservationSystemUtil.buildStateProto(allocation); rmContext.getStateStore().removeReservation(planName, reservationIdName); - rmContext.getStateStore().storeNewReservation(allocationStateProto, planName, reservationIdName); + rmContext.getStateStore().storeNewReservation(allocationStateProto, + planName, reservationIdName); // load state and verify updated reservation - validateStoredReservation( - stateStoreHelper, dispatcher, rmContext, r1, planName, allocation, - allocationStateProto); + validateStoredReservation(stateStoreHelper, dispatcher, rmContext, r1, + planName, allocation, allocationStateProto); // 4. add a second one and remove the first one ReservationId r2 = ReservationId.newInstance(ts, 2); - ReservationAllocation allocation2 = new InMemoryReservationAllocation( - r2, rDef, "u3", planName, 0, 0 + alloc.length, + ReservationAllocation allocation2 = new InMemoryReservationAllocation(r2, + rDef, "u3", planName, 0, 0 + alloc.length, ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res, minAlloc, hasGang); ReservationAllocationStateProto allocationStateProto2 = ReservationSystemUtil.buildStateProto(allocation2); String reservationIdName2 = r2.toString(); - rmContext.getStateStore().storeNewReservation( - allocationStateProto2, + rmContext.getStateStore().storeNewReservation(allocationStateProto2, planName, reservationIdName2); rmContext.getStateStore().removeReservation(planName, reservationIdName); @@ -874,11 +865,10 @@ public void testReservationStateStore( storedReservationAllocation); storedReservationAllocation = reservations.get(r2); - assertAllocationStateEqual( - allocationStateProto2, storedReservationAllocation); + assertAllocationStateEqual(allocationStateProto2, + storedReservationAllocation); assertAllocationStateEqual(allocation2, storedReservationAllocation); - // 5. remove last reservation removes the plan state rmContext.getStateStore().removeReservation(planName, reservationIdName2); @@ -892,17 +882,16 @@ public void testReservationStateStore( Assert.assertNull(reservations); } - private void validateStoredReservation( - RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher, - RMContext rmContext, ReservationId r1, String planName, - ReservationAllocation allocation, + private void validateStoredReservation(RMStateStoreHelper stateStoreHelper, + TestDispatcher dispatcher, RMContext rmContext, ReservationId r1, + String planName, ReservationAllocation allocation, ReservationAllocationStateProto allocationStateProto) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); when(rmContext.getStateStore()).thenReturn(store); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); - Map> - reservationState = state.getReservationState(); + Map> reservationState = + state.getReservationState(); Assert.assertNotNull(reservationState); Map reservations = reservationState.get(planName); @@ -911,43 +900,37 @@ private void validateStoredReservation( reservations.get(r1); Assert.assertNotNull(storedReservationAllocation); - assertAllocationStateEqual( - allocationStateProto, storedReservationAllocation); + assertAllocationStateEqual(allocationStateProto, + storedReservationAllocation); assertAllocationStateEqual(allocation, storedReservationAllocation); } - void assertAllocationStateEqual( - ReservationAllocationStateProto expected, + void assertAllocationStateEqual(ReservationAllocationStateProto expected, ReservationAllocationStateProto actual) { - Assert.assertEquals( - expected.getAcceptanceTime(), actual.getAcceptanceTime()); + Assert.assertEquals(expected.getAcceptanceTime(), + actual.getAcceptanceTime()); Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs()); Assert.assertEquals(expected.getUser(), actual.getUser()); - assertEquals( - expected.getReservationDefinition(), actual.getReservationDefinition()); + assertEquals(expected.getReservationDefinition(), + actual.getReservationDefinition()); assertEquals(expected.getAllocationRequestsList(), actual.getAllocationRequestsList()); } - void assertAllocationStateEqual( - ReservationAllocation expected, + void assertAllocationStateEqual(ReservationAllocation expected, ReservationAllocationStateProto actual) { - Assert.assertEquals( - expected.getAcceptanceTime(), actual.getAcceptanceTime()); + Assert.assertEquals(expected.getAcceptanceTime(), + actual.getAcceptanceTime()); Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs()); Assert.assertEquals(expected.getUser(), actual.getUser()); - assertEquals( - expected.getReservationDefinition(), - ReservationSystemUtil.convertFromProtoFormat( - actual.getReservationDefinition())); - assertEquals( - expected.getAllocationRequests(), - ReservationSystemUtil.toAllocations( - actual.getAllocationRequestsList())); + assertEquals(expected.getReservationDefinition(), ReservationSystemUtil + .convertFromProtoFormat(actual.getReservationDefinition())); + assertEquals(expected.getAllocationRequests(), ReservationSystemUtil + .toAllocations(actual.getAllocationRequestsList())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 70887e0..647d1a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -490,14 +490,14 @@ private void sendAppUpdateSavedEvent(RMApp application) { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_UPDATE_SAVED); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); } private void sendAttemptUpdateSavedEvent(RMApp application) { application.getCurrentAppAttempt().handle( new RMAppAttemptEvent(application.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); - rmDispatcher.await(); + rmDispatcher.drain(); } protected RMApp testCreateAppNewSaving( @@ -648,7 +648,7 @@ public void testUnmanagedApp() throws IOException { RMAppEvent event = new RMAppFailedAttemptEvent( application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); RMAppAttempt appAttempt = application.getCurrentAppAttempt(); Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId()); sendAppUpdateSavedEvent(application); @@ -690,7 +690,7 @@ public void testAppNewKill() throws IOException { application.getApplicationId(), "Application killed by user.", fooUser, Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateNotSaved(application); @@ -709,7 +709,7 @@ public void testAppNewReject() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); @@ -727,7 +727,7 @@ public void testAppNewRejectAddToStore() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); @@ -750,7 +750,7 @@ public void testAppNewSavingKill() throws IOException { Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertKilled(application); verifyApplicationFinished(RMAppState.KILLED); @@ -768,7 +768,7 @@ public void testAppNewSavingReject() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); @@ -785,7 +785,7 @@ public void testAppNewSavingSaveReject() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_SAVE_FAILED, rejectedText); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertFailed(application, rejectedText); verify(store, times(0)).updateApplicationState( any(ApplicationStateData.class)); @@ -803,7 +803,7 @@ public void testAppSubmittedRejected() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_REJECTED, rejectedText); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); @@ -825,7 +825,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); @@ -851,7 +851,7 @@ public void testAppAcceptedFailed() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.ACCEPTED, application); } @@ -862,7 +862,7 @@ public void testAppAcceptedFailed() throws IOException { new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, message, false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, ".*" + message + ".*Failing the application.*"); assertAppFinalStateSaved(application); @@ -882,7 +882,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.KILLING, application); RMAppEvent appAttemptKilled = @@ -911,7 +911,7 @@ public void testAppAcceptedAttemptKilled() throws IOException, new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_KILLED, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.FINAL_SAVING, application); sendAppUpdateSavedEvent(application); @@ -936,7 +936,7 @@ public void testAppRunningKill() throws IOException { Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); @@ -962,7 +962,7 @@ public void testAppRunningFailed() throws IOException { new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.ACCEPTED, application); appAttempt = application.getCurrentAppAttempt(); Assert.assertEquals(++expectedAttemptId, @@ -971,13 +971,13 @@ public void testAppRunningFailed() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.ACCEPTED, application); event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.RUNNING, application); } @@ -987,7 +987,7 @@ public void testAppRunningFailed() throws IOException { new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertFailed(application, ".*Failing the application.*"); assertAppFinalStateSaved(application); @@ -997,7 +997,7 @@ public void testAppRunningFailed() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertFailed(application, ".*Failing the application.*"); assertAppFinalStateSaved(application); verifyApplicationFinished(RMAppState.FAILED); @@ -1014,7 +1014,7 @@ public void testAppAtFinishingIgnoreKill() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertAppState(RMAppState.FINISHING, application); } @@ -1056,7 +1056,7 @@ public void testAppFinishedFinished() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.FINISHED, application); StringBuilder diag = application.getDiagnostics(); @@ -1076,7 +1076,7 @@ public void testAppFailedFailed() throws IOException { RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_REJECTED, ""); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAppUpdateSavedEvent(application); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); @@ -1086,7 +1086,7 @@ public void testAppFailedFailed() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); verifyApplicationFinished(RMAppState.FAILED); @@ -1112,7 +1112,7 @@ public void testAppKilledKilled() throws IOException { Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertTimesAtFinish(application); @@ -1122,7 +1122,7 @@ public void testAppKilledKilled() throws IOException { event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FINISHED, ""); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); @@ -1131,7 +1131,7 @@ public void testAppKilledKilled() throws IOException { new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); @@ -1141,7 +1141,7 @@ public void testAppKilledKilled() throws IOException { new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, "Application killed by user."); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); verifyApplicationFinished(RMAppState.KILLED); @@ -1174,14 +1174,14 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, applicationId, "Application killed by user.", fooUser, Server.getRemoteIp()); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertKilled(application); // KILLED => KILLED event RMAppEventType.START event = new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.START, "", false); application.handle(event); - rmDispatcher.await(); + rmDispatcher.drain(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); } @@ -1225,7 +1225,7 @@ public void testRecoverApplication(ApplicationStateData appState, Assert.assertTrue("Application is not in recoveredFinalStatus.", RMAppImpl.isAppInFinalState(application)); - rmDispatcher.await(); + rmDispatcher.drain(); RMAppState finalState = appState.getState(); Assert.assertEquals("Application is not in finalState.", finalState, application.getState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index bb6591b..6878998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -138,16 +138,16 @@ public void testReleaseWhileRunning() { rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong()); @@ -161,7 +161,7 @@ public void testReleaseWhileRunning() { SchedulerUtils.RELEASED_CONTAINER); rmContainer.handle(new RMContainerFinishedEvent(containerId, containerStatus, RMContainerEventType.RELEASED)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.RELEASED, rmContainer.getState()); assertEquals(SchedulerUtils.RELEASED_CONTAINER, rmContainer.getDiagnosticsInfo()); @@ -244,20 +244,20 @@ public void testExpireWhileRunning() { rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); verify(publisher).containerCreated(any(RMContainer.class), anyLong()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong()); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user", @@ -270,7 +270,7 @@ public void testExpireWhileRunning() { SchedulerUtils.EXPIRED_CONTAINER); rmContainer.handle(new RMContainerFinishedEvent(containerId, containerStatus, RMContainerEventType.EXPIRE)); - drainDispatcher.await(); + drainDispatcher.drain(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); verify(publisher, never()).containerFinished(any(RMContainer.class), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ba409b1..bedaa4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -917,7 +917,7 @@ public void testNodemanagerReconnect() throws Exception { request1.setHttpPort(0); request1.setResource(capability); privateResourceTrackerService.registerNodeManager(request1); - privateDispatcher.await(); + privateDispatcher.drain(); Resource clusterResource = rm.getResourceScheduler().getClusterResource(); Assert.assertEquals("Initial cluster resources don't match", capability, @@ -932,7 +932,7 @@ public void testNodemanagerReconnect() throws Exception { // hold up the disaptcher and register the same node with lower capability sleepHandler.sleepFlag = true; privateResourceTrackerService.registerNodeManager(request2); - privateDispatcher.await(); + privateDispatcher.drain(); Assert.assertEquals("Cluster resources don't match", newCapability, rm.getResourceScheduler().getClusterResource()); privateResourceTrackerService.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index f313d70..24bac4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -23,7 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.TestGroupsCaching; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -63,8 +62,6 @@ .AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event .SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair - .SimpleGroupsMapping; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -99,6 +96,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("rawtypes") public class TestCapacitySchedulerAutoCreatedQueueBase { private static final Log LOG = LogFactory.getLog( @@ -156,9 +154,13 @@ protected MockNM nm3 = null; protected CapacityScheduler cs; protected SpyDispatcher dispatcher; + private static EventHandler rmAppEventEventHandler; - public static class SpyDispatcher extends AsyncDispatcher { + /** + * Dispatcher used for testing. + */ + public static class SpyDispatcher implements Dispatcher { public static BlockingQueue eventQueue = new LinkedBlockingQueue<>(); @@ -169,11 +171,6 @@ public void handle(Event event) { } @Override - protected void dispatch(Event event) { - eventQueue.add(event); - } - - @Override public EventHandler getEventHandler() { return rmAppEventEventHandler; } @@ -185,6 +182,12 @@ void spyOnNextEvent(Event expectedEvent, long timeout) assertEquals(expectedEvent.getType(), event.getType()); assertEquals(expectedEvent.getClass(), event.getClass()); } + + @Override + public void register(Class eventType, + EventHandler handler) { + // NO-OP + } } @Before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 800789a..8ed285a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -1046,6 +1046,8 @@ public void testPreemptionForFragmentatedCluster() throws Exception { nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB)); } + rm1.drainEvents(); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); // launch an app to queue, AM container should be launched in nm1 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 307d5ae..e1ce3f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -205,7 +205,7 @@ protected Dispatcher createDispatcher() { verifyContainerDecreased(response, containerId1, 1 * GB); // Wait for scheduler to finish processing kill events.. - dispatcher.waitForEventThreadToWait(); + dispatcher.drain(); checkUsedResource(rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, @@ -606,7 +606,6 @@ protected Dispatcher createDispatcher() { // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - dispatcher.waitForEventThreadToWait(); /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -715,7 +714,7 @@ protected Dispatcher createDispatcher() { am1.allocate(null, null); // Wait for scheduler to process all events. - dispatcher.waitForEventThreadToWait(); + dispatcher.drain(); /* Check statuses after reservation satisfied */ // Increase request should be unreserved diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index a800bef..f793f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -212,7 +212,7 @@ protected Dispatcher createDispatcher() { Assert.assertEquals( 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) .getAllocatedResource().getMemorySize()); - disp.waitForEventThreadToWait(); + disp.drain(); // Verify total resource usage is 2G checkUsedResource(rm1, "default", 2 * GB, null); Assert.assertEquals(2 * GB, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 9b2c0b3..09feade 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -77,8 +77,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; @@ -120,7 +121,7 @@ private static BlockingQueue eventQueue; private static volatile AtomicInteger counter; - private static AsyncDispatcher dispatcher; + private static Dispatcher dispatcher; public static class Renewer extends TokenRenewer { private static int counter = 0; private static Token lastRenewed = null; @@ -203,7 +204,22 @@ public void setUp() throws Exception { "kerberos"); UserGroupInformation.setConfiguration(conf); eventQueue = new LinkedBlockingQueue(); - dispatcher = new AsyncDispatcher(eventQueue); + dispatcher = new Dispatcher() { + @Override + public void register(Class eventType, + EventHandler handler) { + // NO-OP + } + @Override + public EventHandler getEventHandler() { + return new EventHandler() { + @Override + public void handle(Event event) { + eventQueue.add(event); + } + }; + } + }; Renewer.reset(); delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter); RMContext mockContext = mock(RMContext.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java index ba14491..bc8511c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java @@ -118,8 +118,8 @@ protected void startWepApp() { MasterKey nmTokenMasterKey = registrationResponse.getNMTokenMasterKey(); Assert.assertNotNull(nmToken + "Registration should cause a key-update!", nmTokenMasterKey); - - dispatcher.await(); + + dispatcher.drain(); NodeHeartbeatResponse response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -128,7 +128,7 @@ protected void startWepApp() { Assert.assertNull(nmToken + "First heartbeat after registration shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); + dispatcher.drain(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -137,8 +137,8 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Even second heartbeat after registration shouldn't get any key updates!", response.getContainerTokenMasterKey()); - - dispatcher.await(); + + dispatcher.drain(); // Let's force a roll-over rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); @@ -161,7 +161,7 @@ protected void startWepApp() { "Roll-over should have incremented the key-id only by one!", nmTokenMasterKey.getKeyId() + 1, response.getNMTokenMasterKey().getKeyId()); - dispatcher.await(); + dispatcher.drain(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -170,7 +170,7 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Second heartbeat after roll-over shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); + dispatcher.drain(); // Let's force activation rm.getRMContext().getContainerTokenSecretManager().activateNextMasterKey(); @@ -183,7 +183,7 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Activation shouldn't cause any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); + dispatcher.drain(); response = nm.nodeHeartbeat(true); Assert.assertNull(containerToken + @@ -192,7 +192,7 @@ protected void startWepApp() { Assert.assertNull(nmToken + "Even second heartbeat after activation shouldn't get any key updates!", response.getNMTokenMasterKey()); - dispatcher.await(); + dispatcher.drain(); rm.stop(); }