diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..b5e2b6e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,6 +79,12 @@
 
   protected Resource minimumAllocation;
   protected Resource maximumAllocation;
+  // Configured upper bound; maximumAllocation is derived from it and from the
+  // largest node, so this object must never alias maximumAllocation.
+  protected Resource realMaximumAllocation;
+  private int maxNodeMemory = -1;
+  private final ReentrantReadWriteLock maximumAllocationLock =
+      new ReentrantReadWriteLock();
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -145,7 +152,15 @@ public Resource getMinimumResourceCapability() {
 
   @Override
   public Resource getMaximumResourceCapability() {
-    return maximumAllocation;
+    ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+    // Lock before entering try so unlock() only runs once the lock is held.
+    readLock.lock();
+    try {
+      // Hand out a copy; the shared instance is mutated on node add/remove.
+      return Resources.clone(maximumAllocation);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +543,45 @@ public synchronized void updateNodeResource(RMNode nm,
     throw new YarnException(getClass().getSimpleName()
         + " does not support reservations");
   }
+
+  /**
+   * Recomputes the effective maximum allocation after a node is added or
+   * removed: the configured maximum capped by the largest memory value
+   * reported by any known node. With no nodes the configured maximum applies.
+   *
+   * @param node the node that was just added, or {@code null} after a removal,
+   *             in which case all remaining nodes are rescanned
+   */
+  protected void updateMaximumAllocation(SchedulerNode node) {
+    ReentrantReadWriteLock.WriteLock writeLock =
+        maximumAllocationLock.writeLock();
+    // Lock before entering try so unlock() only runs once the lock is held.
+    writeLock.lock();
+    try {
+      // NOTE(review): getAvailableResource() is used as the node size here;
+      // confirm it cannot be lower than the node's total capacity when read.
+      if (node != null) { // added node
+        int nodeMemory = node.getAvailableResource().getMemory();
+        if (nodeMemory > maxNodeMemory) {
+          maxNodeMemory = nodeMemory;
+          maximumAllocation.setMemory(Math.min(
+              realMaximumAllocation.getMemory(), maxNodeMemory));
+        }
+      } else { // removed node
+        maxNodeMemory = -1;
+        for (SchedulerNode remaining : nodes.values()) {
+          maxNodeMemory = Math.max(maxNodeMemory,
+              remaining.getAvailableResource().getMemory());
+        }
+        if (maxNodeMemory == -1) { // no nodes
+          maximumAllocation.setMemory(realMaximumAllocation.getMemory());
+        } else {
+          maximumAllocation.setMemory(Math.min(
+              realMaximumAllocation.getMemory(), maxNodeMemory));
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9332228..d83b903 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -284,6 +284,7 @@ private synchronized void initScheduler(Configuration configuration) throws
     validateConf(this.conf);
     this.minimumAllocation = this.conf.getMinimumAllocation();
     this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
     this.calculator = this.conf.getResourceCalculator();
     this.usePortForNodeName = this.conf.getUsePortForNodeName();
     this.applications =
@@ -850,7 +851,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
     // Sanity check
     SchedulerUtils.normalizeRequests(
         ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), maximumAllocation);
+        getMinimumResourceCapability(), getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -1124,12 +1125,13 @@ private synchronized void addNode(RMNode nodeManager) {
       labelManager.activateNode(nodeManager.getNodeID(),
           nodeManager.getTotalCapability());
     }
-
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+    updateMaximumAllocation(schedulerNode);
 
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
@@ -1178,6 +1180,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(null);
 
     LOG.info("Removed node " + nodeInfo.getNodeAddress() +
         " clusterResource: " + clusterResource);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3fc3019..4a156b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
   }
 
   private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+    nodes.put(node.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
+    updateMaximumAllocation(schedulerNode);
 
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) {
     nodes.remove(rmNode.getNodeID());
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
+    updateMaximumAllocation(null);
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        incrAllocation);
 
     // Set amResource for this app
     if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1212,6 +1216,7 @@ private void initScheduler(Configuration conf) throws IOException {
       validateConf(this.conf);
       minimumAllocation = this.conf.getMinimumAllocation();
       maximumAllocation = this.conf.getMaximumAllocation();
+      this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
       incrAllocation = this.conf.getIncrementAllocation();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..1770637 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -219,6 +219,7 @@ private synchronized void initScheduler(Configuration conf) {
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
     this.usePortForNodeName = conf.getBoolean(
         YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +304,7 @@ public Allocation allocate(
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, resourceCalculator,
-        clusterResource, minimumAllocation, maximumAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -899,6 +900,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
 
     //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(null);
 
     // Update cluster metrics
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +918,11 @@ public QueueInfo getQueueInfo(String queueName,
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    updateMaximumAllocation(schedulerNode);
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..497e100 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -296,6 +296,64 @@ public void testNonDefaultMinimumAllocation() throws Exception {
     testMinimumAllocation(conf, allocMB / 2);
   }
 
+  @Test
+  public void testMaximumAllocation() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
+    final int configuredMaxMemory = 10 * GB;
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        configuredMaxMemory);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+
+      int maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      final int node1MaxMemory = 15 * GB;
+      RMNode node1 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node1));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node1));
+      Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      final int node2MaxMemory = 5 * GB;
+      RMNode node2 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node2));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node2MaxMemory, maxMemory);
+
+      final int node3MaxMemory = 6 * GB;
+      RMNode node3 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node3));
+      Assert.assertEquals(2, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node3MaxMemory, maxMemory);
+
+      rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node3));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node2MaxMemory, maxMemory);
+    } finally {
+      rm.stop();
+    }
+  }
+
   @Test (timeout = 50000)
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b90df8e..c5c2b2f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
     am1.registerAppAttempt();
 
     LOG.info("sending container requests ");
-    am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+    am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
     AllocateResponse alloc1Response = am1.schedule(); // send the request
 
     // kick the scheduler
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5194251..37cf006 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2449,37 +2449,7 @@ public void testNotAllowSubmitApplication() throws Exception {
     }
     assertEquals(FinalApplicationStatus.FAILED,
         application.getFinalApplicationStatus());
   }
-
-  @Test
-  public void testReservationThatDoesntFit() throws IOException {
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMNode node1 =
-        MockNodes
-            .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
-        "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(updateEvent);
-
-    FSAppAttempt app = scheduler.getSchedulerApp(attId);
-    assertEquals(0, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-
-    createSchedulingRequestExistingApplication(1024, 2, attId);
-    scheduler.update();
-    scheduler.handle(updateEvent);
-
-    assertEquals(1, app.getLiveContainers().size());
-    assertEquals(0, app.getReservedContainers().size());
-  }
-
   @Test
   public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
     scheduler.init(conf);