diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8e8d627..0639832 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -77,7 +78,13 @@ protected Resource clusterResource = Resource.newInstance(0, 0); protected Resource minimumAllocation; - protected Resource maximumAllocation; + private Resource maximumAllocation; + private Resource configuredMaximumAllocation; + private int maxNodeMemory = -1; + private ReentrantReadWriteLock maximumAllocationLock = + new ReentrantReadWriteLock(); + private boolean useConfiguredMaximumAllocationOnly = true; + private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; protected Map> applications; @@ -102,6 +109,9 @@ public void serviceInit(Configuration conf) throws Exception { nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + configuredMaximumAllocationWaitTime = + conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); createReleaseCache(); super.serviceInit(conf); } @@ -145,7 +155,37 @@ public Resource getMinimumResourceCapability() { @Override public Resource getMaximumResourceCapability() { - return maximumAllocation; + Resource maxResource; + ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock(); + readLock.lock(); + try { + if (useConfiguredMaximumAllocationOnly) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + > configuredMaximumAllocationWaitTime) { + useConfiguredMaximumAllocationOnly = false; + } + maxResource = Resources.clone(configuredMaximumAllocation); + } else { + maxResource = Resources.clone(maximumAllocation); + } + } finally { + readLock.unlock(); + } + return maxResource; + } + + protected void initMaximumResourceCapability(Resource maximumAllocation) { + ReentrantReadWriteLock.WriteLock writeLock = + maximumAllocationLock.writeLock(); + writeLock.lock(); + try { + if (this.configuredMaximumAllocation == null) { + this.configuredMaximumAllocation = Resources.clone(maximumAllocation); + this.maximumAllocation = Resources.clone(maximumAllocation); + } + } finally { + writeLock.unlock(); + } } protected void containerLaunchedOnNode(ContainerId containerId, @@ -528,4 +568,37 @@ public synchronized void updateNodeResource(RMNode nm, throw new YarnException(getClass().getSimpleName() + " does not support reservations"); } + + protected void updateMaximumAllocation(SchedulerNode node) { + ReentrantReadWriteLock.WriteLock writeLock = + maximumAllocationLock.writeLock(); + writeLock.lock(); + try { + if (node != null) { // added node + int nodeMemory = node.getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + } else { // removed node + maxNodeMemory = -1; + for (Map.Entry nodeEntry : nodes.entrySet()) { + int nodeMemory = + nodeEntry.getValue().getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + } + if (maxNodeMemory == -1) { // no nodes + maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); + } else { + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + } + } finally { + writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c383e43..4c41e02 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.HashSet; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -284,7 +283,7 @@ private synchronized void initScheduler(Configuration configuration) throws this.conf = loadCapacitySchedulerConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); + initMaximumResourceCapability(this.conf.getMaximumAllocation()); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = @@ -321,8 +320,8 @@ private synchronized void startSchedulerThreads() { @Override public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); - initScheduler(configuration); super.serviceInit(conf); + initScheduler(configuration); } @Override @@ -849,7 +848,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( ask, getResourceCalculator(), getClusterResource(), - getMinimumResourceCapability(), maximumAllocation); + getMinimumResourceCapability(), getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -1123,12 +1122,13 @@ private synchronized void addNode(RMNode nodeManager) { labelManager.activateNode(nodeManager.getNodeID(), nodeManager.getTotalCapability()); } - - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); + updateMaximumAllocation(schedulerNode); LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1177,6 +1177,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(null); LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 94fb849..07f1723 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer, } private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); + nodes.put(node.getNodeID(), schedulerNode); Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); + updateMaximumAllocation(schedulerNode); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); @@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) { nodes.remove(rmNode.getNodeID()); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); + updateMaximumAllocation(null); LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterResource, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability(), + incrAllocation); // Set amResource for this app if (!application.getUnmanagedAM() && ask.size() == 1 @@ -1225,7 +1229,7 @@ private void initScheduler(Configuration conf) throws IOException { this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); + initMaximumResourceCapability(this.conf.getMaximumAllocation()); incrAllocation = this.conf.getIncrementAllocation(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingSleepMs = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 532edc7..6e008ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -215,10 +215,10 @@ private synchronized void initScheduler(Configuration conf) { Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = + initMaximumResourceCapability( Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB))); this.usePortForNodeName = conf.getBoolean( YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); @@ -303,7 +303,7 @@ public Allocation allocate( // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, maximumAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -899,6 +899,7 @@ private synchronized void removeNode(RMNode nodeInfo) { //Remove the node this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(null); // Update cluster metrics Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); @@ -916,9 +917,11 @@ public QueueInfo getQueueInfo(String queueName, } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + updateMaximumAllocation(schedulerNode); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 12f7498..084e59b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -296,6 +297,93 @@ public void testNonDefaultMinimumAllocation() throws Exception { testMinimumAllocation(conf, allocMB / 2); } + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximimumAllocation() throws Exception { + final int node1MaxMemory = 15 * GB; + final int node2MaxMemory = 5 * GB; + final int node3MaxMemory = 6 * GB; + final int configuredMaxMemory = 10 * GB; + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test (timeout = 50000) public void testReconnectedNode() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 2aa57a0..b6b12f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -230,6 +230,93 @@ public void testConfValidation() throws Exception { return nm; } + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximimumAllocation() throws Exception { + final int node1MaxMemory = 15 * GB; + final int node2MaxMemory = 5 * GB; + final int node3MaxMemory = 6 * GB; + final int configuredMaxMemory = 10 * GB; + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test public void testCapacityScheduler() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index ad834ac..9a29bff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception { am1.registerAppAttempt(); LOG.info("sending container requests "); - am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1); + am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1); AllocateResponse alloc1Response = am1.schedule(); // send the request // kick the scheduler @@ -291,7 +291,7 @@ public Token createContainerToken(ContainerId containerId, // This is to test fetching AM container will be retried, if AM container is // not fetchable since DNS is unavailable causing container token/NMtoken // creation failure. - @Test(timeout = 20000) + @Test(timeout = 30000) public void testAMContainerAllocationWhenDNSUnavailable() throws Exception { MockRM rm1 = new MockRM(conf) { @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0b14499..94c9772 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -251,8 +252,93 @@ public void testMinZeroResourcesSettings() throws IOException { Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); - } - + } + + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximimumAllocation() throws Exception { + final int node1MaxMemory = 15 * 1024; + final int node2MaxMemory = 5 * 1024; + final int node3MaxMemory = 6 * 1024; + final int configuredMaxMemory = 10 * 1024; + YarnConfiguration conf = new YarnConfiguration(createConfiguration()); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test public void testAggregateCapacityTracking() throws Exception { scheduler.init(conf); @@ -2590,37 +2676,7 @@ public void testNotAllowSubmitApplication() throws Exception { } assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus()); } - - @Test - public void testReservationThatDoesntFit() throws IOException { - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = - MockNodes - .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", - "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(updateEvent); - - FSAppAttempt app = scheduler.getSchedulerApp(attId); - assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - - createSchedulingRequestExistingApplication(1024, 2, attId); - scheduler.update(); - scheduler.handle(updateEvent); - - assertEquals(1, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - } - @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { scheduler.init(conf);