diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index dae2ce7..643fb8d 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -188,6 +188,12 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( // TODO Auto-generated method stub return null; } + + @Override + public void setTotalResource(Resource totalResource) { + // TODO Auto-generated method stub + + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 8c65ccc..bdd9b2e 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -176,4 +176,10 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( // TODO Auto-generated method stub return null; } + + @Override + public void setTotalResource(Resource totalResource) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index f28422a..4f2cc9a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -155,4 +155,10 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + /** + * Set total resource for the node. + * @param totalResource {@link Resource} New total resource + */ + void setTotalResource(Resource totalResource); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e0d27d6..85d3b5d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -671,7 +671,16 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, RMNodeResourceUpdateEvent event){ ResourceOption resourceOption = event.getResourceOption(); // Set resource on RMNode - rmNode.totalCapability = resourceOption.getResource(); + rmNode.setTotalResource(resourceOption.getResource()); + } + + /** + * Set the total resource of the node. When node goes to decommissioning, + * scheduler calls this to update the RMNode + * @param newResource + */ + public void setTotalResource(Resource newResource){ + this.totalCapability = newResource; } public static class AddNodeTransition implements @@ -969,6 +978,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode(); } // TODO (in YARN-3223) Keep NM's available resource to be 0 + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index abd72bf..a7776ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -665,35 +666,56 @@ public synchronized void updateNodeResource(RMNode nm, SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); Resource oldResource = node.getTotalResource(); - if(!oldResource.equals(newResource)) { - // Notify NodeLabelsManager about this change - rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), - newResource); - - // Log resource change - LOG.info("Update resource on node: " + node.getNodeName() - + " from: " + oldResource + ", to: " - + newResource); - - nodes.remove(nm.getNodeID()); - updateMaximumAllocation(node, false); - - // update resource to node - node.setTotalResource(newResource); - - nodes.put(nm.getNodeID(), (N)node); - updateMaximumAllocation(node, true); - - // update resource to clusterResource - Resources.subtractFrom(clusterResource, oldResource); - Resources.addTo(clusterResource, newResource); + if(nm.getState() == NodeState.DECOMMISSIONING || + !oldResource.equals(newResource)) { + updateClusterResources(nm, node, oldResource, newResource); } else { - // Log resource change - LOG.warn("Update resource on node: " + node.getNodeName() - + " with the same resource: " + newResource); + //when recommissioning, the totalresource is changed back to original + if (node.setResourcesOnRecommission()) { + updateClusterResources(nm, node, oldResource, node.getTotalResource()); + } else { + // Log resource change + LOG.warn("Update resource on node: " + node.getNodeName() + + " with the same resource: " + newResource); + } } } + /** + * Update the cluster resource based on a particular node's changes. + */ + private synchronized void updateClusterResources(RMNode nm, + SchedulerNode node, Resource oldResource, Resource newResource) { + // Notify NodeLabelsManager about this change + rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), + newResource); + + // Log resource change + LOG.info("Update resource on node: " + node.getNodeName() + + " from: " + oldResource + ", to: " + + newResource); + + nodes.remove(nm.getNodeID()); + updateMaximumAllocation(node, false); + + // if node is decommissioning, preserve original total resource + if (nm.getState() == NodeState.DECOMMISSIONING){ + node.setOriginalTotalResource(oldResource); + } + // update resource to node + node.setTotalResource(newResource); + + // Update RMNode + nm.setTotalResource(newResource); + + nodes.put(nm.getNodeID(), (N)node); + updateMaximumAllocation(node, true); + + // update resource to clusterResource + Resources.subtractFrom(clusterResource, oldResource); + Resources.addTo(clusterResource, newResource); + } + /** {@inheritDoc} */ @Override public EnumSet getSchedulingResourceTypes() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index f3d3906..2f6d115 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -56,6 +56,12 @@ private Resource availableResource = Resource.newInstance(0, 0); private Resource usedResource = Resource.newInstance(0, 0); private Resource totalResourceCapability; + /** + * This is to keep track of the node's original total capability. When a + * is decommissioning, it's totalResource capability becomes equal to the + * usedResource to set its availableResource to 0 + */ + private Resource originalTotalResourceCapability; private RMContainer reservedContainer; private volatile int numContainers; @@ -339,4 +345,41 @@ public String getPartition() { return this.labels.iterator().next(); } } + + /** + * If the node is decommissioning, preserve the original totalResource so that + * if the node is recommissioned, the total resources will be restored. This + * is a no-op if it's been called once. + */ + public synchronized void setOriginalTotalResource( + Resource originalTotalResource){ + if (this.originalTotalResourceCapability == null) { + this.originalTotalResourceCapability = originalTotalResource; + LOG.info("setOriginalTotalResource for " + this.rmNode.getNodeID() + + ": " + originalTotalResource); + } + } + + public synchronized Resource getOriginalTotalResource(){ + return this.originalTotalResourceCapability; + } + /** + * If a decommissioning node is recommissioned, its original total resources + * is restored. This is a no-op if the node was not decommissioning. + * @return True if the node was recommissioned, false otherwise. + */ + public synchronized boolean setResourcesOnRecommission(){ + // Do nothing if the node was not previously decommissioning + if (this.originalTotalResourceCapability != null){ + this.totalResourceCapability = this.originalTotalResourceCapability; + this.originalTotalResourceCapability = null; + this.availableResource = Resources.subtract(this.totalResourceCapability, + this.usedResource); + LOG.info("Recommission resources for " + this.rmNode.getNodeID() + + ", totalResource: " + this.totalResourceCapability + + ", availableResource: " + this.availableResource); + return true; + } + return false; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1075ee0..d4d485e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -1018,7 +1019,7 @@ private synchronized void nodeUpdate(RMNode nm) { Resource releaseResources = Resource.newInstance(0, 0); FiCaSchedulerNode node = getNode(nm.getNodeID()); - + List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -1058,6 +1059,11 @@ private synchronized void nodeUpdate(RMNode nm) { } } } + // Update the total capability of the node + ResourceOption newResource = (nm.getState() == NodeState.DECOMMISSIONING) ? + ResourceOption.newInstance(node.getUsedResource(), 0) : + ResourceOption.newInstance(node.getTotalResource(), 0); + this.updateNodeAndQueueResource(nm, newResource); schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, releaseResources); @@ -1069,7 +1075,7 @@ private synchronized void nodeUpdate(RMNode nm) { + " availableResource: " + node.getAvailableResource()); } } - + /** * Process resource update on a node. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 92f3edf..c6cafd1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -235,6 +235,11 @@ public long getLastHealthReportTime() { } @Override + public void setTotalResource(Resource totalResource){ + this.perNode = totalResource; + } + + @Override public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 7c33f78..7a50225 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -358,6 +361,51 @@ public void testMaxAllocationAfterUpdateNodeResource() throws IOException { } } + @Test + public void testDecommissioningNodeUpdateNodeResource() throws IOException { + configureScheduler(); + YarnConfiguration conf = getConf(); + + MockRM rm = new MockRM(conf); + try { + rm.start(); + AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm + .getResourceScheduler(); + scheduler.nodes = new HashMap(); + Resource originalTotalResource = Resource.newInstance(8196, 8); + RMNode rmNode1 = MockNodes.nodeInfo(0, originalTotalResource, + NodeState.DECOMMISSIONING); + NodeId nodeId1 = rmNode1.getNodeID(); + SchedulerNode schNode1 = new FiCaSchedulerNode(rmNode1, false); + scheduler.nodes.put(nodeId1, schNode1); + Resource newTotalResource = Resource.newInstance(1024, 2); + scheduler.updateNodeResource(rmNode1, + ResourceOption.newInstance(newTotalResource, 0)); + //Decommissioning saves the original total resource + Assert.assertEquals(schNode1.getOriginalTotalResource().getMemory(), + originalTotalResource.getMemory()); + Assert.assertEquals(schNode1.getOriginalTotalResource().getVirtualCores(), + originalTotalResource.getVirtualCores()); + Assert.assertEquals(schNode1.getTotalResource().getMemory(), + newTotalResource.getMemory()); + Assert.assertEquals(schNode1.getTotalResource().getVirtualCores(), + newTotalResource.getVirtualCores()); + //recommission the node with a different state + RMNode rmNodeSpy = spy(rmNode1); + when(rmNodeSpy.getState()).thenReturn(NodeState.RUNNING); + scheduler.updateNodeResource(rmNodeSpy, + ResourceOption.newInstance(newTotalResource, 0)); + Assert.assertEquals(schNode1.getOriginalTotalResource(), + null); + Assert.assertEquals(schNode1.getTotalResource().getMemory(), + originalTotalResource.getMemory()); + Assert.assertEquals(schNode1.getTotalResource().getVirtualCores(), + originalTotalResource.getVirtualCores()); + } finally { + rm.stop(); + } + } + /* * This test case is to test the pending containers are cleared from the * attempt even if one of the application in the list have current attempt as