diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9701775..b5ad23c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -575,11 +575,27 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { if (rmNode.getHttpPort() == newNode.getHttpPort()) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { + if (!rmNode.getState().equals(NodeState.UNHEALTHY)) { // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(newNode)); + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(newNode)); + + } + + // required to update rmNode capability with newNode capability + if (!rmNode.getTotalCapability().equals(newNode.getTotalCapability())) { + + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(newNode.getNodeID(), + ResourceOption.newInstance( + newNode.getTotalCapability(), -1))); } + + sendNodeUsableEventIfNodeStateIsNotRunning(rmNode); + } else { // Reconnected node differs, so replace old node and start new node switch (rmNode.getState()) { @@ -605,24 +621,35 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Reset heartbeat ID since node just restarted. rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - } - if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } + + if (rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); + } + + sendNodeUsableEventIfNodeStateIsNotRunning(rmNode); } - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(newNode.getTotalCapability(), -1))); + } + + private void sendNodeUsableEventIfNodeStateIsNotRunning(RMNodeImpl rmNode) { + if (!rmNode.getState().equals(NodeState.RUNNING)) { + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } - } private void handleNMContainerStatus( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 5f53805..c917f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -51,7 +51,7 @@ private final int memory; private final int vCores; private ResourceTrackerService resourceTracker; - private final int httpPort = 2; + private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; @@ -87,6 +87,10 @@ public int getHttpPort() { return httpPort; } + public void setHttpPort(int port) { + httpPort = port; + } + public void setResourceTrackerService(ResourceTrackerService resourceTracker) { this.resourceTracker = resourceTracker; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 7c12848..ed4cd6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -623,6 +624,8 @@ protected Dispatcher createDispatcher() { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + Assert.assertEquals(10240, rmNode.getTotalCapability().getMemory()); // reconnect of node with changed capability and running applications List runningApps = new ArrayList(); @@ -633,6 +636,20 @@ protected Dispatcher createDispatcher() { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + + // reconnect healthy node changing http port + nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + nm1.setHttpPort(3); + nm1.registerNode(); + dispatcher.await(); + response = nm1.nodeHeartbeat(true); + response = nm1.nodeHeartbeat(true); + dispatcher.await(); + rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + Assert.assertEquals(3, rmNode.getHttpPort()); + Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory()); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); + } private void writeToHostsFile(String... hosts) throws IOException { -- 1.9.2.msysgit.0