From b2741c4cbf72d1558f41e4b239865ff555b5cc67 Mon Sep 17 00:00:00 2001 From: Garry Weng Date: Tue, 17 Nov 2015 13:32:04 +0800 Subject: [PATCH] Fix: Total resource count mistake:NodeRemovedSchedulerEvent in ReconnectNodeTransition will reduce the newNode.getTotalCapability() --- .../server/resourcemanager/rmnode/RMNodeImpl.java | 82 ++++++---------------- 1 file changed, 20 insertions(+), 62 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e0d27d6..e7ae920 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -725,77 +725,35 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); List runningApps = reconnectEvent.getRunningApplications(); - boolean noRunningApps = + boolean noRunningApps = (runningApps == null) || (runningApps.size() == 0); - - // No application running on the node, so send node-removal event with - // cleaning up old container info. - if (noRunningApps) { - if (rmNode.getState() == NodeState.DECOMMISSIONING) { - // When node in decommissioning, and no running apps on this node, - // it will return as decommissioned state. - deactivateNode(rmNode, NodeState.DECOMMISSIONED); - return NodeState.DECOMMISSIONED; - } - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - if (rmNode.getHttpPort() == newNode.getHttpPort()) { - if (!rmNode.getTotalCapability().equals( - newNode.getTotalCapability())) { - rmNode.totalCapability = newNode.getTotalCapability(); - } - if (rmNode.getState().equals(NodeState.RUNNING)) { - // Only add old node if old state is RUNNING - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; - default: - LOG.debug("Unexpected Rmnode state"); - } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); - } + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + boolean isCapabilityChanged = false; + if (!rmNode.getTotalCapability().equals(newNode.getTotalCapability())) { + rmNode.totalCapability = newNode.getTotalCapability(); + isCapabilityChanged = true; + } - } else { - rmNode.httpPort = newNode.getHttpPort(); - rmNode.httpAddress = newNode.getHttpAddress(); - boolean isCapabilityChanged = false; - if (!rmNode.getTotalCapability().equals( - newNode.getTotalCapability())) { - rmNode.totalCapability = newNode.getTotalCapability(); - isCapabilityChanged = true; - } - - handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); + handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); + if (!noRunningApps) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } + } - if (isCapabilityChanged - && rmNode.getState().equals(NodeState.RUNNING)) { - // Update scheduler node's capacity for reconnect node. - rmNode.context - .getDispatcher() - .getEventHandler() - .handle( - new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption - .newInstance(newNode.getTotalCapability(), -1))); - } - + if (isCapabilityChanged && rmNode.getState().equals(NodeState.RUNNING)) { + // Update scheduler node's capacity for reconnect node. + rmNode.context + .getDispatcher() + .getEventHandler() + .handle( + new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption + .newInstance(newNode.getTotalCapability(), -1))); } + return rmNode.getState(); } -- 1.9.4.msysgit.0