diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java
index 12dfe8b..7e87b64 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/NodeUpdateSchedulerEventWrapper.java
@@ -28,7 +28,7 @@ public class NodeUpdateSchedulerEventWrapper extends NodeUpdateSchedulerEvent {

   public NodeUpdateSchedulerEventWrapper(NodeUpdateSchedulerEvent event) {
-    super(new RMNodeWrapper(event.getRMNode()));
+    super(new RMNodeWrapper(event.getRMNode()), false);
   }

 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index e0d27d6..e699c38 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1055,7 +1055,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
-            new NodeUpdateSchedulerEvent(rmNode));
+            new NodeUpdateSchedulerEvent(rmNode, false));
       }

       // Update DTRenewer in secure mode to keep these apps alive. Today this is
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f3d3906..0ff8723 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -56,6 +56,7 @@
   private Resource availableResource = Resource.newInstance(0, 0);
   private Resource usedResource = Resource.newInstance(0, 0);
   private Resource totalResourceCapability;
+  private Resource originalUsedResource;
   private RMContainer reservedContainer;
   private volatile int numContainers;

@@ -158,6 +159,21 @@ public synchronized void allocateContainer(RMContainer rmContainer) {
         + " available after allocation");
   }

+  /**
+   * Set the node resources to have 0 available memory.
+   */
+  public synchronized void setDecommissioningResources() {
+    if (originalUsedResource == null) {
+      this.originalUsedResource = Resources.clone(this.usedResource);
+    }
+    // Set availableResource = 0
+    deductAvailableResource(Resources.clone(this.availableResource));
+    LOG.info("Decommissioning node originalUsedResource: "
+        + originalUsedResource
+        + ", usedResource: " + usedResource
+        + ", availableResource: " + availableResource);
+  }
+
   private synchronized void changeContainerResource(ContainerId containerId,
       Resource deltaResource, boolean increase) {
     if (increase) {
@@ -174,7 +190,7 @@ private synchronized void changeContainerResource(ContainerId containerId,
   }

   /**
-   * The Scheduler increased container
+   * The Scheduler increased container.
    */
   public synchronized void increaseContainer(ContainerId containerId,
       Resource deltaResource) {
@@ -182,7 +198,7 @@ public synchronized void increaseContainer(ContainerId containerId,
   }

   /**
-   * The Scheduler decreased container
+   * The Scheduler decreased container.
    */
   public synchronized void decreaseContainer(ContainerId containerId,
       Resource deltaResource) {
@@ -229,6 +245,36 @@ private synchronized void updateResource(Container container) {
   }

   /**
+   * This will be called only if the node is decommissioning
+   * @param container
+   */
+  private synchronized void updateResourceDecommissioning(
+      Container container) {
+    if (container.getResource() == null) {
+      LOG.error("Invalid resource addition of null resource for "
+          + rmNode.getNodeAddress());
+      return;
+    }
+    Resources.subtractFrom(originalUsedResource, container.getResource());
+    LOG.info("updateResourceDecommissioning updated originalUsedResource: "
+        + originalUsedResource);
+  }
+
+  /**
+   * If the node was previously decommissioning, on recommission
+   * restore the original used resource.
+   */
+  public synchronized void recommissionResourcesIfNeeded() {
+    if (this.originalUsedResource != null){
+      LOG.info("Setting usedResource=" + this.originalUsedResource);
+      this.usedResource = this.originalUsedResource;
+      this.availableResource = Resources.subtract(
+          this.totalResourceCapability, usedResource);
+      this.originalUsedResource = null;
+    }
+  }
+
+  /**
    * Release an allocated container on this node.
    *
    * @param container
@@ -242,7 +288,12 @@ public synchronized void releaseContainer(Container container) {
     /* remove the containers from the nodemanger */
     if (null != launchedContainers.remove(container.getId())) {
-      updateResource(container);
+      //If the originalUsedResources is set, the node is decommissioning
+      if (this.originalUsedResource == null){
+        updateResource(container);
+      } else {
+        updateResourceDecommissioning(container);
+      }
     }

     LOG.info("Released container " + container.getId() + " of capacity "
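[Reviewer note, not part of the patch] The SchedulerNode changes above are a small piece of bookkeeping: when a node starts decommissioning, the current usedResource is snapshotted into originalUsedResource and availableResource is forced to zero; while the node drains, released containers are deducted from that snapshot instead of being returned to availableResource; on recommission the snapshot is restored. A minimal, self-contained sketch of that accounting, using plain integers instead of the YARN Resource/Resources APIs (class, field, and method names below are illustrative only, not from the patch):

/** Illustrative only: models the originalUsedResource bookkeeping with plain ints. */
public class DecommissionAccountingSketch {
  private final int totalMb;      // node capability
  private int usedMb;             // resources held by running containers
  private int availableMb;        // resources the scheduler may still hand out
  private Integer originalUsedMb; // non-null only while the node is decommissioning

  public DecommissionAccountingSketch(int totalMb, int usedMb) {
    this.totalMb = totalMb;
    this.usedMb = usedMb;
    this.availableMb = totalMb - usedMb;
  }

  /** Like setDecommissioningResources(): snapshot used, then zero out available. */
  public void startDecommissioning() {
    if (originalUsedMb == null) {
      originalUsedMb = usedMb;
    }
    availableMb = 0; // nothing further can be scheduled on this node
  }

  /**
   * Like the new releaseContainer() branch: on the normal path released capacity
   * returns to available; while decommissioning it is only deducted from the
   * snapshot, so available stays at zero.
   */
  public void releaseContainer(int containerMb) {
    if (originalUsedMb == null) {
      usedMb -= containerMb;         // updateResource(): used down, available up
      availableMb += containerMb;
    } else {
      originalUsedMb -= containerMb; // updateResourceDecommissioning()
    }
  }

  /** Like recommissionResourcesIfNeeded(): restore used/available from the snapshot. */
  public void recommission() {
    if (originalUsedMb != null) {
      usedMb = originalUsedMb;
      availableMb = totalMb - usedMb;
      originalUsedMb = null;
    }
  }
}

Keeping the snapshot separate is what lets a recommissioned node come back with an accurate used/available split even though availableResource was pinned to zero for the whole drain.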
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6e356b5..7e20053 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1010,7 +1010,8 @@ public QueueInfo getQueueInfo(String queueName,
     return root.getQueueUserAclInfo(user);
   }

-  private synchronized void nodeUpdate(RMNode nm) {
+  private synchronized void nodeUpdate(RMNode nm,
+      boolean isNodeDecommissioning) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     }
@@ -1018,7 +1019,12 @@ private synchronized void nodeUpdate(RMNode nm) {
     Resource releaseResources = Resource.newInstance(0, 0);

     FiCaSchedulerNode node = getNode(nm.getNodeID());
-
+    if (isNodeDecommissioning) {
+      node.setDecommissioningResources();
+    } else {
+      node.recommissionResourcesIfNeeded();
+    }
+
     List containerInfoList = nm.pullContainerUpdates();
     List newlyLaunchedContainers = new ArrayList();
     List completedContainers = new ArrayList();
@@ -1311,8 +1317,8 @@ public void handle(SchedulerEvent event) {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       RMNode node = nodeUpdatedEvent.getRMNode();
       setLastNodeUpdateTime(Time.now());
-      nodeUpdate(node);
-      if (!scheduleAsynchronously) {
+      nodeUpdate(node, nodeUpdatedEvent.getIsDecommissioning());
+      if (!scheduleAsynchronously && !nodeUpdatedEvent.getIsDecommissioning()) {
        allocateContainersToNode(getNode(node.getNodeID()));
       }
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
index 7a8686c..1fbf45b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
@@ -23,13 +23,19 @@ public class NodeUpdateSchedulerEvent extends SchedulerEvent {

   private final RMNode rmNode;
+  private final boolean isDecommissioning;

-  public NodeUpdateSchedulerEvent(RMNode rmNode) {
+  public NodeUpdateSchedulerEvent(RMNode rmNode, boolean isDecommissioning) {
     super(SchedulerEventType.NODE_UPDATE);
     this.rmNode = rmNode;
+    this.isDecommissioning = isDecommissioning;
   }

   public RMNode getRMNode() {
     return rmNode;
   }
+
+  public boolean getIsDecommissioning(){
+    return isDecommissioning;
+  }
 }
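[Reviewer note, not part of the patch] CapacityScheduler now threads the new flag through nodeUpdate() and, on NODE_UPDATE, skips allocateContainersToNode() for a draining node while still processing launched and completed containers. Note that every call site touched by this patch passes false; nothing in these hunks constructs the event with isDecommissioning=true, so the graceful-decommission driver presumably lives elsewhere. A compilable sketch of the gating logic, with a stand-in interface for the two new SchedulerNode calls (the stand-in names are illustrative, not the real CapacityScheduler code):

/** Illustrative only: how the isDecommissioning flag gates allocation on heartbeat. */
public class HeartbeatGatingSketch {

  /** Stand-in for the two SchedulerNode methods the patch adds. */
  interface NodeView {
    void setDecommissioningResources();
    void recommissionResourcesIfNeeded();
  }

  /**
   * Mirrors the NODE_UPDATE branch of CapacityScheduler.handle() after this patch.
   * Returns true when allocateContainersToNode() would be invoked.
   */
  static boolean onHeartbeat(NodeView node, boolean isDecommissioning,
      boolean scheduleAsynchronously) {
    if (isDecommissioning) {
      node.setDecommissioningResources();   // available -> 0, snapshot used
    } else {
      node.recommissionResourcesIfNeeded(); // undo the snapshot if one exists
    }
    // ...the launched/completed container bookkeeping of nodeUpdate() still runs...
    return !scheduleAsynchronously && !isDecommissioning;
  }
}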
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index 9ceeffb..163a788 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -183,7 +183,7 @@ private void nodeUpdate(
       org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm1.getNodeId());
     // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node, false);
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
index df35485..3964e9f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
@@ -186,7 +186,7 @@ private void nodeUpdate(NodeManager nm) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
     // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node, false);
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index f0a1d03..19e292c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -373,7 +373,7 @@ private void nodeUpdate(
       org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
     // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node, false);
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
@@ -3026,8 +3026,8 @@ public void testHeadRoomCalculationWithDRC() throws Exception {
     fiCaApp1.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 10*GB, 1, true,
             u0Priority, recordFactory)));
-    cs.handle(new NodeUpdateSchedulerEvent(node));
-    cs.handle(new NodeUpdateSchedulerEvent(node2));
+    cs.handle(new NodeUpdateSchedulerEvent(node, false));
+    cs.handle(new NodeUpdateSchedulerEvent(node2, false));
     assertEquals(6*GB, fiCaApp1.getHeadroom().getMemory());
     assertEquals(15, fiCaApp1.getHeadroom().getVirtualCores());

@@ -3035,8 +3035,8 @@ public void testHeadRoomCalculationWithDRC() throws Exception {
     fiCaApp2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
             u0Priority, recordFactory)));
-    cs.handle(new NodeUpdateSchedulerEvent(node));
-    cs.handle(new NodeUpdateSchedulerEvent(node2));
+    cs.handle(new NodeUpdateSchedulerEvent(node, false));
+    cs.handle(new NodeUpdateSchedulerEvent(node2, false));
     assertEquals(9*GB, fiCaApp2.getHeadroom().getMemory());
     assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores());
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b5b2222..2d24352 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -371,9 +371,9 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
     // Do node heartbeats 2 times
     // First time will allocate container for app1, second time will reserve
     // container for app2
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false));
+
     // App2 will get preference to be allocated on node1, and node1 will be all
     // used by App2.
FiCaSchedulerApp schedulerApp1 = @@ -396,8 +396,8 @@ public void testExcessReservationWillBeUnreserved() throws Exception { // Cancel asks of app2 and re-kick RM am2.allocate("*", 4 * GB, 0, new ArrayList()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); + // App2's reservation will be cancelled Assert.assertTrue(schedulerApp2.getReservedContainers().size() == 0); Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId()) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 672af64..76d17fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -109,7 +109,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); // Pending resource should be deducted checkPendingResource(rm1, "default", 0 * GB, null); @@ -232,7 +232,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); @@ -257,7 +257,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // Complete one container and do another allocation am1.allocate(null, Arrays.asList(containerId2)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); // Now container should be increased verifyContainerIncreased(am1.allocate(null, null), containerId1, 7 * GB); @@ -332,7 +332,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId2); @@ -411,7 +411,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); @@ -444,8 +444,8 @@ public RMNodeLabelsManager createNodeLabelManager() { .newInstance(containerId1, Resources.createResource(1 * GB))), 
null); // Trigger a node heartbeat.. - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); + /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -522,7 +522,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1); @@ -553,8 +553,8 @@ public RMNodeLabelsManager createNodeLabelManager() { ContainerResourceChangeRequest .newInstance(containerId1, Resources.createResource(1 * GB)))); // Trigger a node heartbeat.. - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); + /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -629,7 +629,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2); @@ -726,7 +726,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // NM1 do 1 heartbeats CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2); @@ -851,7 +851,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // assignContainer, container-4/5/2 increased (which has highest priority OR // earlier allocated) - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); AllocateResponse allocateResponse = am1.allocate(null, null); Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size()); verifyContainerIncreased(allocateResponse, @@ -931,7 +931,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // assignContainer, container-4/5/2 increased (which has highest priority OR // earlier allocated) - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); AllocateResponse allocateResponse = am1.allocate(null, null); Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size()); verifyContainerIncreased(allocateResponse, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 91666df..c5e2353 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -630,8 +630,8 @@ public RMNodeLabelsManager createNodeLabelManager() { // Do node heartbeats many times for (int i = 0; i < 50; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2, false)); } // App2 will get preference to be allocated on node1, and node1 will be all @@ -894,7 +894,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); for (int i = 0; i < 15; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); } // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM @@ -985,8 +985,8 @@ public RMNodeLabelsManager createNodeLabelManager() { // partitioned node int cycleWaited = 0; for (int i = 0; i < 50; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2, false)); if (schedulerNode1.getNumContainers() == 0) { cycleWaited++; } @@ -1038,7 +1038,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // Heartbeat for many times, app1 should get nothing for (int i = 0; i < 50; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); } Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId()) @@ -1119,7 +1119,7 @@ public RMNodeLabelsManager createNodeLabelManager() { SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); } // app1 gets all resource in partition=x @@ -1158,7 +1158,7 @@ private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId); for (int i = 0; i < nHeartbeat; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode1, false)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java index 7ca9606..bef4d7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java @@ -218,7 
+218,7 @@ public void testMoveRunnableApp() throws Exception { ApplicationId appId = appAttId.getApplicationId(); RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(1024)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); scheduler.handle(updateEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 65c80a6..c0654e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -85,7 +85,7 @@ public void testSchedulingDelay() throws InterruptedException { 1, Resources.createResource(4096, 4), 1, host); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeUpdateEvent); // Create one application and submit one each of node-local, rack-local diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7637410..3743e33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -131,7 +131,7 @@ public void test() throws Exception { // Queue A wants 3 * 1024. Node update gives this all to A createSchedulingRequest(3 * 1024, "queueA", "user1"); scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeEvent2); // Queue B arrives and wants 1 * 1024 @@ -192,7 +192,7 @@ public void testIsStarvedForFairShare() throws Exception { // Queue A wants 4 * 1024. Node update gives this all to A createSchedulingRequest(1 * 1024, "queueA", "user1", 4); scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, false); for (int i = 0; i < 4; i ++) { scheduler.handle(nodeEvent2); } @@ -266,7 +266,7 @@ public void testIsStarvedForFairShareDRF() throws Exception { // Queue A wants 7 * 1024, 1. 
Node update gives this all to A createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeEvent2); QueueManager queueMgr = scheduler.getQueueManager(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4909e09..d0147c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -557,7 +557,7 @@ public void testQueueInfo() throws IOException { // Each NodeUpdate Event will only assign one container. // To assign two containers, call handle NodeUpdate Event twice. - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeEvent2); scheduler.handle(nodeEvent2); @@ -656,7 +656,7 @@ public void testSchedulerRootQueueMetrics() throws Exception { // Queue 1 requests full capacity of node createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // Now queue 2 requests likewise @@ -670,7 +670,7 @@ public void testSchedulerRootQueueMetrics() throws Exception { // Now another node checks in with capacity RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(nodeEvent2); scheduler.handle(updateEvent2); @@ -706,7 +706,7 @@ public void testSimpleContainerAllocation() throws IOException { scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // Asked for less than increment allocation. @@ -715,7 +715,7 @@ public void testSimpleContainerAllocation() throws IOException { scheduler.getQueueManager().getQueue("queue1"). getResourceUsage().getMemory()); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). 
@@ -750,8 +750,8 @@ public void testSimpleContainerReservation() throws Exception { // Queue 1 requests full capacity of node createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); - + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); + scheduler.handle(updateEvent); // Make sure queue 1 is allocated app capacity @@ -774,7 +774,7 @@ public void testSimpleContainerReservation() throws Exception { MockNodes .newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(nodeEvent2); scheduler.handle(updateEvent2); @@ -820,13 +820,13 @@ public void testOffSwitchAppReservationThreshold() throws Exception { // Ensure capacity on all nodes are allocated createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); // Verify capacity allocation assertEquals(6144, scheduler.getQueueManager().getQueue("queue1"). @@ -836,16 +836,16 @@ public void testOffSwitchAppReservationThreshold() throws Exception { // node but would be ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); assertEquals(1, scheduler.getSchedulerApp(attId).getNumReservations(null, true)); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); assertEquals(2, scheduler.getSchedulerApp(attId).getNumReservations(null, true)); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); // No new reservations should happen since it exceeds threshold assertEquals(2, @@ -860,13 +860,13 @@ public void testOffSwitchAppReservationThreshold() throws Exception { // New node satisfies resource request scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node4)); + scheduler.handle(new NodeUpdateSchedulerEvent(node4, false)); assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). 
getResourceUsage().getMemory()); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); scheduler.update(); // Verify number of reservations have decremented @@ -910,16 +910,16 @@ public void testRackLocalAppReservationThreshold() throws Exception { // Ensure capacity on all nodes are allocated createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node4)); + scheduler.handle(new NodeUpdateSchedulerEvent(node4, false)); // Verify capacity allocation assertEquals(8192, scheduler.getQueueManager().getQueue("queue1"). @@ -942,16 +942,16 @@ public void testRackLocalAppReservationThreshold() throws Exception { ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); assertEquals(1, scheduler.getSchedulerApp(attId).getNumReservations(null, true)); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); assertEquals(2, scheduler.getSchedulerApp(attId).getNumReservations(null, true)); scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); // No new reservations should happen since it exceeds threshold assertEquals(2, @@ -966,14 +966,14 @@ public void testRackLocalAppReservationThreshold() throws Exception { // New node satisfies resource request scheduler.update(); - scheduler.handle(new NodeUpdateSchedulerEvent(node4)); + scheduler.handle(new NodeUpdateSchedulerEvent(node4, false)); assertEquals(10240, scheduler.getQueueManager().getQueue("queue1"). 
getResourceUsage().getMemory()); - scheduler.handle(new NodeUpdateSchedulerEvent(node1)); - scheduler.handle(new NodeUpdateSchedulerEvent(node2)); - scheduler.handle(new NodeUpdateSchedulerEvent(node3)); - scheduler.handle(new NodeUpdateSchedulerEvent(node4)); + scheduler.handle(new NodeUpdateSchedulerEvent(node1, false)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2, false)); + scheduler.handle(new NodeUpdateSchedulerEvent(node3, false)); + scheduler.handle(new NodeUpdateSchedulerEvent(node4, false)); scheduler.update(); // Verify number of reservations have decremented @@ -1013,7 +1013,7 @@ public void testContainerReservationAttemptExceedingQueueMax() // Queue 1 requests full capacity of the queue createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // Make sure queue 1 is allocated app capacity @@ -1070,7 +1070,7 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception { // Queue 1 requests full capacity of the queue createSchedulingRequest(2048, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // Make sure queue 1 is allocated app capacity @@ -1163,7 +1163,7 @@ public void testReservationThresholdGatesReservations() throws Exception { // Queue 1 requests full capacity of node createSchedulingRequest(4096, 4, "queue1", "user1", 1, 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); @@ -1198,7 +1198,7 @@ public void testReservationThresholdGatesReservations() throws Exception { MockNodes .newNodeInfo(1, Resources.createResource(1024, 4), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(nodeEvent2); scheduler.handle(updateEvent2); @@ -1827,8 +1827,8 @@ public void testChoiceOfPreemptedContainers() throws Exception { .setPolicy(SchedulingPolicy.parse("fair")); // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, false); for (int i = 0; i < 4; i++) { scheduler.handle(nodeUpdate1); scheduler.handle(nodeUpdate2); @@ -1964,7 +1964,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { scheduler.update(); - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); for (int i = 0; i < 8; i++) { scheduler.handle(nodeUpdate1); } @@ -2072,13 +2072,13 @@ public void testPreemptionDecision() throws Exception { // Sufficient node check-ins to fully schedule containers for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new 
NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeUpdate1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(nodeUpdate2); - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, false); scheduler.handle(nodeUpdate3); } @@ -2206,13 +2206,13 @@ public void testPreemptionDecisionWithDRF() throws Exception { // Sufficient node check-ins to fully schedule containers for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeUpdate1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(nodeUpdate2); - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, false); scheduler.handle(nodeUpdate3); } @@ -2365,7 +2365,7 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception { scheduler.update(); // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); for (int i = 0; i < 6; i++) { scheduler.handle(nodeUpdate1); } @@ -2561,7 +2561,7 @@ public void testMultipleContainersWaitingForReservation() throws IOException { // Request full capacity of node createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue2", "user2", 1); @@ -2607,7 +2607,7 @@ public void testUserMaxRunningApps() throws Exception { "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // App 1 should be running @@ -2709,7 +2709,7 @@ private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore, "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // App 1 should be running @@ -2852,7 +2852,7 @@ private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore, "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); // App 1 should be running @@ -2956,7 +2956,7 @@ public void testReservationWhileMultiplePriorities() throws IOException { ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1", "user1", 1, 2); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new 
NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); FSAppAttempt app = scheduler.getSchedulerApp(attId); @@ -3083,7 +3083,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { // node 1 checks in scheduler.update(); - NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent1); // should assign node local assertEquals(1, scheduler.getSchedulerApp(attemptId).getLiveContainers() @@ -3091,7 +3091,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { // node 2 checks in scheduler.update(); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); scheduler.handle(updateEvent2); // should assign rack local assertEquals(2, scheduler.getSchedulerApp(attemptId).getLiveContainers() @@ -3128,7 +3128,7 @@ public void testFifoWithinQueue() throws Exception { // Because tests set assignmultiple to false, each heartbeat assigns a single // container. - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); assertEquals(1, app1.getLiveContainers().size()); @@ -3154,7 +3154,7 @@ public void testMaxAssign() throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); ApplicationAttemptId attId = @@ -3189,7 +3189,7 @@ public void testMaxAssignWithZeroMemoryContainers() throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); ApplicationAttemptId attId = @@ -3267,8 +3267,8 @@ public void testAssignContainer() throws Exception { .setPolicy(SchedulingPolicy.parse("fifo")); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1, false); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, false); for (int i = 0; i < 8; i++) { scheduler.handle(updateEvent1); @@ -3419,8 +3419,8 @@ public void testStrictLocality() throws IOException { scheduler.update(); - NodeUpdateSchedulerEvent node1UpdateEvent = new NodeUpdateSchedulerEvent(node1); - NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent node1UpdateEvent = new NodeUpdateSchedulerEvent(node1, false); + NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2, false); // no matter how many heartbeats, node2 should never get a container FSAppAttempt app = scheduler.getSchedulerApp(attId1); @@ -3461,7 +3461,7 @@ public void testCancelStrictLocality() throws IOException { scheduler.update(); - NodeUpdateSchedulerEvent node2UpdateEvent = new 
NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2, false); // no matter how many heartbeats, node2 should never get a container FSAppAttempt app = scheduler.getSchedulerApp(attId1); @@ -3511,7 +3511,7 @@ public void testReservationsStrictLocality() throws IOException { scheduler.update(); - NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeUpdateEvent); assertEquals(1, app.getLiveContainers().size()); scheduler.handle(nodeUpdateEvent); @@ -3544,7 +3544,7 @@ public void testNoMoreCpuOnNode() throws IOException { FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(updateEvent); assertEquals(1, app.getLiveContainers().size()); scheduler.handle(updateEvent); @@ -3576,7 +3576,7 @@ public void testBasicDRFAssignment() throws Exception { // First both apps get a container // Then the first gets another container because its dominant share of // 2048/8192 is less than the other's of 2/5 - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(updateEvent); Assert.assertEquals(1, app1.getLiveContainers().size()); Assert.assertEquals(0, app2.getLiveContainers().size()); @@ -3620,7 +3620,7 @@ public void testBasicDRFWithQueues() throws Exception { scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(updateEvent); Assert.assertEquals(1, app1.getLiveContainers().size()); scheduler.handle(updateEvent); @@ -3666,7 +3666,7 @@ public void testDRFHierarchicalQueues() throws Exception { scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(updateEvent); // app1 gets first container because it asked first Assert.assertEquals(1, app1.getLiveContainers().size()); @@ -3734,9 +3734,9 @@ public void testHostPortNodeName() throws Exception { scheduler.update(); NodeUpdateSchedulerEvent node1UpdateEvent = new - NodeUpdateSchedulerEvent(node1); - NodeUpdateSchedulerEvent node2UpdateEvent = new - NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent(node1, false); + NodeUpdateSchedulerEvent node2UpdateEvent = new + NodeUpdateSchedulerEvent(node2, false); // no matter how many heartbeats, node2 should never get a container FSAppAttempt app = scheduler.getSchedulerApp(attId1); @@ -3837,7 +3837,7 @@ public void testQueueMaxAMShare() throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); scheduler.update(); @@ -4058,7 +4058,7 @@ public void testQueueMaxAMShareDefault() 
throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(8192, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); scheduler.update(); @@ -4175,11 +4175,11 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(5120, 5), 3, "127.0.0.3"); NodeAddedSchedulerEvent nodeE1 = new NodeAddedSchedulerEvent(node1); - NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1, false); NodeAddedSchedulerEvent nodeE2 = new NodeAddedSchedulerEvent(node2); - NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2, false); NodeAddedSchedulerEvent nodeE3 = new NodeAddedSchedulerEvent(node3); - NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3); + NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3, false); scheduler.handle(nodeE1); scheduler.handle(nodeE2); scheduler.handle(nodeE3); @@ -4736,7 +4736,7 @@ public void testRecoverRequestAfterPreemption() throws Exception { scheduler.update(); // Sufficient node check-ins to fully schedule containers - NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeUpdate); assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() @@ -4795,7 +4795,7 @@ public void testBlacklistNodes() throws Exception { MockNodes.newNodeInfo(1, Resources.createResource(16 * GB, 16), 0, host); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); ApplicationAttemptId appAttemptId = @@ -4916,7 +4916,7 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { createSchedulingRequest(1024, 1, "queue1", "user1", 3); RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); scheduler.handle(nodeEvent); scheduler.handle(updateEvent); scheduler.handle(updateEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 6f759ce..3068660 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -114,7 +114,7 @@ private void registerNodeAndSubmitApp( scheduler.update(); // Sufficient node check-ins to fully schedule containers for (int i = 0; i < 3; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, false); scheduler.handle(nodeUpdate1); } assertEquals("app1's request is not met", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 78322b7..7612039 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -265,7 +265,7 @@ public void testNodeLocalAssignment() throws Exception { ask.add(any); scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null, null, null); - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0, false); // Before the node update event, there are 3 local requests outstanding Assert.assertEquals(3, nodeLocal.getNumContainers()); @@ -370,7 +370,7 @@ public void testUpdateResourceOnNode() throws Exception { // Before the node update event, there are one local request Assert.assertEquals(1, nodeLocal.getNumContainers()); - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0, false); // Now schedule. 
scheduler.handle(node0Update); @@ -780,7 +780,7 @@ public void testNodeUpdateBeforeAppAttemptInit() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); scheduler.addApplication(appId, "queue1", "user1", false); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node, false); try { scheduler.handle(updateEvent); } catch (NullPointerException e) { @@ -859,7 +859,7 @@ public void testReconnectedNode() throws Exception { fs.handle(new NodeAddedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n2)); - fs.handle(new NodeUpdateSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1, false)); Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); // reconnect n1 with downgraded memory @@ -867,7 +867,7 @@ public void testReconnectedNode() throws Exception { MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1, "127.0.0.2"); fs.handle(new NodeRemovedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n1)); - fs.handle(new NodeUpdateSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1, false)); Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); fs.stop(); @@ -946,7 +946,7 @@ public void testBlackListNodes() throws Exception { Collections.singletonList(host_1_0), null, null, null); // Trigger container assignment - fs.handle(new NodeUpdateSchedulerEvent(n3)); + fs.handle(new NodeUpdateSchedulerEvent(n3, false)); // Get the allocation for the application and verify no allocation on // blacklist node @@ -956,7 +956,7 @@ public void testBlackListNodes() throws Exception { Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); // verify host_1_1 can get allocated as not in blacklist - fs.handle(new NodeUpdateSchedulerEvent(n4)); + fs.handle(new NodeUpdateSchedulerEvent(n4, false)); Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); @@ -976,24 +976,24 @@ public void testBlackListNodes() throws Exception { Collections.singletonList("rack0"), null, null, null); // verify n1 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1, false)); Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); // verify n2 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n2)); + fs.handle(new NodeUpdateSchedulerEvent(n2, false)); Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); // verify n3 is not qualified to be allocated - fs.handle(new NodeUpdateSchedulerEvent(n3)); + fs.handle(new NodeUpdateSchedulerEvent(n3, false)); Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); - fs.handle(new NodeUpdateSchedulerEvent(n4)); + fs.handle(new NodeUpdateSchedulerEvent(n4, false)); Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null); Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); @@ -1063,7 +1063,7 @@ public void testHeadroom() throws Exception { fs.allocate(appAttemptId2, ask2, emptyId, null, 
null, null, null); // Trigger container assignment - fs.handle(new NodeUpdateSchedulerEvent(n1)); + fs.handle(new NodeUpdateSchedulerEvent(n1, false)); // Get the allocation for the applications and verify headroom Allocation allocation1 = -- 1.9.5.github.0
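[Reviewer note, not part of the patch] The test-file hunks above are mechanical: every existing call to the one-argument NodeUpdateSchedulerEvent constructor now passes false, so current behaviour is preserved. As a closing illustration, a worked drain/recommission cycle with concrete numbers, tracing the SchedulerNode semantics introduced by this patch (plain ints stand in for Resource objects; the class name is hypothetical):

/** Illustrative only: one drain/recommission cycle on an 8 GB node with 3 GB used. */
public class DrainCycleWalkthrough {
  public static void main(String[] args) {
    int total = 8192, used = 3072, available = total - used;   // 5120 MB free
    Integer originalUsed = null;

    // Heartbeat with isDecommissioning=true: snapshot used, zero out available.
    originalUsed = used;          // 3072
    available = 0;                // nothing more is scheduled on this node

    // A 1 GB container finishes while draining: only the snapshot shrinks.
    originalUsed -= 1024;         // 2048; available stays 0

    // The node is recommissioned: restore used/available from the snapshot.
    used = originalUsed;          // 2048
    available = total - used;     // 6144
    originalUsed = null;

    System.out.println("used=" + used + " available=" + available); // used=2048 available=6144
  }
}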