diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9701775..c73b63f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -573,12 +573,20 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodeRemovedSchedulerEvent(rmNode)); if (rmNode.getHttpPort() == newNode.getHttpPort()) { + rmNode.totalCapability = newNode.getTotalCapability(); + // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (rmNode.getState() != NodeState.UNHEALTHY) { // Only add new node if old state is not UNHEALTHY rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(newNode)); + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } else { // Reconnected node differs, so replace old node and start new node @@ -605,24 +613,29 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - } - if (null != reconnectEvent.getRunningApplications()) { + // Handle running applications on this node for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(newNode.getTotalCapability(), -1))); + if (rmNode.getState().equals(NodeState.RUNNING)) { + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); + + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); + } } - } private void handleNMContainerStatus( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index c6da3fd..c99edf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -617,4 +617,22 @@ public void testReconnnectUpdate() { null, null)); Assert.assertEquals(nmVersion2, node.getNodeManagerVersion()); } + + @Test(timeout = 10000) + public void testResourceUpdateOnReconnnectedNode() { + RMNodeImpl node = getRunningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + + Resource newCapacity = Resource.newInstance(8192, 8); + RMNodeImpl newNode = getNewNode(newCapacity); + + node.handle(new RMNodeReconnectEvent(newNode.getNodeID(), newNode, null, + null)); + + Resource updatedCapacity = node.getTotalCapability(); + assertEquals("Resource does not match", newCapacity, updatedCapacity); + } + } -- 1.9.2.msysgit.0