diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index c556b80..a6acb68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -562,68 +562,34 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); List runningApps = reconnectEvent.getRunningApplications(); - boolean noRunningApps = - (runningApps == null) || (runningApps.size() == 0); - // No application running on the node, so send node-removal event with - // cleaning up old container info. - if (noRunningApps) { - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + boolean isCapabilityChanged = false; + if (rmNode.getTotalCapability() != newNode.getTotalCapability()) { + rmNode.totalCapability = newNode.getTotalCapability(); + isCapabilityChanged = true; + } - if (rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Only add new node if old state is RUNNING - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(newNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - default: - LOG.debug("Unexpected Rmnode state"); - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); - } - } else { - rmNode.httpPort = newNode.getHttpPort(); - rmNode.httpAddress = newNode.getHttpAddress(); - boolean isCapabilityChanged = false; - if (rmNode.getTotalCapability() != newNode.getTotalCapability()) { - rmNode.totalCapability = newNode.getTotalCapability(); - isCapabilityChanged = true; - } - - handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); + handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + if (runningApps != null) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } + } - if (isCapabilityChanged - && rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context - .getDispatcher() - .getEventHandler() - .handle( - new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption - .newInstance(newNode.getTotalCapability(), -1))); - } + if (isCapabilityChanged && rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); } } @@ -631,8 +597,10 @@ private void handleNMContainerStatus( List nmContainerStatuses, RMNodeImpl rmnode) { List containerStatuses = new ArrayList(); - for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { - containerStatuses.add(createContainerStatus(nmContainerStatus)); + if (nmContainerStatuses != null) { + for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { + containerStatuses.add(createContainerStatus(nmContainerStatus)); + } } rmnode.handleContainerStatus(containerStatuses); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 18d7df4..e46d7c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -1049,6 +1049,10 @@ protected Dispatcher createDispatcher() { Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); + Assert.assertEquals(10240, + rm.getRMContext().getRMNodes().get(nm1.getNodeId()) + .getTotalCapability().getMemory()); + // reconnect of node with changed capability and running applications List runningApps = new ArrayList(); runningApps.add(ApplicationId.newInstance(1, 0)); -- 1.9.2.msysgit.0