diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e4a5a82..83094dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.yarn.event; +import com.google.common.annotations.VisibleForTesting; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") -public class DrainDispatcher extends AsyncDispatcher { +public class DrainDispatcher extends AsyncDispatcher + implements EventHandler{ public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -31,6 +34,11 @@ public DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); } + @VisibleForTesting + public void handle(T event) { + super.getEventHandler().handle(event); + } + /** * Wait till event thread enters WAITING state (i.e. waiting for new events). */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 0c1df33..1e1dc58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..3177a5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -668,6 +668,20 @@ protected void serviceInit(Configuration conf) throws Exception{ initInternal(conf); } + @VisibleForTesting + public void updateStateStoreDispatcher( + Configuration conf, AsyncDispatcher dispatcher) { + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + this.dispatcher = dispatcher; + dispatcher.init(conf); + dispatcher.register(RMStateStoreEventType.class, + new ForwardingEventHandler()); + dispatcher.setDrainEventsOnStop(); + dispatcher.start(); + } + @Override protected void serviceStart() throws Exception { dispatcher.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index dc17f5f..0c749bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -52,6 +52,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; + private MockRM rm; private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; @@ -60,9 +61,10 @@ private final List requests = new ArrayList(); private final List releases = new ArrayList(); - public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, + public MockAM(MockRM rm, ApplicationMasterProtocol amRMProtocol, ApplicationAttemptId attemptId) { - this.context = context; + this.rm = rm; + this.context = rm.getRMContext(); this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } @@ -84,7 +86,7 @@ private void waitForState(RMAppAttemptState finalState) throws InterruptedException { RMApp app = context.getRMApps().get(attemptId.getApplicationId()); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState); + rm.waitForState(attempt, finalState); } public RegisterApplicationMasterResponse registerAppAttempt() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 84fe7c0..a715061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -91,6 +92,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -115,6 +118,9 @@ private static final int WAIT_MS_PER_LOOP = 10; private final boolean useNullRMNodeLabelsManager; + private DrainDispatcher schedulerEventDrainDispatcher; + private DrainDispatcher stateStoreEventDrainDispatcher; + private boolean disableDrainEventsImplicitly; public MockRM() { this(new YarnConfiguration()); @@ -136,8 +142,12 @@ public MockRM(Configuration conf, RMStateStore store, if(store != null) { setRMStateStore(store); } + stateStoreEventDrainDispatcher = new DrainDispatcher(); + getRMContext().getStateStore().updateStateStoreDispatcher( + conf, stateStoreEventDrainDispatcher); Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); + disableDrainEventsImplicitly = false; } @Override @@ -157,15 +167,33 @@ protected Dispatcher createDispatcher() { return new DrainDispatcher(); } + @Override + protected final EventHandler createSchedulerEventDispatcher() { + schedulerEventDrainDispatcher = new DrainDispatcher(); + schedulerEventDrainDispatcher.register(SchedulerEventType.class, + getResourceScheduler()); + return schedulerEventDrainDispatcher; + } + public void drainEvents() { Dispatcher rmDispatcher = getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { ((DrainDispatcher) rmDispatcher).await(); + drainSchedulerEvents(); + drainStateStoreEvents(); } else { throw new UnsupportedOperationException("Not a Drain Dispatcher!"); } } + public void drainSchedulerEvents() { + schedulerEventDrainDispatcher.await(); + } + + public void drainStateStoreEvents() { + stateStoreEventDrainDispatcher.await(); + } + /** * Wait until an application has reached a specified state. * The timeout is 80 seconds. @@ -193,7 +221,8 @@ public void waitForState(ApplicationId appId, RMAppState finalState) LOG.info("App State is : " + app.getState()); Assert.assertEquals("App State is not correct (timeout).", finalState, - app.getState()); + app.getState()); + drainEventsImplicitly(); } /** @@ -224,7 +253,7 @@ public void waitForState(ApplicationAttemptId attemptId, RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState, timeoutMsecs); + waitForState(attempt, finalState, timeoutMsecs); } /** @@ -235,7 +264,7 @@ public void waitForState(ApplicationAttemptId attemptId, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState) throws InterruptedException { waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); } @@ -249,7 +278,7 @@ public static void waitForState(RMAppAttempt attempt, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; @@ -266,7 +295,8 @@ public static void waitForState(RMAppAttempt attempt, LOG.info("Attempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("Attempt state is not correct (timeout).", finalState, - attempt.getState()); + attempt.getState()); + drainEventsImplicitly(); } public void waitForContainerToComplete(RMAppAttempt attempt, @@ -277,6 +307,7 @@ public void waitForContainerToComplete(RMAppAttempt attempt, for (ContainerStatus container : containers) { if (container.getContainerId().equals( completedContainer.getContainerId())) { + drainEventsImplicitly(); return; } } @@ -346,7 +377,7 @@ public boolean waitForState(MockNM nm, ContainerId containerId, public boolean waitForState(Collection nms, ContainerId containerId, RMContainerState containerState) throws Exception { return waitForState(nms, containerId, containerState, - TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); } /** @@ -395,6 +426,7 @@ public boolean waitForState(Collection nms, ContainerId containerId, } System.out.println("Container State is : " + container.getState()); + drainEventsImplicitly(); return true; } @@ -657,13 +689,14 @@ public SubmitApplicationResponse run() { RMAppAttemptState.SCHEDULED); } + drainEventsImplicitly(); return rmApp; } public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -672,7 +705,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) MockNM nm = new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -682,7 +715,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores, new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -690,12 +723,14 @@ public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + drainEventsImplicitly(); } public void sendNodeLost(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + drainEventsImplicitly(); } /** @@ -725,12 +760,15 @@ public void waitForState(NodeId nodeId, NodeState finalState) System.out.println("Node State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); + drainEventsImplicitly(); } public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); - return client.forceKillApplication(req); + KillApplicationResponse response = client.forceKillApplication(req); + drainEventsImplicitly(); + return response; } public FailApplicationAttemptResponse failApplicationAttempt( @@ -738,13 +776,16 @@ public FailApplicationAttemptResponse failApplicationAttempt( ApplicationClientProtocol client = getClientRMService(); FailApplicationAttemptRequest req = FailApplicationAttemptRequest.newInstance(attemptId); - return client.failApplicationAttempt(req); + FailApplicationAttemptResponse response = + client.failApplicationAttempt(req); + drainEventsImplicitly(); + return response; } // from AMLauncher public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); //create and set AMRMToken Token amrmToken = @@ -758,16 +799,18 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) .getEventHandler() .handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + drainEventsImplicitly(); return am; } public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(am.getApplicationAttemptId(), RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); + drainEventsImplicitly(); } @Override @@ -918,6 +961,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + rm.drainEvents(); } @SuppressWarnings("rawtypes") @@ -935,8 +979,9 @@ private static void waitForSchedulerAppAttemptAdded( tick++; } Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + - attemptId + " to be added.", ((AbstractYarnScheduler) + attemptId + " to be added.", ((AbstractYarnScheduler) rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + rm.drainEvents(); } /** @@ -945,25 +990,29 @@ private static void waitForSchedulerAppAttemptAdded( */ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); // UAMs go directly to LAUNCHED state rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); - MockAM am = new MockAM(rm.getRMContext(), rm.masterService, + MockAM am = new MockAM(rm, rm.masterService, attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } @@ -973,6 +1022,7 @@ public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + rm.drainEvents(); return attempt; } @@ -981,6 +1031,7 @@ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) MockAM am = launchAM(app, rm, nm); am.registerAppAttempt(); rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + rm.drainEvents(); return am; } @@ -997,6 +1048,7 @@ public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); client.updateReservation(request); + drainEventsImplicitly(); } // Explicitly reset queue metrics for testing. @@ -1017,9 +1069,9 @@ public void signalToContainer(ContainerId containerId, SignalContainerRequest req = SignalContainerRequest.newInstance(containerId, command); client.signalToContainer(req); + drainEventsImplicitly(); } - /** * Wait until an app removed from scheduler. * The timeout is 40 seconds. @@ -1057,5 +1109,16 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId, Assert.assertTrue("app is not removed from scheduler (timeout).", !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); + drainEventsImplicitly(); + } + + private void drainEventsImplicitly() { + if (!disableDrainEventsImplicitly) { + drainEvents(); + } + } + + public void disableDrainEventsImplicitly() { + disableDrainEventsImplicitly = true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index a7d8ba2..88d0c0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -168,17 +168,6 @@ public void testContainerCleanup() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 3482af2..0682831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -196,8 +196,8 @@ public void testAMLaunchAndCleanup() throws Exception { Assert.assertEquals(YarnConfiguration.DEFAULT_QUEUE_NAME, containerManager.queueName); - MockAM am = new MockAM(rm.getRMContext(), rm - .getApplicationMasterService(), appAttemptId); + MockAM am = new MockAM(rm, rm.getApplicationMasterService(), + appAttemptId); am.registerAppAttempt(); am.unregisterAppAttempt(); @@ -266,7 +266,7 @@ protected Dispatcher createDispatcher() { nm1.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(app.getCurrentAppAttempt(), + rm.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.LAUNCHED, 500); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index ef6d43b..515df85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -118,7 +118,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { dispatcher.await(); // Now the AM container should be allocated - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -171,7 +171,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); ApplicationAttemptId appAttemptId = @@ -199,7 +199,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -229,17 +229,6 @@ private MockRM startRM(YarnConfiguration conf, MockRM rm1 = new MockRM(conf, memStore) { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index ee6d51a..7ced7bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; @@ -558,7 +559,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -639,7 +640,7 @@ protected Dispatcher createDispatcher() { public void testKillFinishingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -693,7 +694,7 @@ protected Dispatcher createDispatcher() { public void testKillFailingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 1a79479..01d0bd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1575,6 +1575,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // start RM final MockRM rm1 = createMockRM(conf, memStore); + rm1.disableDrainEventsImplicitly(); rm1.start(); // create apps. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 098ba54..6e6cfd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -1071,18 +1071,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { @Test public void testReconnectNode() throws Exception { - rm = new MockRM() { - @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - scheduler.handle(event); - } - }; - } - }; + rm = new MockRM(); rm.start(); MockNM nm1 = rm.registerNode("host1:1234", 5120); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index 4329033..d9d35c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -64,16 +64,6 @@ public void init(Configuration conf) { "1.0"); super.init(conf); } - @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } @Override protected Dispatcher createDispatcher() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index d36fb9f..a55562d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -217,7 +217,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString()); @@ -457,7 +457,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString());