diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index f769492..1369465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.event; +import org.apache.hadoop.conf.Configuration; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -37,6 +39,13 @@ public DrainDispatcher(BlockingQueue eventQueue) { this.mutex = this; } + @Override + public void serviceInit(Configuration conf) + throws Exception { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false); + super.serviceInit(conf); + } + /** * Wait till event thread enters WAITING state (i.e. waiting for new events). */ @@ -50,7 +59,7 @@ public void waitForEventThreadToWait() { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!drained) { + while (!isDrained()) { Thread.yield(); } } @@ -96,7 +105,9 @@ public void handle(Event event) { @Override protected boolean isDrained() { - return drained; + synchronized (mutex) { + return drained; + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index fc12522..adf4ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -670,14 +670,18 @@ public void setRMDispatcher(Dispatcher dispatcher) { } AsyncDispatcher dispatcher; + @SuppressWarnings("rawtypes") + @VisibleForTesting + protected EventHandler rmStateStoreEventHandler; @Override protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); + rmStateStoreEventHandler = new ForwardingEventHandler(); dispatcher.register(RMStateStoreEventType.class, - new ForwardingEventHandler()); + rmStateStoreEventHandler); dispatcher.setDrainEventsOnStop(); initInternal(conf); } @@ -789,12 +793,12 @@ public void storeNewApplication(RMApp app) { ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), context, app.getUser(), app.getCallerContext()); appState.setApplicationTimeouts(app.getApplicationTimeouts()); - dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); + getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") public void updateApplicationState(ApplicationStateData appState) { - dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); + getRMStateStoreEventHandler().handle(new RMStateUpdateAppEvent(appState)); } public void updateApplicationStateSynchronously(ApplicationStateData appState, @@ -841,14 +845,14 @@ public void storeNewApplicationAttempt(RMAppAttempt appAttempt) { attempMetrics.getPreemptedVcore() ); - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); } @SuppressWarnings("unchecked") public void updateApplicationAttemptState( ApplicationAttemptStateData attemptState) { - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new RMStateUpdateAppAttemptEvent(attemptState)); } @@ -1020,7 +1024,7 @@ public void removeApplication(RMApp app) { appState.attempts.put(appAttempt.getAppAttemptId(), null); } - dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + getRMStateStoreEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); } /** @@ -1041,7 +1045,7 @@ protected abstract void removeApplicationStateInternal( @SuppressWarnings("unchecked") public synchronized void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId) { - dispatcher.getEventHandler().handle( + getRMStateStoreEventHandler().handle( new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); } @@ -1210,4 +1214,9 @@ public RMStateStoreState getRMStateStoreState() { this.readLock.unlock(); } } + + @SuppressWarnings("rawtypes") + protected EventHandler getRMStateStoreEventHandler() { + return dispatcher.getEventHandler(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3861624..7dfbb02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -74,6 +75,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -93,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -117,6 +121,7 @@ private static final int WAIT_MS_PER_LOOP = 10; private final boolean useNullRMNodeLabelsManager; + private boolean disableDrainEventsImplicitly; public MockRM() { this(new YarnConfiguration()); @@ -135,13 +140,41 @@ public MockRM(Configuration conf, RMStateStore store, super(); this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); - if(store != null) { + if (store != null) { setRMStateStore(store); + } else { + Class storeClass = getRMContext().getStateStore().getClass(); + if (storeClass.equals(MemoryRMStateStore.class)) { + MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore(); + mockStateStore.init(conf); + setRMStateStore(mockStateStore); + } else if (storeClass.equals(NullRMStateStore.class)) { + MockRMNullStateStore mockStateStore = new MockRMNullStateStore(); + mockStateStore.init(conf); + setRMStateStore(mockStateStore); + } } Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); + disableDrainEventsImplicitly = false; } - + + public class MockRMMemoryStateStore extends MemoryRMStateStore { + @SuppressWarnings("rawtypes") + @Override + protected EventHandler getRMStateStoreEventHandler() { + return rmStateStoreEventHandler; + } + } + + public class MockRMNullStateStore extends NullRMStateStore { + @SuppressWarnings("rawtypes") + @Override + protected EventHandler getRMStateStoreEventHandler() { + return rmStateStoreEventHandler; + } + } + @Override protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { @@ -159,6 +192,16 @@ protected Dispatcher createDispatcher() { return new DrainDispatcher(); } + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + public void drainEvents() { Dispatcher rmDispatcher = getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { @@ -218,6 +261,7 @@ public void waitForState(ApplicationId appId, RMAppState finalState) LOG.info("App State is : " + app.getState()); Assert.assertEquals("App State is not correct (timeout).", finalState, app.getState()); + drainEventsImplicitly(); } /** @@ -249,6 +293,7 @@ public void waitForState(ApplicationAttemptId attemptId, Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); MockRM.waitForState(attempt, finalState, timeoutMsecs); + drainEventsImplicitly(); } /** @@ -302,6 +347,7 @@ public void waitForContainerToComplete(RMAppAttempt attempt, for (ContainerStatus container : containers) { if (container.getContainerId().equals( completedContainer.getContainerId())) { + drainEventsImplicitly(); return; } } @@ -425,6 +471,7 @@ public boolean waitForState(Collection nms, ContainerId containerId, timeWaiting += WAIT_MS_PER_LOOP; } + drainEventsImplicitly(); LOG.info("Container State is : " + container.getState()); return true; } @@ -698,7 +745,7 @@ public SubmitApplicationResponse run() { public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -707,7 +754,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) MockNM nm = new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -717,7 +764,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores, new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -725,12 +772,14 @@ public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + drainEventsImplicitly(); } public void sendNodeLost(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + drainEventsImplicitly(); } /** @@ -763,6 +812,7 @@ public void waitForState(NodeId nodeId, NodeState finalState) LOG.info("Node " + nodeId + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); + drainEventsImplicitly(); } public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { @@ -774,7 +824,9 @@ public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); - return client.forceKillApplication(req); + KillApplicationResponse response = client.forceKillApplication(req); + drainEventsImplicitly(); + return response; } public FailApplicationAttemptResponse failApplicationAttempt( @@ -782,7 +834,10 @@ public FailApplicationAttemptResponse failApplicationAttempt( ApplicationClientProtocol client = getClientRMService(); FailApplicationAttemptRequest req = FailApplicationAttemptRequest.newInstance(attemptId); - return client.failApplicationAttempt(req); + FailApplicationAttemptResponse response = + client.failApplicationAttempt(req); + drainEventsImplicitly(); + return response; } /** @@ -807,6 +862,7 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) .getEventHandler() .handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + drainEventsImplicitly(); return am; } @@ -817,6 +873,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); + drainEventsImplicitly(); } @Override @@ -968,6 +1025,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + rm.drainEventsImplicitly(); } @SuppressWarnings("rawtypes") @@ -987,6 +1045,7 @@ private static void waitForSchedulerAppAttemptAdded( Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + attemptId + " to be added.", ((AbstractYarnScheduler) rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + rm.drainEventsImplicitly(); } public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm) @@ -1015,6 +1074,7 @@ public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm) */ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEventsImplicitly(); RMAppAttempt attempt = waitForAttemptScheduled(app, rm); LOG.info("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); @@ -1025,6 +1085,7 @@ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEventsImplicitly(); // UAMs go directly to LAUNCHED state rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); @@ -1067,6 +1128,7 @@ public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); client.updateReservation(request); + drainEventsImplicitly(); } // Explicitly reset queue metrics for testing. @@ -1087,6 +1149,7 @@ public void signalToContainer(ContainerId containerId, SignalContainerRequest req = SignalContainerRequest.newInstance(containerId, command); client.signalToContainer(req); + drainEventsImplicitly(); } /** @@ -1114,6 +1177,21 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId) Assert.assertTrue("app is not removed from scheduler (timeout).", !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); + drainEventsImplicitly(); + } + + private void drainEventsImplicitly() { + if (!disableDrainEventsImplicitly) { + drainEvents(); + } + } + + public void disableDrainEventsImplicitly() { + disableDrainEventsImplicitly = true; + } + + public void enableDrainEventsImplicityly() { + disableDrainEventsImplicitly = false; } public RMApp submitApp(int masterMemory, Priority priority, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 7c02264..c4197a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -53,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -168,17 +164,6 @@ public void testContainerCleanup() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 7a24b7a..c80a799 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -245,17 +242,6 @@ private MockRM startRM(YarnConfiguration conf, MockRM rm1 = new MockRM(conf, memStore) { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 61fd884..d84c77d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; @@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -559,7 +559,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -640,7 +640,7 @@ protected Dispatcher createDispatcher() { public void testKillFinishingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -694,7 +694,7 @@ protected Dispatcher createDispatcher() { public void testKillFailingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index a98a124..9223ef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1577,6 +1577,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // start RM final MockRM rm1 = createMockRM(conf, memStore); + rm1.disableDrainEventsImplicitly(); rm1.start(); // create apps. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index 4329033..c8baa60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,7 +21,6 @@ import java.security.PrivilegedExceptionAction; import java.util.List; -import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -42,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; @@ -64,16 +61,6 @@ public void init(Configuration conf) { "1.0"); super.init(conf); } - @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } @Override protected Dispatcher createDispatcher() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 21d22c3..ffbfec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4620,6 +4620,13 @@ public void handle(Event event) { new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); + + // after YARN-5375, scheduler event is processed in rm main dispatcher, + // wait it processed, or may lead dead lock + if (resourceManager instanceof MockRM) { + ((MockRM) resourceManager).drainEvents(); + } + NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 3ff8f6a..10aa92a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -90,6 +90,7 @@ protected void configureServlets() { rm = new MockRM(new Configuration()); rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); + rm.disableDrainEventsImplicitly(); bind(ResourceManager.class).toInstance(rm); serve("/*").with(GuiceContainer.class); }