diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 45d7294..28f48c1 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -158,6 +158,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..90c191e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,6 +79,12 @@
protected Resource minimumAllocation;
protected Resource maximumAllocation;
+ protected Resource configuredMaximumAllocation;
+ private int maxNodeMemory = -1;
+ private ReentrantReadWriteLock maximumAllocationLock =
+ new ReentrantReadWriteLock();
+ private boolean useConfiguredMaximumAllocationOnly = true;
+ private long configuredMaximumAllocationWaitTime;
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -102,6 +109,9 @@ public void serviceInit(Configuration conf) throws Exception {
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ configuredMaximumAllocationWaitTime =
+ conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
createReleaseCache();
super.serviceInit(conf);
}
@@ -145,7 +155,20 @@ public Resource getMinimumResourceCapability() {
@Override
public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ if (useConfiguredMaximumAllocationOnly) {
+ if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+ > configuredMaximumAllocationWaitTime) {
+ useConfiguredMaximumAllocationOnly = false;
+ }
+ return configuredMaximumAllocation;
+ }
+ ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+ readLock.lock();
+ try {
+ return Resources.clone(maximumAllocation);
+ } finally {
+ readLock.unlock();
+ }
}
protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +551,37 @@ public synchronized void updateNodeResource(RMNode nm,
throw new YarnException(getClass().getSimpleName()
+ " does not support reservations");
}
+
+ protected void updateMaximumAllocation(SchedulerNode node) {
+ ReentrantReadWriteLock.WriteLock writeLock =
+ maximumAllocationLock.writeLock();
+ writeLock.lock();
+ try {
+ if (node != null) { // added node
+ int nodeMemory = node.getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ maximumAllocation.setMemory(
+ Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
+ }
+ } else { // removed node
+ maxNodeMemory = -1;
+      for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+ int nodeMemory =
+ nodeEntry.getValue().getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ }
+ }
+ if (maxNodeMemory == -1) { // no nodes
+ maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
+ } else {
+ maximumAllocation.setMemory(
+ Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index c383e43..7c4372e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -33,7 +33,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashSet;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -284,7 +283,8 @@ private synchronized void initScheduler(Configuration configuration) throws
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
+ configuredMaximumAllocation = this.conf.getMaximumAllocation();
+ maximumAllocation = Resources.clone(configuredMaximumAllocation);
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
@@ -321,8 +321,8 @@ private synchronized void startSchedulerThreads() {
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- initScheduler(configuration);
super.serviceInit(conf);
+ initScheduler(configuration);
}
@Override
@@ -849,7 +849,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
- getMinimumResourceCapability(), maximumAllocation);
+ getMinimumResourceCapability(), getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -1123,12 +1123,13 @@ private synchronized void addNode(RMNode nodeManager) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
-
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+ updateMaximumAllocation(schedulerNode);
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1177,6 +1178,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
}
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(null);
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 94fb849..bb8e665 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
}
private synchronized void addNode(RMNode node) {
- nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+ nodes.put(node.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
+ updateMaximumAllocation(schedulerNode);
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) {
nodes.remove(rmNode.getNodeID());
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
+ updateMaximumAllocation(null);
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
@@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability(),
+ incrAllocation);
// Set amResource for this app
if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1225,7 +1229,8 @@ private void initScheduler(Configuration conf) throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
+ configuredMaximumAllocation = this.conf.getMaximumAllocation();
+ maximumAllocation = Resources.clone(configuredMaximumAllocation);
incrAllocation = this.conf.getIncrementAllocation();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..c7713b0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -215,10 +215,11 @@ private synchronized void initScheduler(Configuration conf) {
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- this.maximumAllocation =
+ configuredMaximumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ maximumAllocation = Resources.clone(configuredMaximumAllocation);
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +304,7 @@ public Allocation allocate(
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation, maximumAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -899,6 +900,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
//Remove the node
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(null);
// Update cluster metrics
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +918,11 @@ public QueueInfo getQueueInfo(String queueName,
}
private synchronized void addNode(RMNode nodeManager) {
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ updateMaximumAllocation(schedulerNode);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..084e59b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -296,6 +297,93 @@ public void testNonDefaultMinimumAllocation() throws Exception {
testMinimumAllocation(conf, allocMB / 2);
}
+ private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler,
+ final int node1MaxMemory, final int node2MaxMemory,
+ final int node3MaxMemory, final int... expectedMaxMemory)
+ throws Exception {
+ Assert.assertEquals(6, expectedMaxMemory.length);
+
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[0], maxMemory);
+
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+ scheduler.handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[1], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[2], maxMemory);
+
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+ scheduler.handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[3], maxMemory);
+
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+ scheduler.handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[4], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[5], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ }
+
+ @Test
+  public void testMaximumAllocation() throws Exception {
+ final int node1MaxMemory = 15 * GB;
+ final int node2MaxMemory = 5 * GB;
+ final int node3MaxMemory = 6 * GB;
+ final int configuredMaxMemory = 10 * GB;
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+ ResourceScheduler.class);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ configuredMaxMemory);
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 1000 * 1000);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
+ } finally {
+ rm.stop();
+ }
+
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 0);
+ rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ node2MaxMemory, node3MaxMemory, node2MaxMemory);
+ } finally {
+ rm.stop();
+ }
+ }
+
@Test (timeout = 50000)
public void testReconnectedNode() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 2aa57a0..b6b12f8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -230,6 +230,93 @@ public void testConfValidation() throws Exception {
return nm;
}
+ private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler,
+ final int node1MaxMemory, final int node2MaxMemory,
+ final int node3MaxMemory, final int... expectedMaxMemory)
+ throws Exception {
+ Assert.assertEquals(6, expectedMaxMemory.length);
+
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[0], maxMemory);
+
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+ scheduler.handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[1], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[2], maxMemory);
+
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+ scheduler.handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[3], maxMemory);
+
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+ scheduler.handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[4], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[5], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ }
+
+ @Test
+  public void testMaximumAllocation() throws Exception {
+ final int node1MaxMemory = 15 * GB;
+ final int node2MaxMemory = 5 * GB;
+ final int node3MaxMemory = 6 * GB;
+ final int configuredMaxMemory = 10 * GB;
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ configuredMaxMemory);
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 1000 * 1000);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
+ } finally {
+ rm.stop();
+ }
+
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 0);
+ rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ node2MaxMemory, node3MaxMemory, node2MaxMemory);
+ } finally {
+ rm.stop();
+ }
+ }
+
@Test
public void testCapacityScheduler() throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index ad834ac..24f22b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
am1.registerAppAttempt();
LOG.info("sending container requests ");
- am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+ am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1996367..cec4b0e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -250,8 +251,93 @@ public void testMinZeroResourcesSettings() throws IOException {
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores());
- }
-
+ }
+
+ private void testMaximumAllocationHelper(AbstractYarnScheduler scheduler,
+ final int node1MaxMemory, final int node2MaxMemory,
+ final int node3MaxMemory, final int... expectedMaxMemory)
+ throws Exception {
+ Assert.assertEquals(6, expectedMaxMemory.length);
+
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ int maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[0], maxMemory);
+
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+ scheduler.handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[1], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[2], maxMemory);
+
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+ scheduler.handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[3], maxMemory);
+
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+ scheduler.handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[4], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, scheduler.getNumClusterNodes());
+ maxMemory = scheduler.getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(expectedMaxMemory[5], maxMemory);
+
+ scheduler.handle(new NodeRemovedSchedulerEvent(node2));
+ Assert.assertEquals(0, scheduler.getNumClusterNodes());
+ }
+
+ @Test
+  public void testMaximumAllocation() throws Exception {
+ final int node1MaxMemory = 15 * 1024;
+ final int node2MaxMemory = 5 * 1024;
+ final int node3MaxMemory = 6 * 1024;
+ final int configuredMaxMemory = 10 * 1024;
+ YarnConfiguration conf = new YarnConfiguration(createConfiguration());
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ configuredMaxMemory);
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 1000 * 1000);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory);
+ } finally {
+ rm.stop();
+ }
+
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ 0);
+ rm = new MockRM(conf);
+ try {
+ rm.start();
+ testMaximumAllocationHelper(
+ (AbstractYarnScheduler) rm.getResourceScheduler(),
+ node1MaxMemory, node2MaxMemory, node3MaxMemory,
+ configuredMaxMemory, configuredMaxMemory, configuredMaxMemory,
+ node2MaxMemory, node3MaxMemory, node2MaxMemory);
+ } finally {
+ rm.stop();
+ }
+ }
+
@Test
public void testAggregateCapacityTracking() throws Exception {
scheduler.init(conf);
@@ -2529,37 +2615,7 @@ public void testNotAllowSubmitApplication() throws Exception {
}
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
}
-
- @Test
- public void testReservationThatDoesntFit() throws IOException {
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMNode node1 =
- MockNodes
- .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
- "user1", 1);
- scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(updateEvent);
-
- FSAppAttempt app = scheduler.getSchedulerApp(attId);
- assertEquals(0, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
-
- createSchedulingRequestExistingApplication(1024, 2, attId);
- scheduler.update();
- scheduler.handle(updateEvent);
-
- assertEquals(1, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
- }
-
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
scheduler.init(conf);