diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index f769492..2cc4d07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.yarn.event; +import org.apache.hadoop.conf.Configuration; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { - private volatile boolean drained = false; + private volatile boolean drained = true; private volatile boolean stopped = false; private final BlockingQueue queue; private final Object mutex; @@ -37,6 +39,13 @@ public DrainDispatcher(BlockingQueue eventQueue) { this.mutex = this; } + @Override + public void serviceInit(Configuration conf) + throws Exception { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false); + super.serviceInit(conf); + } + /** * Wait till event thread enters WAITING state (i.e. waiting for new events). */ @@ -50,7 +59,7 @@ public void waitForEventThreadToWait() { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!drained) { + while (!isDrained()) { Thread.yield(); } } @@ -96,7 +105,9 @@ public void handle(Event event) { @Override protected boolean isDrained() { - return drained; + synchronized (mutex) { + return drained; + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..5ccca5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -660,7 +660,7 @@ public void setRMDispatcher(Dispatcher dispatcher) { @Override protected void serviceInit(Configuration conf) throws Exception{ // create async handler - dispatcher = new AsyncDispatcher(); + dispatcher = createStateStoreDispatcher(); dispatcher.init(conf); dispatcher.register(RMStateStoreEventType.class, new ForwardingEventHandler()); @@ -1192,4 +1192,8 @@ public RMStateStoreState getRMStateStoreState() { this.readLock.unlock(); } } + + protected AsyncDispatcher createStateStoreDispatcher() { + return new AsyncDispatcher(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 1b11472..f2d3947 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -53,6 +53,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; + private MockRM rm; private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; @@ -61,9 +62,10 @@ private final List requests = new ArrayList(); private final List releases = new ArrayList(); - public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol, + public MockAM(MockRM rm, ApplicationMasterProtocol amRMProtocol, ApplicationAttemptId attemptId) { - this.context = context; + this.rm = rm; + this.context = rm.getRMContext(); this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } @@ -85,7 +87,7 @@ private void waitForState(RMAppAttemptState finalState) throws InterruptedException { RMApp app = context.getRMApps().get(attemptId.getApplicationId()); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState); + rm.waitForState(attempt, finalState); } public RegisterApplicationMasterResponse registerAppAttempt() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index f843261..ff0da78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -63,8 +63,10 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -115,6 +119,8 @@ private static final int WAIT_MS_PER_LOOP = 10; private final boolean useNullRMNodeLabelsManager; + private boolean disableDrainEventsImplicitly; + private DrainDispatcher stateStoreDispatcher; public MockRM() { this(new YarnConfiguration()); @@ -133,13 +139,38 @@ public MockRM(Configuration conf, RMStateStore store, super(); this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); - if(store != null) { + if (store != null) { setRMStateStore(store); + } else { + if (getRMContext().getStateStore().getClass().equals( + MemoryRMStateStore.class)) { + setRMStateStore(new MockRMMemoryStateStore()); + } + } + + RMStateStore stateStore = getRMContext().getStateStore(); + if (stateStore instanceof MockRMMemoryStateStore) { + stateStoreDispatcher = ((MockRMMemoryStateStore)stateStore).getDispatcher(); } + Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); + disableDrainEventsImplicitly = false; } - + + public class MockRMMemoryStateStore extends MemoryRMStateStore { + DrainDispatcher dispatcher; + @Override + protected AsyncDispatcher createStateStoreDispatcher() { + dispatcher = new DrainDispatcher(); + return dispatcher; + } + + public DrainDispatcher getDispatcher() { + return dispatcher; + } + } + @Override protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { @@ -157,10 +188,24 @@ protected Dispatcher createDispatcher() { return new DrainDispatcher(); } + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new EventHandler() { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + public void drainEvents() { Dispatcher rmDispatcher = getRmDispatcher(); if (rmDispatcher instanceof DrainDispatcher) { ((DrainDispatcher) rmDispatcher).await(); + if (stateStoreDispatcher != null) { + stateStoreDispatcher.await(); + ((DrainDispatcher) rmDispatcher).await(); + } } else { throw new UnsupportedOperationException("Not a Drain Dispatcher!"); } @@ -193,7 +238,8 @@ public void waitForState(ApplicationId appId, RMAppState finalState) LOG.info("App State is : " + app.getState()); Assert.assertEquals("App State is not correct (timeout).", finalState, - app.getState()); + app.getState()); + drainEventsImplicitly(); } /** @@ -224,7 +270,7 @@ public void waitForState(ApplicationAttemptId attemptId, RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - MockRM.waitForState(attempt, finalState, timeoutMsecs); + waitForState(attempt, finalState, timeoutMsecs); } /** @@ -235,7 +281,7 @@ public void waitForState(ApplicationAttemptId attemptId, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState) throws InterruptedException { waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT); } @@ -249,7 +295,7 @@ public static void waitForState(RMAppAttempt attempt, * @throws InterruptedException * if interrupted while waiting for the state transition */ - public static void waitForState(RMAppAttempt attempt, + public void waitForState(RMAppAttempt attempt, RMAppAttemptState finalState, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; @@ -266,7 +312,8 @@ public static void waitForState(RMAppAttempt attempt, LOG.info("Attempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("Attempt state is not correct (timeout).", finalState, - attempt.getState()); + attempt.getState()); + drainEventsImplicitly(); } public void waitForContainerToComplete(RMAppAttempt attempt, @@ -277,6 +324,7 @@ public void waitForContainerToComplete(RMAppAttempt attempt, for (ContainerStatus container : containers) { if (container.getContainerId().equals( completedContainer.getContainerId())) { + drainEventsImplicitly(); return; } } @@ -329,7 +377,7 @@ public boolean waitForState(MockNM nm, ContainerId containerId, public boolean waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState, int timeoutMsecs) throws Exception { return waitForState(Arrays.asList(nm), containerId, containerState, - timeoutMsecs); + timeoutMsecs); } /** @@ -346,7 +394,7 @@ public boolean waitForState(MockNM nm, ContainerId containerId, public boolean waitForState(Collection nms, ContainerId containerId, RMContainerState containerState) throws Exception { return waitForState(nms, containerId, containerState, - TIMEOUT_MS_FOR_CONTAINER_AND_NODE); + TIMEOUT_MS_FOR_CONTAINER_AND_NODE); } /** @@ -395,6 +443,7 @@ public boolean waitForState(Collection nms, ContainerId containerId, } System.out.println("Container State is : " + container.getState()); + drainEventsImplicitly(); return true; } @@ -412,7 +461,7 @@ public RMApp submitApp(int masterMemory) throws Exception { public RMApp submitApp(int masterMemory, Priority priority) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() - .getShortUserName(), null, false, null, + .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, false, null, 0, null, true, priority); @@ -657,13 +706,14 @@ public SubmitApplicationResponse run() { RMAppAttemptState.SCHEDULED); } + drainEventsImplicitly(); return rmApp; } public MockNM registerNode(String nodeIdStr, int memory) throws Exception { MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -672,7 +722,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) MockNM nm = new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); nm.registerNode(); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -682,7 +732,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores, new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), YarnVersionInfo.getVersion()); nm.registerNode(runningApplications); - drainEvents(); + drainEventsImplicitly(); return nm; } @@ -690,12 +740,14 @@ public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); + drainEventsImplicitly(); } public void sendNodeLost(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE)); + drainEventsImplicitly(); } /** @@ -728,6 +780,7 @@ public void waitForState(NodeId nodeId, NodeState finalState) System.out.println("Node " + nodeId + " State is : " + node.getState()); Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); + drainEventsImplicitly(); } public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { @@ -739,7 +792,9 @@ public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); KillApplicationRequest req = KillApplicationRequest.newInstance(appId); - return client.forceKillApplication(req); + KillApplicationResponse response = client.forceKillApplication(req); + drainEventsImplicitly(); + return response; } public FailApplicationAttemptResponse failApplicationAttempt( @@ -747,7 +802,10 @@ public FailApplicationAttemptResponse failApplicationAttempt( ApplicationClientProtocol client = getClientRMService(); FailApplicationAttemptRequest req = FailApplicationAttemptRequest.newInstance(attemptId); - return client.failApplicationAttempt(req); + FailApplicationAttemptResponse response = + client.failApplicationAttempt(req); + drainEventsImplicitly(); + return response; } /** @@ -758,7 +816,7 @@ public FailApplicationAttemptResponse failApplicationAttempt( */ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(appAttemptId, RMAppAttemptState.ALLOCATED); //create and set AMRMToken Token amrmToken = @@ -772,16 +830,18 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId) .getEventHandler() .handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCHED)); + drainEventsImplicitly(); return am; } public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Exception { - MockAM am = new MockAM(getRMContext(), masterService, appAttemptId); + MockAM am = new MockAM(this, masterService, appAttemptId); waitForState(am.getApplicationAttemptId(), RMAppAttemptState.ALLOCATED); getRMContext().getDispatcher().getEventHandler() .handle(new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.LAUNCH_FAILED, "Failed")); + drainEventsImplicitly(); } @Override @@ -933,6 +993,7 @@ public static void finishAMAndVerifyAppState(RMApp rmApp, MockRM rm, MockNM nm, nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + rm.drainEvents(); } @SuppressWarnings("rawtypes") @@ -952,6 +1013,7 @@ private static void waitForSchedulerAppAttemptAdded( Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" + attemptId + " to be added.", ((AbstractYarnScheduler) rm.getResourceScheduler()).getApplicationAttempt(attemptId)); + rm.drainEvents(); } /** @@ -960,25 +1022,29 @@ private static void waitForSchedulerAppAttemptAdded( */ public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); RMAppAttempt attempt = waitForAttemptScheduled(app, rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.drainEvents(); // UAMs go directly to LAUNCHED state rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); - MockAM am = new MockAM(rm.getRMContext(), rm.masterService, + MockAM am = new MockAM(rm, rm.masterService, attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + rm.drainEvents(); return am; } @@ -988,6 +1054,7 @@ public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) RMAppAttempt attempt = app.getCurrentAppAttempt(); waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); + rm.drainEvents(); return attempt; } @@ -996,6 +1063,7 @@ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) MockAM am = launchAM(app, rm, nm); am.registerAppAttempt(); rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + rm.drainEvents(); return am; } @@ -1012,6 +1080,7 @@ public void updateReservationState(ReservationUpdateRequest request) throws IOException, YarnException { ApplicationClientProtocol client = getClientRMService(); client.updateReservation(request); + drainEventsImplicitly(); } // Explicitly reset queue metrics for testing. @@ -1032,9 +1101,9 @@ public void signalToContainer(ContainerId containerId, SignalContainerRequest req = SignalContainerRequest.newInstance(containerId, command); client.signalToContainer(req); + drainEventsImplicitly(); } - /** * Wait until an app removed from scheduler. * The timeout is 40 seconds. @@ -1072,5 +1141,16 @@ public void waitForAppRemovedFromScheduler(ApplicationId appId, Assert.assertTrue("app is not removed from scheduler (timeout).", !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); + drainEventsImplicitly(); + } + + private void drainEventsImplicitly() { + if (!disableDrainEventsImplicitly) { + drainEvents(); + } + } + + public void disableDrainEventsImplicitly() { + disableDrainEventsImplicitly = true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index a7d8ba2..a8d2957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,9 +42,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -53,7 +50,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -168,17 +164,6 @@ public void testContainerCleanup() throws Exception { final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRM() { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 15b8ade..dbe98aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -204,8 +204,8 @@ public void testAMLaunchAndCleanup() throws Exception { Assert.assertEquals(YarnConfiguration.DEFAULT_QUEUE_NAME, containerManager.queueName); - MockAM am = new MockAM(rm.getRMContext(), rm - .getApplicationMasterService(), appAttemptId); + MockAM am = new MockAM(rm, rm.getApplicationMasterService(), + appAttemptId); am.registerAppAttempt(); am.unregisterAppAttempt(); @@ -274,7 +274,7 @@ protected Dispatcher createDispatcher() { nm1.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(app.getCurrentAppAttempt(), + rm.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.LAUNCHED, 500); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index 7a24b7a..107b89f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -134,7 +131,7 @@ public void testNodeBlacklistingOnAMFailure() throws Exception { dispatcher.await(); // Now the AM container should be allocated - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -187,7 +184,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); ApplicationAttemptId appAttemptId = @@ -215,7 +212,7 @@ public void testNoBlacklistingForNonSystemErrors() throws Exception { node.nodeHeartbeat(true); dispatcher.await(); - MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + rm.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -245,17 +242,6 @@ private MockRM startRM(YarnConfiguration conf, MockRM rm1 = new MockRM(conf, memStore) { @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } - - @Override protected Dispatcher createDispatcher() { return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index ee6d51a..a7057b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; @@ -56,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -558,7 +558,7 @@ public void testInvalidatedAMHostPortOnAMRestart() throws Exception { @Test (timeout = 60000) public void testApplicationKillAtAcceptedState() throws Exception { - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -639,7 +639,7 @@ protected Dispatcher createDispatcher() { public void testKillFinishingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { @@ -693,7 +693,7 @@ protected Dispatcher createDispatcher() { public void testKillFailingApp() throws Exception{ // this dispatcher ignores RMAppAttemptEventType.KILL event - final Dispatcher dispatcher = new AsyncDispatcher() { + final Dispatcher dispatcher = new DrainDispatcher() { @Override public EventHandler getEventHandler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b271b37..5b52e7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1575,6 +1575,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) { // start RM final MockRM rm1 = createMockRM(conf, memStore); + rm1.disableDrainEventsImplicitly(); rm1.start(); // create apps. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index 4329033..c8baa60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,7 +21,6 @@ import java.security.PrivilegedExceptionAction; import java.util.List; -import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -34,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -42,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.After; import org.junit.Before; @@ -64,16 +61,6 @@ public void init(Configuration conf) { "1.0"); super.init(conf); } - @Override - protected EventHandler createSchedulerEventDispatcher() { - return new EventDispatcher(this.scheduler, - this.scheduler.getClass().getName()) { - @Override - public void handle(SchedulerEvent event) { - super.handle(event); - } - }; - } @Override protected Dispatcher createDispatcher() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index d36fb9f..a55562d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -217,7 +217,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString()); @@ -457,7 +457,7 @@ protected void doSecureLogin() throws IOException { ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = - new MockAM(rm.getRMContext(), rm.getApplicationMasterService(), + new MockAM(rm, rm.getApplicationMasterService(), app.getCurrentAppAttempt().getAppAttemptId()); UserGroupInformation appUgi = UserGroupInformation.createRemoteUser(appAttempt.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 718091f..50b0252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -88,6 +88,7 @@ protected void configureServlets() { rm = new MockRM(new Configuration()); rm.getRMContext().getContainerTokenSecretManager().rollMasterKey(); rm.getRMContext().getNMTokenSecretManager().rollMasterKey(); + rm.disableDrainEventsImplicitly(); bind(ResourceManager.class).toInstance(rm); serve("/*").with(GuiceContainer.class); }