diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 187a6e0..ed177c3 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1ff00be..317fd72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -65,6 +66,7 @@ //private final ApplicationStore store; private final ActiveUsersManager activeUsersManager; + private final ContainerId masterContainerId; /* Allocated by scheduler */ boolean pending = true; // for app metrics @@ -77,6 +79,9 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.masterContainerId = + ContainerId.newInstance(applicationAttemptId, + containerIdCounter.get() + 1); } public ApplicationId getApplicationId() { @@ -397,4 +402,8 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void setQueue(Queue queue) { this.queue = queue; } + + public ContainerId getMasterContainerId() { + return masterContainerId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index cc9b872..8676d19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -57,4 +58,9 @@ */ public abstract boolean isPending(); + /** + * Get master container Id + * @return master container id + */ + public abstract ContainerId getMasterContainerId(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index a08ba70..e9e7b0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -58,5 +58,10 @@ * @return number of active containers on the node */ public abstract int getNumContainers(); - + + /** + * Get total resources on the node. + * @return total resources on the node. + */ + public abstract Resource getTotalResource(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index c2c5d27..e2e4cb1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1308,19 +1308,19 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod + " request=" + request + " type=" + type); } Resource capability = request.getCapability(); - Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + assert Resources.greaterThan( resourceCalculator, clusterResource, available, Resources.none()); - // Create the container if necessary - Container container = - getContainer(rmContainer, application, node, capability, priority); - - // something went wrong getting/creating the container - if (container == null) { - LOG.warn("Couldn't get container for allocation!"); + if (!Resources.fitsIn(capability, totalResource)) { + if (LOG.isDebugEnabled()) { + LOG.info("Node : " + node.getNodeID() + + " does not have sufficient resources for request : " + request + + " node total capability : " + node.getTotalResource()); + } return Resources.none(); } @@ -1330,6 +1330,16 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod if (availableContainers > 0) { // Allocate... + // Create the container if necessary + Container container = + getContainer(rmContainer, application, node, capability, priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return Resources.none(); + } + // Did we previously reserve containers at this 'priority'? if (rmContainer != null){ unreserve(application, priority, node, rmContainer); @@ -1369,6 +1379,30 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod return container.getResource(); } else { // Reserve by 'charging' in advance... + RMContainer masterContainer = + application.getRMContainer(application.getMasterContainerId()); + + // we need to check if master container is also allocated on same + // node manager if yes then reserve only if node manager has sufficient + // resources excluding application master resources. + if (masterContainer.getContainer().getNodeId().equals(node.getNodeID())) { + Resource expected = + Resources.add(capability, masterContainer.getContainer() + .getResource()); + if (!Resources.fitsIn(expected, totalResource)) { + return Resources.none(); + } + } + + // Create the container if necessary + Container container = + getContainer(rmContainer, application, node, capability, priority); + + // something went wrong getting/creating the container + if (container == null) { + LOG.warn("Couldn't get container for allocation!"); + return Resources.none(); + } reserve(application, priority, node, rmContainer, container); LOG.info("Reserved container " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a261dbf..17eb92a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -185,6 +185,11 @@ public boolean isBlacklisted(String resourceName) { public boolean isPending() { return this.appSchedulingInfo.isPending(); } + + @Override + public ContainerId getMasterContainerId() { + return appSchedulingInfo.getMasterContainerId(); + } public synchronized boolean isStopped() { return this.isStopped; @@ -575,5 +580,4 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, null, currentContPreemption, Collections.singletonList(rr)); } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index bb9ba92..efed211 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -49,6 +49,8 @@ private Resource availableResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); + private Resource totalResourceCapability = + recordFactory.newRecordInstance(Resource.class); private volatile int numContainers; @@ -63,7 +65,12 @@ public FiCaSchedulerNode(RMNode node) { this.rmNode = node; this.availableResource.setMemory(node.getTotalCapability().getMemory()); - this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores()); + this.availableResource.setVirtualCores(node.getTotalCapability() + .getVirtualCores()); + this.totalResourceCapability.setMemory(node.getTotalCapability() + .getMemory()); + this.totalResourceCapability.setVirtualCores(node.getTotalCapability() + .getVirtualCores()); } public RMNode getRMNode() { @@ -110,12 +117,21 @@ public synchronized void allocateContainer(ApplicationId applicationId, getAvailableResource() + " available"); } + public synchronized Resource getTotalResourceCapability() { + return this.totalResourceCapability; + } + @Override public synchronized Resource getAvailableResource() { return this.availableResource; } @Override + public Resource getTotalResource() { + return totalResourceCapability; + } + + @Override public synchronized Resource getUsedResource() { return this.usedResource; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 670e961..e33a6cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -174,6 +174,11 @@ public boolean isPending() { return appSchedulingInfo.isPending(); } + @Override + public ContainerId getMasterContainerId() { + return appSchedulingInfo.getMasterContainerId(); + } + public String getQueueName() { return appSchedulingInfo.getQueueName(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index cc15a5d..52b0cc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -51,6 +51,7 @@ .getRecordFactory(null); private Resource availableResource; + private Resource totalResourceCapability; private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private volatile int numContainers; @@ -66,6 +67,8 @@ public FSSchedulerNode(RMNode node) { this.rmNode = node; + this.totalResourceCapability = + Resources.clone(node.getTotalCapability()); this.availableResource = Resources.clone(node.getTotalCapability()); } @@ -119,6 +122,11 @@ public synchronized Resource getAvailableResource() { } @Override + public Resource getTotalResource() { + return totalResourceCapability; + } + + @Override public synchronized Resource getUsedResource() { return usedResource; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 0aea4b6..a017572 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -45,7 +45,7 @@ private int responseId; private NodeId nodeId; private final int memory; - private final int vCores = 1; + private final int vCores; private ResourceTrackerService resourceTracker; private final int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -53,6 +53,16 @@ public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { this.memory = memory; + this.vCores = 1; + this.resourceTracker = resourceTracker; + String[] splits = nodeIdStr.split(":"); + nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); + } + + public MockNM(String nodeIdStr, int memory, int vCores, + ResourceTrackerService resourceTracker) { + this.memory = memory; + this.vCores = vCores; this.resourceTracker = resourceTracker; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 522debb..7fd877f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -231,6 +231,14 @@ public MockNM registerNode(String nodeIdStr, int memory) throws Exception { return nm; } + public MockNM registerNode(String nodeIdStr, int memory, int vCores) + throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService()); + nm.registerNode(); + return nm; + } + public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId());