diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index e9a0436..4249980 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -160,17 +160,14 @@ public void handle(NodesListManagerEvent event) { if (unusableRMNodesConcurrentSet.contains(eventNode)) { LOG.debug(eventNode + " reported usable"); unusableRMNodesConcurrentSet.remove(eventNode); - for (RMApp app : rmContext.getRMApps().values()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_USABLE)); - } - } else { - LOG.warn(eventNode - + " reported usable without first reporting unusable"); + } + for (RMApp app : rmContext.getRMApps().values()) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + RMAppNodeUpdateType.NODE_USABLE)); } break; default: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 00a5cb4..0c96834 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -438,7 +438,10 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); - + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -471,6 +474,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Only add new node if old state is not UNHEALTHY rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } else { // Reconnected node differs, so replace old node and start new node diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index c060bb6..f81d200 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -79,6 +81,18 @@ public void handle(SchedulerEvent event) { } } + private NodesListManagerEvent nodesListManagerEvent = null; + + private class TestNodeListManagerEventDispatcher implements + EventHandler { + + @Override + public void handle(NodesListManagerEvent event) { + nodesListManagerEvent = event; + } + + } + @Before public void setUp() throws Exception { InlineDispatcher rmDispatcher = new InlineDispatcher(); @@ -109,8 +123,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable { rmDispatcher.register(SchedulerEventType.class, new TestSchedulerEventDispatcher()); + rmDispatcher.register(NodesListManagerEventType.class, + new TestNodeListManagerEventDispatcher()); + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + nodesListManagerEvent = null; } @@ -431,8 +449,9 @@ public void testUpdateHeartbeatResponseForCleanup() { private RMNodeImpl getRunningNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, - null, null, null); + null, capability, null); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; @@ -447,4 +466,85 @@ private RMNodeImpl getUnhealthyNode() { Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; } + + + private RMNodeImpl getNewNode() { + NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); + RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + return node; + } + + @Test + public void testAdd() { + RMNodeImpl node = getNewNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testReconnect() { + RMNodeImpl node = getRunningNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node)); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testReconnectWithDifferentCapacity() { + RMNodeImpl node = getRunningNode(); + ClusterMetrics cm = ClusterMetrics.getMetrics(); + int initialActive = cm.getNumActiveNMs(); + int initialLost = cm.getNumLostNMs(); + int initialUnhealthy = cm.getUnhealthyNMs(); + int initialDecommissioned = cm.getNumDecommisionedNMs(); + int initialRebooted = cm.getNumRebootedNMs(); + node.getTotalCapability().setVirtualCores(node.getTotalCapability().getVirtualCores() + 1); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node)); + Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); + Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); + Assert.assertEquals("Unhealthy Nodes", + initialUnhealthy, cm.getUnhealthyNMs()); + Assert.assertEquals("Decommissioned Nodes", + initialDecommissioned, cm.getNumDecommisionedNMs()); + Assert.assertEquals("Rebooted Nodes", + initialRebooted, cm.getNumRebootedNMs()); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + }