diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 45d7294..28f48c1 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -158,6 +158,18 @@ + + + + + + + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8e8d627..90c191e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -78,6 +79,12 @@ protected Resource minimumAllocation; protected Resource maximumAllocation; + protected Resource configuredMaximumAllocation; + private int maxNodeMemory = -1; + private ReentrantReadWriteLock maximumAllocationLock = + new ReentrantReadWriteLock(); + private boolean useConfiguredMaximumAllocationOnly = true; + private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; protected Map> applications; @@ -102,6 +109,9 @@ public void serviceInit(Configuration conf) throws Exception { nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + configuredMaximumAllocationWaitTime = + conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); createReleaseCache(); super.serviceInit(conf); } @@ -145,7 +155,20 @@ public Resource getMinimumResourceCapability() { @Override public Resource getMaximumResourceCapability() { - return maximumAllocation; + if (useConfiguredMaximumAllocationOnly) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + > configuredMaximumAllocationWaitTime) { + useConfiguredMaximumAllocationOnly = false; + } + return configuredMaximumAllocation; + } + ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock(); + readLock.lock(); + try { + return Resources.clone(maximumAllocation); + } finally { + readLock.unlock(); + } } protected void containerLaunchedOnNode(ContainerId containerId, @@ -528,4 +551,37 @@ public synchronized void updateNodeResource(RMNode nm, throw new YarnException(getClass().getSimpleName() + " does not support reservations"); } + + protected void updateMaximumAllocation(SchedulerNode node) { + ReentrantReadWriteLock.WriteLock writeLock = + maximumAllocationLock.writeLock(); + writeLock.lock(); + try { + if (node != null) { // added node + int nodeMemory = node.getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + } else { // removed node + maxNodeMemory = -1; + for (Map.Entry nodeEntry : nodes.entrySet()) { + int nodeMemory = + nodeEntry.getValue().getAvailableResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + } + if (maxNodeMemory == -1) { // no nodes + maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); + } else { + maximumAllocation.setMemory( + 
Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + } + } finally { + writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c383e43..7c4372e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.HashSet; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -284,7 +283,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.conf = loadCapacitySchedulerConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); + configuredMaximumAllocation = this.conf.getMaximumAllocation(); + maximumAllocation = Resources.clone(configuredMaximumAllocation); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = @@ -321,8 +321,8 @@ private synchronized void startSchedulerThreads() { @Override public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); - initScheduler(configuration); 
super.serviceInit(conf); + initScheduler(configuration); } @Override @@ -849,7 +849,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, // Sanity check SchedulerUtils.normalizeRequests( ask, getResourceCalculator(), getClusterResource(), - getMinimumResourceCapability(), maximumAllocation); + getMinimumResourceCapability(), getMaximumResourceCapability()); // Release containers releaseContainers(release, application); @@ -1123,12 +1123,13 @@ private synchronized void addNode(RMNode nodeManager) { labelManager.activateNode(nodeManager.getNodeID(), nodeManager.getTotalCapability()); } - - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); + updateMaximumAllocation(schedulerNode); LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1177,6 +1178,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(null); LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 94fb849..bb8e665 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer, } private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); + nodes.put(node.getNodeID(), schedulerNode); Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); + updateMaximumAllocation(schedulerNode); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); @@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) { nodes.remove(rmNode.getNodeID()); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); + updateMaximumAllocation(null); LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterResource, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability(), + incrAllocation); // Set amResource for this app if (!application.getUnmanagedAM() && ask.size() == 1 @@ -1225,7 +1229,8 @@ private void initScheduler(Configuration conf) throws IOException { this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); + 
configuredMaximumAllocation = this.conf.getMaximumAllocation(); + maximumAllocation = Resources.clone(configuredMaximumAllocation); incrAllocation = this.conf.getIncrementAllocation(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingSleepMs = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 532edc7..c7713b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -215,10 +215,11 @@ private synchronized void initScheduler(Configuration conf) { Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = + configuredMaximumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + maximumAllocation = Resources.clone(configuredMaximumAllocation); this.usePortForNodeName = conf.getBoolean( YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); @@ -303,7 +304,7 @@ public Allocation allocate( // Sanity check SchedulerUtils.normalizeRequests(ask, resourceCalculator, - clusterResource, minimumAllocation, maximumAllocation); + clusterResource, minimumAllocation, getMaximumResourceCapability()); // 
Release containers releaseContainers(release, application); @@ -899,6 +900,7 @@ private synchronized void removeNode(RMNode nodeInfo) { //Remove the node this.nodes.remove(nodeInfo.getNodeID()); + updateMaximumAllocation(null); // Update cluster metrics Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); @@ -916,9 +918,11 @@ public QueueInfo getQueueInfo(String queueName, } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, - usePortForNodeName)); + FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, + usePortForNodeName); + this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + updateMaximumAllocation(schedulerNode); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 12f7498..084e59b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -296,6 +297,93 @@ public void testNonDefaultMinimumAllocation() throws Exception { testMinimumAllocation(conf, allocMB / 2); } + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], 
maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximumAllocation() throws Exception { + final int node1MaxMemory = 15 * GB; + final int node2MaxMemory = 5 * GB; + final int node3MaxMemory = 6 * GB; + final int configuredMaxMemory = 10 * GB; + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test (timeout = 50000) public void testReconnectedNode() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 2aa57a0..b6b12f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -230,6 +230,93 @@ public void testConfValidation() throws Exception { return nm; } + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... 
expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximumAllocation() throws Exception { + final int 
node1MaxMemory = 15 * GB; + final int node2MaxMemory = 5 * GB; + final int node3MaxMemory = 6 * GB; + final int configuredMaxMemory = 10 * GB; + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test public void testCapacityScheduler() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index ad834ac..24f22b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception { am1.registerAppAttempt(); LOG.info("sending container requests "); - am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1); + am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1); AllocateResponse alloc1Response = am1.schedule(); // send the request // kick the scheduler diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1996367..cec4b0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -250,8 +251,93 @@ public void testMinZeroResourcesSettings() throws IOException { Assert.assertEquals(0, 
scheduler.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); - } - + } + + private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler, + final int node1MaxMemory, final int node2MaxMemory, + final int node3MaxMemory, final int... expectedMaxMemory) + throws Exception { + Assert.assertEquals(6, expectedMaxMemory.length); + + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + int maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[0], maxMemory); + + RMNode node1 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[1], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node1)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[2], maxMemory); + + RMNode node2 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[3], maxMemory); + + RMNode node3 = MockNodes.newNodeInfo( + 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4"); + scheduler.handle(new NodeAddedSchedulerEvent(node3)); + Assert.assertEquals(2, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[4], maxMemory); + + scheduler.handle(new 
NodeRemovedSchedulerEvent(node3)); + Assert.assertEquals(1, scheduler.getNumClusterNodes()); + maxMemory = scheduler.getMaximumResourceCapability().getMemory(); + Assert.assertEquals(expectedMaxMemory[5], maxMemory); + + scheduler.handle(new NodeRemovedSchedulerEvent(node2)); + Assert.assertEquals(0, scheduler.getNumClusterNodes()); + } + + @Test + public void testMaximumAllocation() throws Exception { + final int node1MaxMemory = 15 * 1024; + final int node2MaxMemory = 5 * 1024; + final int node3MaxMemory = 6 * 1024; + final int configuredMaxMemory = 10 * 1024; + YarnConfiguration conf = new YarnConfiguration(createConfiguration()); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 1000 * 1000); + MockRM rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory); + } finally { + rm.stop(); + } + + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + rm = new MockRM(conf); + try { + rm.start(); + testMaximumAllocationHelper( + (AbstractYarnScheduler) rm.getResourceScheduler(), + node1MaxMemory, node2MaxMemory, node3MaxMemory, + configuredMaxMemory, configuredMaxMemory, configuredMaxMemory, + node2MaxMemory, node3MaxMemory, node2MaxMemory); + } finally { + rm.stop(); + } + } + @Test public void testAggregateCapacityTracking() throws Exception { scheduler.init(conf); @@ -2529,37 +2615,7 @@ public void testNotAllowSubmitApplication() throws Exception { } assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus()); } - - @Test - public void testReservationThatDoesntFit() throws IOException { - scheduler.init(conf); - scheduler.start(); 
- scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = - MockNodes - .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", - "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(updateEvent); - - FSAppAttempt app = scheduler.getSchedulerApp(attId); - assertEquals(0, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - - createSchedulingRequestExistingApplication(1024, 2, attId); - scheduler.update(); - scheduler.handle(updateEvent); - - assertEquals(1, app.getLiveContainers().size()); - assertEquals(0, app.getReservedContainers().size()); - } - @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { scheduler.init(conf);