diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1ff00be..317fd72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -65,6 +66,7 @@ //private final ApplicationStore store; private final ActiveUsersManager activeUsersManager; + private final ContainerId masterContainerId; /* Allocated by scheduler */ boolean pending = true; // for app metrics @@ -77,6 +79,9 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.masterContainerId = + ContainerId.newInstance(applicationAttemptId, + containerIdCounter.get() + 1); } public ApplicationId getApplicationId() { @@ -397,4 +402,8 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void setQueue(Queue queue) { this.queue = queue; } + + public ContainerId getMasterContainerId() { + return masterContainerId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index cc9b872..8676d19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -57,4 +58,9 @@ */ public abstract boolean isPending(); + /** + * Get master container Id + * @return master container id + */ + public abstract ContainerId getMasterContainerId(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index a08ba70..e9e7b0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -58,5 +58,10 @@ * @return number of active containers on the node */ public abstract int getNumContainers(); - + + /** + * Get total resources on the node. + * @return total resources on the node. + */ + public abstract Resource getTotalResource(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index c2c5d27..e2e4cb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1308,19 +1308,19 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod + " request=" + request + " type=" + type); } Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + assert Resources.greaterThan( resourceCalculator, clusterResource, available, Resources.none()); - // Create the container if necessary - Container container = - getContainer(rmContainer, application, node, capability, priority); - - // something went wrong getting/creating the container - if (container == null) { - LOG.warn("Couldn't get container for allocation!"); + if (!Resources.fitsIn(capability, totalResource)) { + if (LOG.isDebugEnabled()) { + LOG.info("Node : " + node.getNodeID() + + " does not have sufficient resources for request : " + request + + " node total capability : " + node.getTotalResource()); + } return Resources.none(); } @@ -1330,6 +1330,16 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod if (availableContainers > 0) { // Allocate... + // Create the container if necessary + Container container = + getContainer(rmContainer, application, node, capability, priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return Resources.none(); + } + // Did we previously reserve containers at this 'priority'? if (rmContainer != null){ unreserve(application, priority, node, rmContainer); @@ -1369,6 +1379,30 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod return container.getResource(); } else { // Reserve by 'charging' in advance... + RMContainer masterContainer = + application.getRMContainer(application.getMasterContainerId()); + + // we need to check if master container is also allocated on same + // node manager if yes then reserve only if node manager has sufficient + // resources excluding application master resources. + if (masterContainer.getContainer().getNodeId().equals(node.getNodeID())) { + Resource expected = + Resources.add(capability, masterContainer.getContainer() + .getResource()); + if (!Resources.fitsIn(expected, totalResource)) { + return Resources.none(); + } + } + + // Create the container if necessary + Container container = + getContainer(rmContainer, application, node, capability, priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return Resources.none(); + } reserve(application, priority, node, rmContainer, container); LOG.info("Reserved container " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a261dbf..17eb92a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -185,6 +185,11 @@ public boolean isBlacklisted(String resourceName) { public boolean isPending() { return this.appSchedulingInfo.isPending(); } + + @Override + public ContainerId getMasterContainerId() { + return appSchedulingInfo.getMasterContainerId(); + } public synchronized boolean isStopped() { return this.isStopped; @@ -575,5 +580,4 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, null, currentContPreemption, Collections.singletonList(rr)); } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index bb9ba92..efed211 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -49,6 +49,8 @@ private Resource availableResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + private Resource totalResourceCapability = + recordFactory.newRecordInstance(Resource.class); private volatile int numContainers; @@ -63,7 +65,12 @@ public FiCaSchedulerNode(RMNode node) { this.rmNode = node; this.availableResource.setMemory(node.getTotalCapability().getMemory()); - this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); + this.availableResource.setVirtualCores(node.getTotalCapability() + .getVirtualCores()); + this.totalResourceCapability.setMemory(node.getTotalCapability() + .getMemory()); + this.totalResourceCapability.setVirtualCores(node.getTotalCapability() + .getVirtualCores()); } public RMNode getRMNode() { @@ -110,12 +117,21 @@ public synchronized void allocateContainer(ApplicationId applicationId, getAvailableResource() + " available"); } + public synchronized Resource getTotalResourceCapability() { + return this.totalResourceCapability; + } + @Override public synchronized Resource getAvailableResource() { return this.availableResource; } @Override + public Resource getTotalResource() { + return totalResourceCapability; + } + + @Override public synchronized Resource getUsedResource() { return this.usedResource; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 670e961..e33a6cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -174,6 +174,11 @@ public boolean isPending() { return appSchedulingInfo.isPending(); } + @Override + public ContainerId getMasterContainerId() { + return appSchedulingInfo.getMasterContainerId(); + } + public String getQueueName() { return appSchedulingInfo.getQueueName(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index cc15a5d..52b0cc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -51,6 +51,7 @@ .getRecordFactory(null); private Resource availableResource; + private Resource totalResourceCapability; private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private volatile int numContainers; @@ -66,6 +67,8 @@ public FSSchedulerNode(RMNode node) { this.rmNode = node; + this.totalResourceCapability = + Resources.clone(node.getTotalCapability()); this.availableResource = Resources.clone(node.getTotalCapability()); } @@ -119,6 +122,11 @@ public synchronized Resource getAvailableResource() { } @Override + public Resource getTotalResource() { + return totalResourceCapability; + } + + @Override public synchronized Resource getUsedResource() { return usedResource; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 0aea4b6..a017572 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -45,7 +45,7 @@ private int responseId; private NodeId nodeId; private final int memory; - private final int vCores = 1; + private final int vCores; private ResourceTrackerService resourceTracker; private final int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -53,6 +53,16 @@ public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { this.memory = memory; + this.vCores = 1; + this.resourceTracker = resourceTracker; + String[] splits = nodeIdStr.split(":"); + nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); + } + + public MockNM(String nodeIdStr, int memory, int vCores, + ResourceTrackerService resourceTracker) { + this.memory = memory; + this.vCores = vCores; this.resourceTracker = resourceTracker; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 522debb..7fd877f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -231,6 +231,14 @@ public MockNM registerNode(String nodeIdStr, int memory) throws Exception { return nm; } + public MockNM registerNode(String nodeIdStr, int memory, int vCores) + throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); + nm.registerNode(); + return nm; + } + public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java new file mode 100644 index 0000000..8f79037 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -0,0 +1,147 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.Test; + + +public class TestContainerAllocation { + + private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + + private final int GB = 1024; + + @Test(timeout = 3000000) + public void testExcessReservationThanNodeManagerCapacity() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4); + MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // wait.. + Thread.sleep(2000); + + // Submit an application + RMApp app1 = rm.submitApp(128); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + LOG.info("sending container requests "); + am1.addRequests(new String[] {"*"}, 3 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + int waitCounter = 20; + LOG.info("heartbeating nm1"); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + + // No container should be allocated. + // Internally it should not been reserved. + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0); + + LOG.info("heartbeating nm2"); + waitCounter = 20; + nm2.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1); + + rm.stop(); + } + + @Test(timeout = 3000000) + public void testContainerReservationOnNodeManagerRunningAM() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4); + MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // wait.. + Thread.sleep(2000); + + // Submit an application + RMApp app1 = rm.submitApp(128); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + LOG.info("sending container requests "); + am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + + // kick the scheduler + nm1.nodeHeartbeat(true); + int waitCounter = 20; + LOG.info("heartbeating nm1"); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + + // No container should be allocated. + // Internally it should not been reserved. + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0); + + LOG.info("heartbeating nm2"); + waitCounter = 20; + nm2.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(500); + alloc1Response = am1.schedule(); + } + LOG.info("received container : " + + alloc1Response.getAllocatedContainers().size()); + Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1); + + rm.stop(); + } +} \ No newline at end of file