diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..2e32ea0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -78,6 +78,8 @@
 
   protected Resource minimumAllocation;
   protected Resource maximumAllocation;
+  protected Resource realMaximumAllocation;
+  private int maxNodeMemory = -1;
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -144,7 +146,7 @@ public Resource getMinimumResourceCapability() {
   }
 
   @Override
-  public Resource getMaximumResourceCapability() {
+  public synchronized Resource getMaximumResourceCapability() {
     return maximumAllocation;
   }
 
@@ -528,4 +530,30 @@ public synchronized void updateNodeResource(RMNode nm,
     throw new YarnException(getClass().getSimpleName()
         + " does not support reservations");
   }
+
+  protected synchronized void updateMaximumAllocation(SchedulerNode node) {
+    if (node != null) { // added node
+      int nodeMemory = node.getAvailableResource().getMemory();
+      if (nodeMemory > maxNodeMemory) {
+        maxNodeMemory = nodeMemory;
+        maximumAllocation.setMemory(Math.min(realMaximumAllocation.getMemory(),
+            maxNodeMemory));
+      }
+    } else { // removed node
+      maxNodeMemory = -1;
+      for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+        int nodeMemory =
+            nodeEntry.getValue().getAvailableResource().getMemory();
+        if (nodeMemory > maxNodeMemory) {
+          maxNodeMemory = nodeMemory;
+        }
+      }
+      if (maxNodeMemory == -1) { // no nodes
+        maximumAllocation.setMemory(realMaximumAllocation.getMemory());
+      } else {
+        maximumAllocation.setMemory(Math.min(realMaximumAllocation.getMemory(),
+            maxNodeMemory));
+      }
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9332228..d83b903 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -284,6 +284,7 @@ private synchronized void initScheduler(Configuration configuration) throws
     validateConf(this.conf);
     this.minimumAllocation = this.conf.getMinimumAllocation();
     this.maximumAllocation = this.conf.getMaximumAllocation();
+    this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
     this.calculator = this.conf.getResourceCalculator();
     this.usePortForNodeName = this.conf.getUsePortForNodeName();
     this.applications =
@@ -850,7 +851,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
     // Sanity check
     SchedulerUtils.normalizeRequests(
         ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), maximumAllocation);
+        getMinimumResourceCapability(), getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -1124,12 +1125,13 @@ private synchronized void addNode(RMNode nodeManager) {
       labelManager.activateNode(nodeManager.getNodeID(),
           nodeManager.getTotalCapability());
     }
-
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+    updateMaximumAllocation(schedulerNode);
 
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
@@ -1178,6 +1180,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(null);
 
     LOG.info("Removed node " + nodeInfo.getNodeAddress() +
         " clusterResource: " + clusterResource);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3fc3019..4a156b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
   }
 
   private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+    nodes.put(node.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateRootQueueMetrics();
+    updateMaximumAllocation(schedulerNode);
 
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) {
     nodes.remove(rmNode.getNodeID());
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
+    updateMaximumAllocation(null);
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
-        clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        incrAllocation);
 
     // Set amResource for this app
     if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1212,6 +1216,7 @@ private void initScheduler(Configuration conf) throws IOException {
       validateConf(this.conf);
      minimumAllocation = this.conf.getMinimumAllocation();
      maximumAllocation = this.conf.getMaximumAllocation();
+      this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
       incrAllocation = this.conf.getIncrementAllocation();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
       continuousSchedulingSleepMs =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..1770637 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -219,6 +219,7 @@ private synchronized void initScheduler(Configuration conf) {
         Resources.createResource(conf.getInt(
           YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
     this.usePortForNodeName = conf.getBoolean(
       YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
       YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +304,7 @@ public Allocation allocate(
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, resourceCalculator,
-        clusterResource, minimumAllocation, maximumAllocation);
+        clusterResource, minimumAllocation, getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -899,6 +900,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
 
     //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
+    updateMaximumAllocation(null);
 
     // Update cluster metrics
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +918,11 @@ public QueueInfo getQueueInfo(String queueName,
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    updateMaximumAllocation(schedulerNode);
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..497e100 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -296,6 +296,64 @@ public void testNonDefaultMinimumAllocation() throws Exception {
     testMinimumAllocation(conf, allocMB / 2);
   }
 
+  @Test
+  public void testMaximumAllocation() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
+    final int configuredMaxMemory = 10 * GB;
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        configuredMaxMemory);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+      Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+
+      int maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      final int node1MaxMemory = 15 * GB;
+      RMNode node1 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node1));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node1));
+      Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+      final int node2MaxMemory = 5 * GB;
+      RMNode node2 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node2));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node2MaxMemory, maxMemory);
+
+      final int node3MaxMemory = 6 * GB;
+      RMNode node3 = MockNodes.newNodeInfo(
+          0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+      rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node3));
+      Assert.assertEquals(2, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node3MaxMemory, maxMemory);
+
+      rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node3));
+      Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+      maxMemory =
+          rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+      Assert.assertEquals(node2MaxMemory, maxMemory);
+    } finally {
+      rm.stop();
+    }
+  }
+
   @Test (timeout = 50000)
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
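
For readers skimming the patch: the net effect of updateMaximumAllocation is that the advertised maximum allocation tracks min(configured maximum, largest registered node), falling back to the configured maximum when no nodes are registered. The following is a minimal standalone sketch of that invariant, not part of the patch and not YARN API; effectiveMaxMB, nodeMemoriesMB, and MaxAllocationSketch are illustrative names, and the expected values mirror testMaximumAllocation above.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MaxAllocationSketch {
  // Effective scheduler maximum: never above the configured limit and never
  // above the largest node currently registered; falls back to the configured
  // limit when the cluster has no nodes (illustrative helper only).
  static int effectiveMaxMB(int configuredMaxMB, List<Integer> nodeMemoriesMB) {
    if (nodeMemoriesMB.isEmpty()) {
      return configuredMaxMB;
    }
    return Math.min(configuredMaxMB, Collections.max(nodeMemoriesMB));
  }

  public static void main(String[] args) {
    int configuredMaxMB = 10 * 1024;
    // No nodes: configured maximum is advertised.
    System.out.println(effectiveMaxMB(configuredMaxMB, Arrays.<Integer>asList()));          // 10240
    // One 15 GB node: still capped at the configured 10 GB.
    System.out.println(effectiveMaxMB(configuredMaxMB, Arrays.asList(15 * 1024)));          // 10240
    // One 5 GB node: maximum drops to the biggest node.
    System.out.println(effectiveMaxMB(configuredMaxMB, Arrays.asList(5 * 1024)));           // 5120
    // 5 GB and 6 GB nodes: maximum follows the larger of the two.
    System.out.println(effectiveMaxMB(configuredMaxMB, Arrays.asList(5 * 1024, 6 * 1024))); // 6144
  }
}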