diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index f9e159168c5..f3e7a137bc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -31,6 +31,7 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -509,12 +510,8 @@ private void sendRMAppNodeUpdateEventToNonFinalizedApps( RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) { for(RMApp app : rmContext.getRMApps().values()) { if (!app.isAppFinalStateStored()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - appNodeUpdateType)); + app.handle(new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + appNodeUpdateType)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6d2a9fed08b..3495b1b0ca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -459,6 +459,12 @@ protected void setRMStateStore(RMStateStore rmStore) { return dispatcher; } + protected EventHandler createNodeListManagerDispatcher() { + EventDispatcher dispatcher = new + EventDispatcher(this.nodesListManager, "NodeListManagerDispatcher"); + return dispatcher; + } + protected Dispatcher createDispatcher() { AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); GenericEventTypeMetrics genericEventTypeMetrics = @@ -667,6 +673,8 @@ protected static void validateConfigs(Configuration conf) { private DelegationTokenRenewer delegationTokenRenewer; private EventHandler schedulerDispatcher; + private EventHandler + nodeListManagerDispatcher; private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; @@ -772,9 +780,14 @@ protected void serviceInit(Configuration configuration) throws Exception { rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } - // Register event handler for NodesListManager + // Create nodeListManagerDispatcher + // for queued NodesListManagerEvent + nodeListManagerDispatcher = createNodeListManagerDispatcher(); + + // Register event handler for nodeListManagerDispatcher nodesListManager = new NodesListManager(rmContext); - rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); + rmDispatcher.register(NodesListManagerEventType.class, + nodeListManagerDispatcher); addService(nodesListManager); rmContext.setNodesListManager(nodesListManager); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index 1ea470cf094..339e2737e91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -25,10 +25,12 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -79,4 +81,54 @@ public void testSchedulerEventDispatcherForPreemptionEvents() { rmDispatcher.stop(); } } + + @Test(timeout=10000) + public void testNodeListManagerDispatcher() { + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + RMContextImpl rmContext = new RMContextImpl(rmDispatcher, null, null, + null, null, null, null, null, null); + NodesListManager nodesListManager = spy(new NodesListManager(rmContext)); + YarnConfiguration conf = new YarnConfiguration(); + EventDispatcher nodeListManagerDispatcher = + new EventDispatcher(nodesListManager, + nodesListManager.getClass().getName()); + nodeListManagerDispatcher.disableExitOnError(); + rmDispatcher.register(NodesListManagerEventType.class, + nodeListManagerDispatcher); + rmDispatcher.init(conf); + rmDispatcher.start(); + nodeListManagerDispatcher.init(conf); + nodeListManagerDispatcher.start(); + + try { + RMNode rmNode1 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost1", 1234); + RMNode rmNode2 = MockNodes.newNodeInfo(1, + Resource.newInstance(28000, 8), + 1, "testHost2", 1234); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE, + rmNode1)); + + rmDispatcher.getEventHandler(). + handle(new NodesListManagerEvent(NodesListManagerEventType.NODE_UNUSABLE, + rmNode2)); + + // Wait for events to be processed by scheduler dispatcher. + Thread.sleep(1000); + + // Make sure nodesListManager handle 2 times. + // consistent with nodeListManagerDispatcher. + verify(nodesListManager, times(2)). + handle(any(NodesListManagerEvent.class)); + + } catch (InterruptedException e) { + Assert.fail(); + } finally { + nodeListManagerDispatcher.stop(); + rmDispatcher.stop(); + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java index 0df295c9436..a4c7b0aa7e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java @@ -18,9 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; import java.util.ArrayList; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.ControlledClock; import org.slf4j.event.Level; import org.junit.Assert; @@ -52,8 +54,10 @@ import org.mockito.ArgumentMatcher; public class TestNodesListManager { - // To hold list of application for which event was received - ArrayList applist = new ArrayList(); + //Is trigger RMAppNodeUpdateEvent + Boolean isRMAppEvent = false; + //Is trigger NodesListManagerEvent + Boolean isNodesListEvent = false; @Test(timeout = 300000) public void testNodeUsableEvent() throws Exception { @@ -68,67 +72,30 @@ protected Dispatcher createDispatcher() { }; rm.start(); MockNM nm1 = rm.registerNode("h1:1234", 28000); - NodesListManager nodesListManager = rm.getNodesListManager(); Resource clusterResource = Resource.newInstance(28000, 8); RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource); // Create killing APP - RMApp killrmApp = MockRMAppSubmitter.submitWithMemory(200, rm); - rm.killApp(killrmApp.getApplicationId()); - rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED); + RMApp killRmApp = MockRMAppSubmitter.submitWithMemory(200, rm); + rm.killApp(killRmApp.getApplicationId()); + rm.waitForState(killRmApp.getApplicationId(), RMAppState.KILLED); // Create finish APP - RMApp finshrmApp = MockRMAppSubmitter.submitWithMemory(2000, rm); + RMApp finshRmApp = MockRMAppSubmitter.submitWithMemory(2000, rm); nm1.nodeHeartbeat(true); - RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt(); + RMAppAttempt attempt = finshRmApp.getCurrentAppAttempt(); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); am.registerAppAttempt(); am.unregisterAppAttempt(); nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED); - // Create submitted App - RMApp subrmApp = MockRMAppSubmitter.submitWithMemory(200, rm); - // Fire Event for NODE_USABLE - nodesListManager.handle(new NodesListManagerEvent( + // Should not have RMAppNodeUpdateEvent to AsyncDispatcher. + dispatcher.getEventHandler().handle(new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmnode)); - if (applist.size() > 0) { - Assert.assertTrue( - "Event based on running app expected " + subrmApp.getApplicationId(), - applist.contains(subrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on finish app not expected " - + finshrmApp.getApplicationId(), - applist.contains(finshrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on killed app not expected " - + killrmApp.getApplicationId(), - applist.contains(killrmApp.getApplicationId())); - } else { - Assert.fail("Events received should have beeen more than 1"); - } - applist.clear(); - - // Fire Event for NODE_UNUSABLE - nodesListManager.handle(new NodesListManagerEvent( - NodesListManagerEventType.NODE_UNUSABLE, rmnode)); - if (applist.size() > 0) { - Assert.assertTrue( - "Event based on running app expected " + subrmApp.getApplicationId(), - applist.contains(subrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on finish app not expected " - + finshrmApp.getApplicationId(), - applist.contains(finshrmApp.getApplicationId())); - Assert.assertFalse( - "Event based on killed app not expected " - + killrmApp.getApplicationId(), - applist.contains(killrmApp.getApplicationId())); - } else { - Assert.fail("Events received should have beeen more than 1"); - } - + Assert.assertFalse(isRMAppEvent); + Assert.assertTrue(isNodesListEvent); } @Test @@ -241,9 +208,10 @@ private Dispatcher getDispatcher() { @Override public boolean matches(AbstractEvent argument) { if (argument instanceof RMAppNodeUpdateEvent) { - ApplicationId appid = - ((RMAppNodeUpdateEvent) argument).getApplicationId(); - applist.add(appid); + isRMAppEvent = true; + } + if (argument instanceof NodesListManagerEvent) { + isNodesListEvent = true; } return false; }