diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index e6da24c..85d1cb2 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -158,6 +158,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8e8d627..b5e2b6e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,6 +79,10 @@
protected Resource minimumAllocation;
protected Resource maximumAllocation;
+ protected Resource realMaximumAllocation;
+ private int maxNodeMemory = -1;
+ private ReentrantReadWriteLock maximumAllocationLock =
+ new ReentrantReadWriteLock();
protected RMContext rmContext;
 protected Map<ApplicationId, SchedulerApplication<T>> applications;
@@ -145,7 +150,13 @@ public Resource getMinimumResourceCapability() {
@Override
public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
+ try {
+ readLock.lock();
+ return Resources.clone(maximumAllocation);
+ } finally {
+ readLock.unlock();
+ }
}
protected void containerLaunchedOnNode(ContainerId containerId,
@@ -528,4 +539,36 @@ public synchronized void updateNodeResource(RMNode nm,
throw new YarnException(getClass().getSimpleName()
+ " does not support reservations");
}
+
+ protected void updateMaximumAllocation(SchedulerNode node) {
+ ReentrantReadWriteLock.WriteLock writeLock = maximumAllocationLock.writeLock();
+ try {
+ writeLock.lock();
+ if (node != null) { // added node
+ int nodeMemory = node.getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ maximumAllocation.setMemory(Math.min(realMaximumAllocation.getMemory(),
+ maxNodeMemory));
+ }
+ } else { // removed node
+ maxNodeMemory = -1;
+ for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
+ int nodeMemory =
+ nodeEntry.getValue().getAvailableResource().getMemory();
+ if (nodeMemory > maxNodeMemory) {
+ maxNodeMemory = nodeMemory;
+ }
+ }
+ if (maxNodeMemory == -1) { // no nodes
+ maximumAllocation.setMemory(realMaximumAllocation.getMemory());
+ } else {
+ maximumAllocation.setMemory(Math.min(realMaximumAllocation.getMemory(),
+ maxNodeMemory));
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9332228..d83b903 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -284,6 +284,7 @@ private synchronized void initScheduler(Configuration configuration) throws
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
this.maximumAllocation = this.conf.getMaximumAllocation();
+ this.realMaximumAllocation = this.maximumAllocation;
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications =
@@ -850,7 +851,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
- getMinimumResourceCapability(), maximumAllocation);
+ getMinimumResourceCapability(), getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -1124,12 +1125,13 @@ private synchronized void addNode(RMNode nodeManager) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
-
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+ updateMaximumAllocation(schedulerNode);
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1178,6 +1180,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
}
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(null);
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3fc3019..4a156b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -817,9 +817,11 @@ protected synchronized void completedContainer(RMContainer rmContainer,
}
private synchronized void addNode(RMNode node) {
- nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
+ nodes.put(node.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics();
+ updateMaximumAllocation(schedulerNode);
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
@@ -859,6 +861,7 @@ private synchronized void removeNode(RMNode rmNode) {
nodes.remove(rmNode.getNodeID());
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
+ updateMaximumAllocation(null);
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
@@ -877,7 +880,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
- clusterResource, minimumAllocation, maximumAllocation, incrAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability(),
+ incrAllocation);
// Set amResource for this app
if (!application.getUnmanagedAM() && ask.size() == 1
@@ -1212,6 +1216,7 @@ private void initScheduler(Configuration conf) throws IOException {
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
+ this.realMaximumAllocation = this.maximumAllocation;
incrAllocation = this.conf.getIncrementAllocation();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 532edc7..1770637 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -219,6 +219,7 @@ private synchronized void initScheduler(Configuration conf) {
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ this.realMaximumAllocation = Resources.clone(this.maximumAllocation);
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
@@ -303,7 +304,7 @@ public Allocation allocate(
// Sanity check
SchedulerUtils.normalizeRequests(ask, resourceCalculator,
- clusterResource, minimumAllocation, maximumAllocation);
+ clusterResource, minimumAllocation, getMaximumResourceCapability());
// Release containers
releaseContainers(release, application);
@@ -899,6 +900,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
//Remove the node
this.nodes.remove(nodeInfo.getNodeID());
+ updateMaximumAllocation(null);
// Update cluster metrics
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
@@ -916,9 +918,11 @@ public QueueInfo getQueueInfo(String queueName,
}
private synchronized void addNode(RMNode nodeManager) {
- this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
- usePortForNodeName));
+ FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+ usePortForNodeName);
+ this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+ updateMaximumAllocation(schedulerNode);
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index 12f7498..497e100 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -296,6 +296,64 @@ public void testNonDefaultMinimumAllocation() throws Exception {
testMinimumAllocation(conf, allocMB / 2);
}
+ @Test
+ public void testMaximumAllocation() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration(TestFifoScheduler.conf);
+ final int configuredMaxMemory = 10 * GB;
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ configuredMaxMemory);
+ MockRM rm = new MockRM(conf);
+ try {
+ rm.start();
+ Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+
+ int maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+ final int node1MaxMemory = 15 * GB;
+ RMNode node1 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node1MaxMemory), 1, "127.0.0.2");
+ rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node1));
+ Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+ maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+ rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node1));
+ Assert.assertEquals(0, rm.getResourceScheduler().getNumClusterNodes());
+ maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(configuredMaxMemory, maxMemory);
+
+ final int node2MaxMemory = 5 * GB;
+ RMNode node2 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node2MaxMemory), 2, "127.0.0.3");
+ rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node2));
+ Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+ maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(node2MaxMemory, maxMemory);
+
+ final int node3MaxMemory = 6 * GB;
+ RMNode node3 = MockNodes.newNodeInfo(
+ 0, MockNodes.newResource(node3MaxMemory), 3, "127.0.0.4");
+ rm.getResourceScheduler().handle(new NodeAddedSchedulerEvent(node3));
+ Assert.assertEquals(2, rm.getResourceScheduler().getNumClusterNodes());
+ maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(node3MaxMemory, maxMemory);
+
+ rm.getResourceScheduler().handle(new NodeRemovedSchedulerEvent(node3));
+ Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
+ maxMemory =
+ rm.getResourceScheduler().getMaximumResourceCapability().getMemory();
+ Assert.assertEquals(node2MaxMemory, maxMemory);
+ } finally {
+ rm.stop();
+ }
+ }
+
@Test (timeout = 50000)
public void testReconnectedNode() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b90df8e..c5c2b2f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -116,7 +116,7 @@ public void testExcessReservationThanNodeManagerCapacity() throws Exception {
am1.registerAppAttempt();
LOG.info("sending container requests ");
- am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1);
+ am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 5194251..37cf006 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2449,37 +2449,7 @@ public void testNotAllowSubmitApplication() throws Exception {
}
assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
}
-
- @Test
- public void testReservationThatDoesntFit() throws IOException {
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMNode node1 =
- MockNodes
- .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
- "user1", 1);
- scheduler.update();
- NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(updateEvent);
-
- FSAppAttempt app = scheduler.getSchedulerApp(attId);
- assertEquals(0, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
-
- createSchedulingRequestExistingApplication(1024, 2, attId);
- scheduler.update();
- scheduler.handle(updateEvent);
-
- assertEquals(1, app.getLiveContainers().size());
- assertEquals(0, app.getReservedContainers().size());
- }
-
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
scheduler.init(conf);