diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 667515d00c1..b12e78ea638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -381,6 +381,11 @@ protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } + @VisibleForTesting + public BlockingQueue getEventQueue() { + return eventQueue; + } + protected boolean isDrained() { return drained; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index f9e159168c5..07d78cb1c1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -509,12 +509,8 @@ private void sendRMAppNodeUpdateEventToNonFinalizedApps( RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) { for(RMApp app : rmContext.getRMApps().values()) { if (!app.isAppFinalStateStored()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - appNodeUpdateType)); + app.handle(new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + 
appNodeUpdateType)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6d2a9fed08b..7c5bf6720fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -459,6 +459,13 @@ protected void setRMStateStore(RMStateStore rmStore) { return dispatcher; } + protected EventHandler + createNodeListManagerDispatcher() { + EventDispatcher dispatcher = new + EventDispatcher(this.nodesListManager, "NodeListManagerDispatcher"); + return dispatcher; + } + protected Dispatcher createDispatcher() { AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); GenericEventTypeMetrics genericEventTypeMetrics = @@ -667,6 +674,8 @@ protected static void validateConfigs(Configuration conf) { private DelegationTokenRenewer delegationTokenRenewer; private EventHandler schedulerDispatcher; + private EventHandler + nodeListManagerDispatcher; private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; @@ -772,12 +781,19 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } - // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); - rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); addService(nodesListManager); 
rmContext.setNodesListManager(nodesListManager); + // Create nodeListManagerDispatcher + // for queued NodesListManagerEvent + nodeListManagerDispatcher = createNodeListManagerDispatcher(); + + // Register event handler for nodeListManagerDispatcher + rmDispatcher.register(NodesListManagerEventType.class, + nodeListManagerDispatcher); + addIfService(nodeListManagerDispatcher); + // Initialize the scheduler scheduler = createScheduler(); scheduler.setRMContext(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index 1ea470cf094..488878b4c80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -25,10 +25,14 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -36,6 +40,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + public class TestRMDispatcher { @SuppressWarnings("unchecked") @@ -79,4 +86,115 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { rmDispatcher.stop(); } } + + @Test(timeout=10000) + public void testNodeListManagerDispatcher() { + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + RMContextImpl rmContext = new RMContextImpl(rmDispatcher, null, null, + null, null, null, null, null, null); + NodesListManager nodesListManager = spy(new NodesListManager(rmContext)); + YarnConfiguration conf = new YarnConfiguration(); + EventDispatcher nodeListManagerDispatcher = + new EventDispatcher(nodesListManager, + nodesListManager.getClass().getName()); + nodeListManagerDispatcher.disableExitOnError(); + rmDispatcher.register(NodesListManagerEventType.class, + nodeListManagerDispatcher); + rmDispatcher.init(conf); + rmDispatcher.start(); + nodeListManagerDispatcher.init(conf); + nodeListManagerDispatcher.start(); + + try { + RMNode rmNode1 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost1", 1234); + RMNode rmNode2 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost2", 1234); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType + .NODE_USABLE, rmNode1)); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType + .NODE_UNUSABLE, rmNode2)); + + // Wait for both events to be processed by nodeListManagerDispatcher. + Thread.sleep(1000); + + // Make sure nodesListManager handled both events, + // i.e. one call per event forwarded by nodeListManagerDispatcher. + verify(nodesListManager, times(2)).
+ handle(any(NodesListManagerEvent.class)); + + } catch (InterruptedException e) { + Assert.fail(); + } finally { + nodeListManagerDispatcher.stop(); + rmDispatcher.stop(); + } + } + + @Test(timeout=10000) + public void testNodeListManagerDispatcherStressImprovement() { + BlockingQueue eventQueue = new LinkedBlockingQueue(); + eventQueue = spy(eventQueue); + AsyncDispatcher rmDispatcher = new AsyncDispatcher(eventQueue); + RMContextImpl rmContext = new RMContextImpl(rmDispatcher, null, null, + null, null, null, null, null, null); + NodesListManager nodesListManager = spy(new NodesListManager(rmContext)); + YarnConfiguration conf = new YarnConfiguration(); + EventDispatcher nodeListManagerDispatcher = + new EventDispatcher(nodesListManager, + nodesListManager.getClass().getName()); + nodeListManagerDispatcher.disableExitOnError(); + rmDispatcher.register(NodesListManagerEventType.class, + nodeListManagerDispatcher); + rmDispatcher.init(conf); + rmDispatcher.start(); + nodeListManagerDispatcher.init(conf); + nodeListManagerDispatcher.start(); + + try { + RMNode rmNode1 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost1", 1234); + + for (int i = 0; i < 10000; ++i) { + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType + .NODE_USABLE, rmNode1)); + } + + // Make sure NodesListManagerEvent was put into eventQueue 10000 times. + verify(eventQueue, times(10000)) + .put(any(NodesListManagerEvent.class)); + + // Make sure no RMAppEvent was put into eventQueue. + // The old NodesListManager stressed this queue by putting an RMAppEvent + // (RMAppEventType.NODE_UPDATE) per non-finalized app for each node event. + verify(eventQueue, times(0)) + .put(any(RMAppEvent.class)); + + // Wait for events to be processed by nodeListManagerDispatcher. + Thread.sleep(1000); + + // Make sure the AsyncDispatcher is not stressed: + // its event queue must be empty by now.
+ Assert.assertEquals(0, rmDispatcher.getEventQueue().size()); + + // Make sure nodesListManager handled all 10000 events, + // i.e. one call per event forwarded by nodeListManagerDispatcher. + verify(nodesListManager, times(10000)). + handle(any(NodesListManagerEvent.class)); + + } catch (InterruptedException e) { + Assert.fail(); + } finally { + nodeListManagerDispatcher.stop(); + rmDispatcher.stop(); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java index 0df295c9436..9bbe51ee793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java @@ -22,10 +22,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; -import java.util.ArrayList; - import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -52,8 +49,10 @@ import org.mockito.ArgumentMatcher; public class TestNodesListManager { - // To hold list of application for which event was received - ArrayList applist = new ArrayList(); + // True once an RMAppNodeUpdateEvent has been observed. + private Boolean isRMAppEvent = false; + // True once a NodesListManagerEvent has been observed. + private Boolean isNodesListEvent = false; @Test(timeout = 300000) public void
testNodeUsableEvent() throws Exception { @@ -68,67 +67,30 @@ protected Dispatcher createDispatcher() { }; rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 28000); - NodesListManager nodesListManager = rm.getNodesListManager(); Resource clusterResource = Resource.newInstance(28000, 8); RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource); // Create killing APP - RMApp killrmApp = MockRMAppSubmitter.submitWithMemory(200, rm); - rm.killApp(killrmApp.getApplicationId()); - rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED); + RMApp killRmApp = MockRMAppSubmitter.submitWithMemory(200, rm); + rm.killApp(killRmApp.getApplicationId()); + rm.waitForState(killRmApp.getApplicationId(), RMAppState.KILLED); // Create finish APP - RMApp finshrmApp = MockRMAppSubmitter.submitWithMemory(2000, rm); + RMApp finshRmApp = MockRMAppSubmitter.submitWithMemory(2000, rm); nm1.nodeHeartbeat(true); - RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt(); + RMAppAttempt attempt = finshRmApp.getCurrentAppAttempt(); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); am.registerAppAttempt(); am.unregisterAppAttempt(); nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); - // Create submitted App - RMApp subrmApp = MockRMAppSubmitter.submitWithMemory(200, rm); - // Fire Event for NODE_USABLE - nodesListManager.handle(new NodesListManagerEvent( + // No RMAppNodeUpdateEvent should be sent to the AsyncDispatcher.
+ dispatcher.getEventHandler().handle(new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmnode)); - if (applist.size() > 0) { - Assert.assertTrue( - "Event based on running app expected " + subrmApp.getApplicationId(), - applist.contains(subrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on finish app not expected " - + finshrmApp.getApplicationId(), - applist.contains(finshrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on killed app not expected " - + killrmApp.getApplicationId(), - applist.contains(killrmApp.getApplicationId())); - } else { - Assert.fail("Events received should have beeen more than 1"); - } - applist.clear(); - - // Fire Event for NODE_UNUSABLE - nodesListManager.handle(new NodesListManagerEvent( - NodesListManagerEventType.NODE_UNUSABLE, rmnode)); - if (applist.size() > 0) { - Assert.assertTrue( - "Event based on running app expected " + subrmApp.getApplicationId(), - applist.contains(subrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on finish app not expected " - + finshrmApp.getApplicationId(), - applist.contains(finshrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on killed app not expected " - + killrmApp.getApplicationId(), - applist.contains(killrmApp.getApplicationId())); - } else { - Assert.fail("Events received should have beeen more than 1"); - } - + Assert.assertFalse(getIsRMAppEvent()); + Assert.assertTrue(getIsNodesListEvent()); } @Test @@ -241,9 +203,10 @@ private Dispatcher getDispatcher() { @Override public boolean matches(AbstractEvent argument) { if (argument instanceof RMAppNodeUpdateEvent) { - ApplicationId appid = - ((RMAppNodeUpdateEvent) argument).getApplicationId(); - applist.add(appid); + isRMAppEvent = true; + } + if (argument instanceof NodesListManagerEvent) { + isNodesListEvent = true; } return false; } @@ -256,4 +219,11 @@ public boolean matches(AbstractEvent argument) { }; } + public Boolean getIsNodesListEvent() { + return 
isNodesListEvent; + } + + public Boolean getIsRMAppEvent() { + return isRMAppEvent; + } }