diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 146b0d3..fcd590f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -109,6 +109,8 @@ private final String nodeAddress; // The containerManager address private String httpAddress; private volatile Resource totalCapability; + // Preserves the original total capability + private volatile Resource originalTotalCapability; private final Node node; private String healthReport; @@ -239,6 +241,9 @@ RMNodeEventType.GRACEFUL_DECOMMISSION, new DecommissioningNodeTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING)) + .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, + RMNodeEventType.RESOURCE_UPDATE, + new UpdateNodeResourceWhenRunningTransition()) .addTransition(NodeState.DECOMMISSIONING, NodeState.LOST, RMNodeEventType.EXPIRE, new DeactivateNodeTransition(NodeState.LOST)) @@ -249,10 +254,8 @@ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) - // TODO (in YARN-3223) update resource when container finished. .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) - // TODO (in YARN-3223) update resource when container finished. 
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) @@ -1017,7 +1020,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { if (initState.equals(NodeState.UNHEALTHY)) { rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode(); } - // TODO (in YARN-3223) Keep NM's available resource to be 0 + if (rmNode.originalTotalCapability == null) { + rmNode.originalTotalCapability = rmNode.totalCapability; + } } } @@ -1031,9 +1036,16 @@ public RecommissionNodeTransition(NodeState finalState) { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + if (rmNode.originalTotalCapability != null){ + rmNode.totalCapability = rmNode.originalTotalCapability; + rmNode.originalTotalCapability = null; + } LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " + - "recommissioned back to RUNNING."); - // TODO handle NM resource resume in YARN-3223. + "recommissioned back to RUNNING with " + rmNode.totalCapability); + //update the scheduler with the restored original total capability + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeResourceUpdateSchedulerEvent(rmNode, + ResourceOption.newInstance(rmNode.totalCapability, 0))); } } @@ -1089,8 +1101,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.DECOMMISSIONED; } - // TODO (in YARN-3223) if node in decommissioning, get node resource - // updated if container get finished (keep available resource to be 0) } rmNode.handleContainerStatus(statusEvent.getContainers()); @@ -1318,4 +1328,8 @@ private void handleLogAggregationStatus( writeLock.unlock(); } } + + public Resource getOriginalTotalCapability() { + return this.originalTotalCapability; + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 782ed03..e6e2e5a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -1059,8 +1061,17 @@ private synchronized void nodeUpdate(RMNode nm) { } } + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new 
RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getUsedResource(), 0))); + } schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); + releaseResources); schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); // Updating node resource utilization diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 33a077d..727c7e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Random; -import org.apache.hadoop.net.Node; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; @@ -914,4 +912,38 @@ public void testContainerExpire() throws Exception { 
verify(mockExpirer).unregister(containerId1); verify(mockExpirer).unregister(containerId2); } + + @Test + public void testResourceUpdateOnDecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource does not match.", 4096, oldCapacity.getMemory()); + assertEquals("CPU resource does not match.", 4, oldCapacity.getVirtualCores()); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), + ResourceOption.newInstance(Resource.newInstance(2048, 2), + ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + Resource originalCapacity = node.getOriginalTotalCapability(); + assertEquals("Memory resource does not match.", oldCapacity.getMemory(), originalCapacity.getMemory()); + assertEquals("CPU resource does not match.", oldCapacity.getVirtualCores(), originalCapacity.getVirtualCores()); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource does not match.", 2048, newCapacity.getMemory()); + assertEquals("CPU resource does not match.", 2, newCapacity.getVirtualCores()); + + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testResourceUpdateOnRecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource does not match.", 4096, oldCapacity.getMemory()); + assertEquals("CPU resource does not match.", 4, oldCapacity.getVirtualCores()); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.RECOMMISSION)); + Resource originalCapacity = node.getOriginalTotalCapability(); + assertEquals("Original total capability should be null after recommission.", null, originalCapacity); + } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c95cdc..bd47eaf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -115,6 +117,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -150,11 +153,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import org.mockito.Mockito; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -3360,4 +3363,89 @@ public void testNodemanagerReconnect() throws Exception { resourceManager.getResourceScheduler().getClusterResource()); privateResourceTrackerService.stop(); } + + @Test + public void testContainerUpdateDecommissioningNode() throws Exception { + // Mock the RMNodeResourceUpdate event handler to update SchedulerNode + // to have 0 available resource + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + if (event instanceof RMNodeResourceUpdateEvent) { + RMNodeResourceUpdateEvent resourceEvent = + (RMNodeResourceUpdateEvent) event; + resourceManager + .getResourceScheduler() + .getSchedulerNode(resourceEvent.getNodeId()) + .setTotalResource(resourceEvent.getResourceOption().getResource()); + } + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((CapacityScheduler) resourceManager.getResourceScheduler()) + .setRMContext(spyContext); + ((AsyncDispatcher) mockDispatcher).start(); + // Register node + String host_0 = "host_0"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(8 * GB, 4)); + 
// ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + + // Submit an application + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_0); + + Task task_0_0 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); + + nodeUpdate(nm_0); + // Kick off another heartbeat with the node state mocked to decommissioning + // This should update the schedulernodes to have 0 available resource + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm_0.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + + // Get allocations from the scheduler + application_0.schedule(); + + // Check the used resource is 1 GB 1 core + Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); + Resource usedResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getUsedResource(); + Assert.assertEquals(1 * GB, usedResource.getMemory()); + Assert.assertEquals(1, usedResource.getVirtualCores()); + // Check total resource of scheduler node is also changed to 1 GB 1 core + Resource totalResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + Assert.assertEquals(1 * GB, totalResource.getMemory()); + Assert.assertEquals(1, totalResource.getVirtualCores()); + // Check the available resource is 0/0 + Resource availableResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getAvailableResource(); + 
Assert.assertEquals(0, availableResource.getMemory()); + Assert.assertEquals(0, availableResource.getVirtualCores()); + + } } -- 1.9.5.github.0