Index: AppSchedulable.java =================================================================== --- AppSchedulable.java (revision 9343) +++ AppSchedulable.java (working copy) @@ -20,6 +20,8 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -286,10 +288,19 @@ app.addSchedulingOpportunity(priority); + //ADD: we need to write a new one, add resource check ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName()); + if (rackLocalRequest != null && !queue.checkQueueResourceLimit(rackLocalRequest.getCapability()) ) { + rackLocalRequest = null; + } + + //ADD: also need a new one for resource check ResourceRequest localRequest = app.getResourceRequest(priority, node.getNodeName()); + if (localRequest != null && !queue.checkQueueResourceLimit(localRequest.getCapability()) ) { + localRequest = null; + } if (localRequest != null && !localRequest.getRelaxLocality()) { LOG.warn("Relax locality off is not supported on local request: " @@ -326,8 +337,16 @@ NodeType.RACK_LOCAL, reserved); } - ResourceRequest offSwitchRequest = app.getResourceRequest(priority, - ResourceRequest.ANY); + //ResourceRequest offSwitchRequest = app.getResourceRequest(priority, + // ResourceRequest.ANY); + //ADD: we need to add a checking function, for resource limit + ResourceRequest offSwitchRequest = null; + Map resourceRequestList = app.getResourceRequests(priority); + if(resourceRequestList != null) { + offSwitchRequest = getRequestByQResourceLimit(resourceRequestList); + } + + if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { continue; } @@ -342,6 +361,27 @@ return Resources.none(); } + /** + * location: resource requests in a application -> a priority;
+ * choose a resource request depending on the queue's resource limit check + * @param resourceRequestList + * @return + */ + @SuppressWarnings("rawtypes") + private ResourceRequest getRequestByQResourceLimit( + Map resourceRequestList) { + // TODO Auto-generated method stub + Iterator iter = resourceRequestList.entrySet().iterator(); + while( iter.hasNext() ) { + Map.Entry entry = (Map.Entry)iter.next(); + ResourceRequest rRequest = (ResourceRequest) entry.getValue(); + if ( queue.checkQueueResourceLimit(rRequest.getCapability()) ) { + return rRequest; + } + } + return null; + } + public Resource assignReservedContainer(FSSchedulerNode node) { return assignContainer(node, true); } Index: FSQueue.java =================================================================== --- FSQueue.java (revision 9342) +++ FSQueue.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -187,4 +188,21 @@ } return true; } + + + /** + * ADD: helper method to check if this resource request should be assigned.
+ * this check based on a equation: resourceRequest + resourceUsed should less than resourceMaxLimit + * @param rRequest + * @return + */ + protected boolean checkQueueResourceLimit(Resource resource) { + Resource resourceWillUse = Resources.add(resource, getResourceUsage()); + if ( !Resources.fitsIn( resourceWillUse, + scheduler.getAllocationConfiguration().getMaxResources(getName())) ) { + return false; + } + return true; + } + }