diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f5361c8..f690f0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -302,7 +302,7 @@ protected boolean isEventThreadWaiting() { } @VisibleForTesting - protected boolean isDrained() { + public boolean isDrained() { return this.drained; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..18a2ed8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -660,7 +660,7 @@ public void setRMDispatcher(Dispatcher dispatcher) { @Override protected void serviceInit(Configuration conf) throws Exception{ // create async handler - dispatcher = new AsyncDispatcher(); + dispatcher = createRMStateStoreDispatcher(); dispatcher.init(conf); dispatcher.register(RMStateStoreEventType.class, new ForwardingEventHandler()); @@ -674,6 +674,10 @@ protected void serviceStart() throws Exception { startInternal(); } + public AsyncDispatcher createRMStateStoreDispatcher() { + return new AsyncDispatcher(); + } + /** * Derived classes initialize themselves using this method. */ @@ -1192,4 +1196,8 @@ public RMStateStoreState getRMStateStoreState() { this.readLock.unlock(); } } + + public AsyncDispatcher getRMStateStoreDispatcher() { + return dispatcher; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index dc17f5f..0c749bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -52,6 +52,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; + private MockRM rm; private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; @@ -60,9 +61,10 @@ private final List requests = new ArrayList(); private final List releases = new ArrayList(); - public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, + public MockAM(MockRM rm, ApplicationMasterProtocol amRMProtocol, ApplicationAttemptId attemptId) { - this.context = context; + this.rm = rm; + this.context = rm.getRMContext(); this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } @@ -84,7 +86,7 @@ private void waitForState(RMAppAttemptState finalState) throws InterruptedException { RMApp app = context.getRMApps().get(attemptId.getApplicationId()); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState); + rm.waitForState(attempt, finalState); } public RegisterApplicationMasterResponse registerAppAttempt() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a3e0f9a..9c10ce3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -63,8 +63,10 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -91,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -115,6 +119,8 @@ private static final int WAIT_MS_PER_LOOP = 10; private final boolean useNullRMNodeLabelsManager; + private boolean disableDrainEventsImplicitly; + private AsyncDispatcher rmStateStoredispatcher; public MockRM() { this(new YarnConfiguration()); @@ -136,10 +142,13 @@ public MockRM(Configuration conf, RMStateStore store, if(store != null) { setRMStateStore(store); } + rmStateStoredispatcher = + getRMContext().getStateStore().getRMStateStoreDispatcher(); Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); + disableDrainEventsImplicitly = false; } - + @Override protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { @@ -157,10 +166,27 @@ protected Dispatcher createDispatcher() { return new DrainDispatcher(); } + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + private void drainRMStateStoreEvents() { + while (!rmStateStoredispatcher.isDrained()) { + Thread.yield(); + } + } + public void drainEvents() { Dispatcher rmDispatcher = getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { ((DrainDispatcher) rmDispatcher).await(); + drainRMStateStoreEvents(); } else { throw new UnsupportedOperationException("Not a Drain Dispatcher!"); } @@ -193,7 +219,8 @@ public void waitForState(ApplicationId appId, RMAppState finalState) LOG.info("App State is : " + app.getState()); Assert.assertEquals("App State is not correct (timeout).", finalState, - app.getState()); + app.getState()); + drainEventsImplicitly(); } /** @@ -224,7 +251,7 @@ public void waitForState(ApplicationAttemptId attemptId, RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState, timeoutMsecs); + waitForState(attempt, finalState, timeoutMsecs); } /** @@ -235,7 +262,7 @@ public void waitForState(ApplicationAttemptId attemptId, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState) throws InterruptedException { waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); } @@ -249,7 +276,7 @@ public static void waitForState(RMAppAttempt attempt, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; @@ -266,7 +293,8 @@ public static void waitForState(RMAppAttempt attempt, LOG.info("Attempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("Attempt state is not correct (timeout).", finalState, - attempt.getState()); + attempt.getState()); + drainEventsImplicitly(); } public void waitForContainerToComplete(RMAppAttempt attempt, @@ -277,6 +305,7 @@ public void waitForContainerToComplete(RMAppAttempt attempt, for (ContainerStatus container : containers) { if (container.getContainerId().equals( completedContainer.getContainerId())) { + drainEventsImplicitly(); return; } } @@ -329,7 +358,7 @@ public boolean waitForState(MockNM nm, ContainerId containerId, public boolean waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState, int timeoutMsecs) throws Exception { return waitForState(Arrays.asList(nm), containerId, containerState, - timeoutMsecs); + timeoutMsecs); } /** @@ -346,7 +375,7 @@ public boolean waitForState(MockNM nm, ContainerId containerId, public boolean waitForState(Collection nms, ContainerId containerId, RMContainerState containerState) throws Exception { return waitForState(nms, containerId, containerState, - TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); } /** @@ -395,6 +424,7 @@ public boolean waitForState(Collection nms, ContainerId containerId, } System.out.println("Container State is : " + container.getState()); + drainEventsImplicitly(); return true; } @@ -657,13 +687,14 @@ public SubmitApplicationResponse run() { RMAppAttemptState.SCHEDULED); } + drainEventsImplicitly(); return rmApp; } public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -672,7 +703,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) MockNM nm = new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -682,7 +713,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores, new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -690,12 +721,14 @@ public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + drainEventsImplicitly(); } public void sendNodeLost(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + drainEventsImplicitly(); } /** @@ -725,12 +758,15 @@ public void waitForState(NodeId nodeId, NodeState finalState) System.out.println("Node State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); + drainEventsImplicitly(); } public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); - return client.forceKillApplication(req); + KillApplicationResponse response = client.forceKillApplication(req); + drainEventsImplicitly(); + return response; } public FailApplicationAttemptResponse failApplicationAttempt( @@ -738,7 +774,10 @@ public FailApplicationAttemptResponse failApplicationAttempt( ApplicationClientProtocol client = getClientRMService(); FailApplicationAttemptRequest req = FailApplicationAttemptRequest.newInstance(attemptId); - return client.failApplicationAttempt(req); + FailApplicationAttemptResponse response = + client.failApplicationAttempt(req); + drainEventsImplicitly(); + return response; } /** @@ -749,7 +788,7 @@ public FailApplicationAttemptResponse failApplicationAttempt( */ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); //create and set AMRMToken Token amrmToken = @@ -763,16 +802,18 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) .getEventHandler() .handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + drainEventsImplicitly(); return am; } public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(am.getApplicationAttemptId(), RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); + drainEventsImplicitly(); } @Override @@ -923,6 +964,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + rm.drainEvents(); } @SuppressWarnings("rawtypes") @@ -942,6 +984,7 @@ private static void waitForSchedulerAppAttemptAdded( Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + attemptId + " to be added.", ((AbstractYarnScheduler) rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + rm.drainEvents(); } /** @@ -950,25 +993,29 @@ private static void waitForSchedulerAppAttemptAdded( */ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); // UAMs go directly to LAUNCHED state rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); - MockAM am = new MockAM(rm.getRMContext(), rm.masterService, + MockAM am = new MockAM(rm, rm.masterService, attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } @@ -978,6 +1025,7 @@ public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + rm.drainEvents(); return attempt; } @@ -986,6 +1034,7 @@ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) MockAM am = launchAM(app, rm, nm); am.registerAppAttempt(); rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + rm.drainEvents(); return am; } @@ -1002,6 +1051,7 @@ public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); client.updateReservation(request); + drainEventsImplicitly(); } // Explicitly reset queue metrics for testing. @@ -1022,9 +1072,9 @@ public void signalToContainer(ContainerId containerId, SignalContainerRequest req = SignalContainerRequest.newInstance(containerId, command); client.signalToContainer(req); + drainEventsImplicitly(); } - /** * Wait until an app removed from scheduler. * The timeout is 40 seconds. @@ -1062,5 +1112,16 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId, Assert.assertTrue("app is not removed from scheduler (timeout).", !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); + drainEventsImplicitly(); + } + + private void drainEventsImplicitly() { + if (!disableDrainEventsImplicitly) { + drainEvents(); + } + } + + public void disableDrainEventsImplicitly() { + disableDrainEventsImplicitly = true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index a7d8ba2..a8d2957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -53,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -168,17 +164,6 @@ public void testContainerCleanup() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 3482af2..0682831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -196,8 +196,8 @@ public void testAMLaunchAndCleanup() throws Exception { Assert.assertEquals(YarnConfiguration.DEFAULT_QUEUE_NAME, containerManager.queueName); - MockAM am = new MockAM(rm.getRMContext(), rm - .getApplicationMasterService(), appAttemptId); + MockAM am = new MockAM(rm, rm.getApplicationMasterService(), + appAttemptId); am.registerAppAttempt(); am.unregisterAppAttempt(); @@ -266,7 +266,7 @@ protected Dispatcher createDispatcher() { nm1.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(app.getCurrentAppAttempt(), + rm.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.LAUNCHED, 500); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index ef6d43b..fbb6102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -118,7 +115,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { dispatcher.await(); // Now the AM container should be allocated - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -171,7 +168,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); ApplicationAttemptId appAttemptId = @@ -199,7 +196,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -229,17 +226,6 @@ private MockRM startRM(YarnConfiguration conf, MockRM rm1 = new MockRM(conf, memStore) { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index ee6d51a..a7057b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; @@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -558,7 +558,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -639,7 +639,7 @@ protected Dispatcher createDispatcher() { public void testKillFinishingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -693,7 +693,7 @@ protected Dispatcher createDispatcher() { public void testKillFailingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b271b37..5b52e7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1575,6 +1575,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // start RM final MockRM rm1 = createMockRM(conf, memStore); + rm1.disableDrainEventsImplicitly(); rm1.start(); // create apps. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index 4329033..c8baa60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,7 +21,6 @@ import java.security.PrivilegedExceptionAction; import java.util.List; -import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -42,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; @@ -64,16 +61,6 @@ public void init(Configuration conf) { "1.0"); super.init(conf); } - @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } @Override protected Dispatcher createDispatcher() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index d36fb9f..a55562d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -217,7 +217,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString()); @@ -457,7 +457,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString());