diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 4cd5925f242..bba58c81f9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -223,11 +223,12 @@ public void allocate(ApplicationAttemptId appAttemptId, getRmContext().getRMApps().get(appAttemptId.getApplicationId()); // set label expression for Resource Requests if resourceName=ANY - ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + ApplicationSubmissionContext submissionContext = + app.getApplicationSubmissionContext(); for (ResourceRequest req : ask) { if (null == req.getNodeLabelExpression() && ResourceRequest.ANY.equals(req.getResourceName())) { - req.setNodeLabelExpression(asc.getNodeLabelExpression()); + req.setNodeLabelExpression(submissionContext.getNodeLabelExpression()); } } @@ -252,8 +253,7 @@ public void allocate(ApplicationAttemptId appAttemptId, // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. - if (!app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { + if (!submissionContext.getKeepContainersAcrossApplicationAttempts()) { try { RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); } catch (InvalidContainerReleaseException e) { @@ -267,9 +267,9 @@ public void allocate(ApplicationAttemptId appAttemptId, // No Exceptions are thrown here. All update errors are aggregated // and returned to the AM. List updateErrors = new ArrayList<>(); - ContainerUpdates containerUpdateRequests = - RMServerUtils.validateAndSplitUpdateResourceRequests( - getRmContext(), request, maximumCapacity, updateErrors); + ContainerUpdates containerUpdateRequests = RMServerUtils + .validateAndSplitUpdateResourceRequests(getRmContext(), request, + maximumCapacity, updateErrors, submissionContext.getQueue()); // Send new requests to appAttempt. Allocation allocation; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ee78c083713..4c171dbb88e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -568,11 +568,12 @@ private RMAppImpl createAndPopulateNewRMApp( // Normalize all requests for (ResourceRequest amReq : amReqs) { SchedulerUtils.normalizeAndValidateRequest(amReq, - scheduler.getMaximumResourceCapability(), + scheduler + .getMaximumResourceCapability(submissionContext.getQueue()), submissionContext.getQueue(), scheduler, isRecovery, rmContext); - amReq.setCapability( - scheduler.getNormalizedResource(amReq.getCapability())); + amReq.setCapability(scheduler.getNormalizedResource( + amReq.getCapability(), submissionContext.getQueue())); } return amReqs; } catch (InvalidResourceRequestException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index ab6bbcf3355..cf7f81f7879 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -146,7 +146,7 @@ public static ContainerUpdates validateAndSplitUpdateResourceRequests(RMContext rmContext, AllocateRequest request, Resource maximumAllocation, - List updateErrors) { + List updateErrors, String queueName) { ContainerUpdates updateRequests = new ContainerUpdates(); Set outstandingUpdate = new HashSet<>(); @@ -160,7 +160,7 @@ if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) && (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) { if (validateIncreaseDecreaseRequest( - rmContext, updateReq, maximumAllocation)) { + rmContext, updateReq, maximumAllocation, queueName)) { if (ContainerUpdateType.INCREASE_RESOURCE == updateType) { updateRequests.getIncreaseRequests().add(updateReq); } else { @@ -326,7 +326,8 @@ public static void validateBlacklistRequest( // Sanity check and normalize target resource private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, - UpdateContainerRequest request, Resource maximumAllocation) { + UpdateContainerRequest request, Resource maximumAllocation, + String queueName) { if (request.getCapability().getMemorySize() < 0 || request.getCapability().getMemorySize() > maximumAllocation .getMemorySize()) { @@ -338,7 +339,8 @@ private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, return false; } ResourceScheduler scheduler = rmContext.getScheduler(); - request.setCapability(scheduler.getNormalizedResource(request.getCapability())); + request.setCapability( + scheduler.getNormalizedResource(request.getCapability(), queueName)); return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a50d94..0db5af199bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1150,10 +1150,23 @@ protected void nodeUpdate(RMNode nm) { @Override public Resource getNormalizedResource(Resource requestedResource) { + return getNormalizedResource(requestedResource, null); + } + + @Override + public Resource getNormalizedResource( + Resource requestedResource, + String queueName) { + Resource maxResourceCapability; + if(queueName == null || queueName.isEmpty()) { + maxResourceCapability = getMaximumResourceCapability(); + } else { + maxResourceCapability = getMaximumResourceCapability(queueName); + } return SchedulerUtils.getNormalizedResource(requestedResource, getResourceCalculator(), getMinimumResourceCapability(), - getMaximumResourceCapability(), + maxResourceCapability, getMinimumResourceCapability()); } @@ -1163,8 +1176,18 @@ public Resource getNormalizedResource(Resource requestedResource) { * @param asks resource requests */ protected void normalizeResourceRequests(List asks) { - for (ResourceRequest ask: asks) { - ask.setCapability(getNormalizedResource(ask.getCapability())); + normalizeResourceRequests(asks, null); + } + + /** + * Normalize a list of resource requests + * using queue maximum resource allocations + * @param asks resource requests + */ + protected void normalizeResourceRequests(List asks, + String queueName) { + for (ResourceRequest ask : asks) { + ask.setCapability(getNormalizedResource(ask.getCapability(), queueName)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 43d55c4f1dc..e571fdaabdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -387,6 +387,18 @@ public Priority updateApplicationPriority(Priority newPriority, */ Resource getNormalizedResource(Resource requestedResource); + /** + * Normalize a resource request using scheduler level maximum resource or + * queue based maximum resource + * + * @param requestedResource the resource to be normalized + * @param queueName Name of the queue to get maximum container allocation + * value from, if null or empty scheduler level maximum container + * allocation value will be used + * @return the normalized resource + */ + Resource getNormalizedResource(Resource requestedResource, String queueName); + /** * Verify whether a submitted application lifetime is valid as per configured * Queue lifetime. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0b7fe92b495..5e612bb22c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1147,7 +1147,7 @@ private void normalizeSchedulingRequests(List asks) { for (SchedulingRequest ask: asks) { ResourceSizing sizing = ask.getResourceSizing(); if (sizing != null && sizing.getResources() != null) { - sizing.setResources(getNormalizedResource(sizing.getResources())); + sizing.setResources(getNormalizedResource(sizing.getResources(), null)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java index cf944a6213a..2ee9b126429 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java @@ -178,8 +178,8 @@ private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, // Normalize the Requests before dispatching schedulingRequests.forEach(req -> { Resource reqResource = req.getResourceSizing().getResources(); - req.getResourceSizing() - .setResources(this.scheduler.getNormalizedResource(reqResource)); + req.getResourceSizing().setResources( + this.scheduler.getNormalizedResource(reqResource, null)); }); this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, appAttemptId.getApplicationId(), schedulingRequests, 1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index e48e04b486c..0d15b773a16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -91,6 +91,9 @@ private final SchedulingPolicy defaultSchedulingPolicy; + //Map for maximum container resource allocation per queues by queue name + private final Map queueMaxContainerAllocationMap; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -138,6 +141,8 @@ public AllocationConfiguration(QueueProperties queueProperties, this.placementPolicy = newPlacementPolicy; this.configuredQueues = queueProperties.getConfiguredQueues(); this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues(); + this.queueMaxContainerAllocationMap = + queueProperties.getMaxContainerAllocation(); } public AllocationConfiguration(Configuration conf) { @@ -167,6 +172,7 @@ public AllocationConfiguration(Configuration conf) { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); + queueMaxContainerAllocationMap = new HashMap<>(); } /** @@ -272,6 +278,12 @@ ConfigurableResource getMaxResources(String queue) { return maxQueueResource; } + @VisibleForTesting + Resource getQueueMaxContainerAllocation(String queue) { + Resource resource = queueMaxContainerAllocationMap.get(queue); + return resource == null ? Resources.none() : resource; + } + /** * Get the maximum resource allocation for children of the given queue. * @@ -375,6 +387,7 @@ public void initFSQueue(FSQueue queue){ queue.setMaxRunningApps(getQueueMaxApps(name)); queue.setMaxAMShare(getQueueMaxAMShare(name)); queue.setMaxChildQueueResource(getMaxChildResources(name)); + queue.setMaxContainerAllocation(getQueueMaxContainerAllocation(name)); // Set queue metrics. queue.getMetrics().setMinShare(queue.getMinShare()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index cbc74d25345..5885554cf45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -547,6 +547,15 @@ public void setWeights(float weight) { this.weights = weight; } + @Override + public Resource getMaximumResourceCapability() { + if (maxContainerAllocation.equals(Resources.none()) && getParent() != null) { + return getParent().getMaximumResourceCapability(); + } else { + return maxContainerAllocation; + } + } + /** * Helper method to compute the amount of minshare starvation. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index d5df549b282..e1117335f89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -59,7 +59,19 @@ public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); } - + + @Override + public Resource getMaximumResourceCapability() { + if (getName().equals("root")) { + return maxContainerAllocation; + } + if(maxContainerAllocation.equals(Resources.none()) && getParent() != null) { + return getParent().getMaximumResourceCapability(); + } else { + return maxContainerAllocation; + } + } + void addChildQueue(FSQueue child) { writeLock.lock(); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 6b88a329fa3..bc2350f788d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -84,6 +84,7 @@ private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; private boolean isDynamic = true; + protected Resource maxContainerAllocation; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -163,6 +164,12 @@ public void setMaxShare(ConfigurableResource maxShare){ this.maxShare = maxShare; } + public void setMaxContainerAllocation(Resource maxContainerAllocation){ + this.maxContainerAllocation = maxContainerAllocation; + } + + public abstract Resource getMaximumResourceCapability(); + @Override public Resource getMaxShare() { Resource maxResource = maxShare.getResource(scheduler.getClusterResource()); @@ -579,7 +586,6 @@ public String dumpState() { return sb.toString(); } - /** * Recursively dump states of all queues. * @@ -594,4 +600,5 @@ public boolean isDynamic() { public void setDynamic(boolean dynamic) { this.isDynamic = dynamic; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 43a47ae65fe..28e29d6c51a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.ContainerMaxAllocationCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -153,6 +154,7 @@ new DefaultResourceCalculator(); private static final ResourceCalculator DOMINANT_RESOURCE_CALCULATOR = new DominantResourceCalculator(); + private ContainerMaxAllocationCalculator containerMaxAllocationCalculator; // Value that container assignment methods return when a container is // reserved @@ -192,6 +194,7 @@ protected long rackLocalityDelayMs; // Delay for rack locality protected boolean assignMultiple; // Allocate multiple containers per // heartbeat + @VisibleForTesting boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat @@ -227,12 +230,12 @@ public boolean isAtLeastReservationThreshold( private void validateConf(FairSchedulerConfiguration config) { // validate scheduler memory allocation setting - int minMem = config.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - int maxMem = config.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + int minMem = + config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int maxMem = + config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); if (minMem < 0 || minMem > maxMem) { throw new YarnRuntimeException("Invalid resource scheduler memory" @@ -254,12 +257,12 @@ private void validateConf(FairSchedulerConfiguration config) { } // validate scheduler vcores allocation setting - int minVcores = config.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxVcores = config.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + int minVcores = + config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxVcores = + config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); if (minVcores < 0 || minVcores > maxVcores) { throw new YarnRuntimeException("Invalid resource scheduler vcores" @@ -834,13 +837,43 @@ private void removeNode(RMNode rmNode) { @Override public Resource getNormalizedResource(Resource requestedResource) { + return getNormalizedResource(requestedResource, null); + } + + @Override + public Resource getNormalizedResource( + Resource requestedResource, + String queueName) { + Resource maxResourceCapability; + if(queueName == null || queueName.isEmpty()) { + maxResourceCapability = getMaximumResourceCapability(); + } else { + maxResourceCapability = getMaximumResourceCapability(queueName); + } return SchedulerUtils.getNormalizedResource(requestedResource, DOMINANT_RESOURCE_CALCULATOR, minimumAllocation, - getMaximumResourceCapability(), + maxResourceCapability, incrAllocation); } + @Override + public Resource getMaximumResourceCapability(String queueName) { + FSQueue queue = queueMgr.getQueue(queueName); + if (queue == null) { + return getMaximumResourceCapability(); + } + Resource queueMaxResourceCapability = queue.getMaximumResourceCapability(); + if (queueMaxResourceCapability.equals(Resources.none())) { + return getMaximumResourceCapability(); + } else { + containerMaxAllocationCalculator + .correctQueueMaxResurceCapability(queueMaxResourceCapability, queue); + + return queueMaxResourceCapability; + } + } + @VisibleForTesting @Override public void killContainer(RMContainer container) { @@ -897,7 +930,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, handleContainerUpdates(application, updateRequests); // Sanity check - normalizeResourceRequests(ask); + normalizeResourceRequests(ask, queue.getName()); // TODO, normalize SchedulingRequest @@ -1437,6 +1470,9 @@ private void initScheduler(Configuration conf) throws IOException { if (this.conf.getPreemptionEnabled()) { createPreemptionThread(); } + + containerMaxAllocationCalculator = + new ContainerMaxAllocationCalculator(this.conf); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java index 441c34a1aa1..854b9daf30b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java @@ -51,6 +51,7 @@ private static final String MAX_CHILD_RESOURCES = "maxChildResources"; private static final String MAX_RUNNING_APPS = "maxRunningApps"; private static final String MAX_AMSHARE = "maxAMShare"; + public static final String MAX_CONTAINER_ALLOCATION = "maxContainerAllocation"; private static final String WEIGHT = "weight"; private static final String MIN_SHARE_PREEMPTION_TIMEOUT = "minSharePreemptionTimeout"; @@ -155,6 +156,11 @@ private void loadQueue(String parentName, Element element, float val = Float.parseFloat(text); val = Math.min(val, 1.0f); builder.queueMaxAMShares(queueName, val); + } else if (MAX_CONTAINER_ALLOCATION.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + ConfigurableResource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + builder.queueMaxContainerAllocation(queueName, val.getResource()); } else if (WEIGHT.equals(field.getTagName())) { String text = getTrimmedTextData(field); double val = Double.parseDouble(text); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/ContainerMaxAllocationCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/ContainerMaxAllocationCalculator.java new file mode 100644 index 00000000000..7633c4f5fce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/ContainerMaxAllocationCalculator.java @@ -0,0 +1,52 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.*; + +public class ContainerMaxAllocationCalculator { + + private static final Logger logger = + LoggerFactory.getLogger(ContainerMaxAllocationCalculator.class); + + private final int maxMem; + private final int maxVcores; + + public ContainerMaxAllocationCalculator(Configuration config) { + maxVcores = config.getInt(RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + maxMem = config.getInt(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + } + + public void correctQueueMaxResurceCapability( + Resource queueMaxResourceCapability, FSQueue queue) { + if (maxVcores < queueMaxResourceCapability.getVirtualCores()) { + queueMaxResourceCapability.setVirtualCores(maxVcores); + logger.warn(getWarnMsg( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, queue, + queueMaxResourceCapability.getVirtualCores())); + } + if (maxMem < queueMaxResourceCapability.getMemorySize()) { + queueMaxResourceCapability.setMemorySize(maxMem); + logger.warn(getWarnMsg( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, queue, + queueMaxResourceCapability.getMemorySize())); + } + } + + private String getWarnMsg(String propertyName, FSQueue queue, + long maxResourceValue) { + return String.format( + "Invalid queue resource allocation! " + + "Property %s = %s at queue: %s cannot be higher than %s = %d", + AllocationFileQueueParser.MAX_CONTAINER_ALLOCATION, + queue.getMaximumResourceCapability(), queue.getName(), propertyName, + maxResourceValue); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java index ee5f1790237..35bff1543cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java @@ -53,6 +53,7 @@ private final Set reservableQueues; private final Set nonPreemptableQueues; private final Map> configuredQueues; + private final Map queueMaxContainerAllocation; QueueProperties(Builder builder) { this.reservableQueues = builder.reservableQueues; @@ -70,6 +71,7 @@ this.maxChildQueueResources = builder.maxChildQueueResources; this.reservationAcls = builder.reservationAcls; this.queueAcls = builder.queueAcls; + this.queueMaxContainerAllocation = builder.queueMaxContainerAllocation; } public Map> getConfiguredQueues() { @@ -133,7 +135,11 @@ return nonPreemptableQueues; } - /** + public Map getMaxContainerAllocation() { + return queueMaxContainerAllocation; + } + + /** * Builder class for {@link QueueProperties}. * All methods are adding queue properties to the maps of this builder * keyed by the queue's name except some methods @@ -149,6 +155,7 @@ new HashMap<>(); private Map queueMaxApps = new HashMap<>(); private Map queueMaxAMShares = new HashMap<>(); + private Map queueMaxContainerAllocation = new HashMap<>(); private Map queueWeights = new HashMap<>(); private Map queuePolicies = new HashMap<>(); private Map minSharePreemptionTimeouts = new HashMap<>(); @@ -253,6 +260,11 @@ public Builder nonPreemptableQueues(String queue) { return this; } + public Builder queueMaxContainerAllocation(String queueName, Resource value) { + queueMaxContainerAllocation.put(queueName, value); + return this; + } + public void configuredQueues(FSQueueType queueType, String queueName) { this.configuredQueues.get(queueType).add(queueName); } @@ -275,6 +287,5 @@ public boolean isAclDefinedForAccessType(String queueName, public QueueProperties build() { return new QueueProperties(this); } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java index ef417d4760f..e343a45d41b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java @@ -78,6 +78,8 @@ protected void render(Block html) { __("Num Pending Applications:", qinfo.getNumPendingApplications()). __("Min Resources:", qinfo.getMinResources().toString()). __("Max Resources:", qinfo.getMaxResources().toString()). + __("Max Container Resources:", + qinfo.getMaxContainerResources().toString()). __("Reserved Resources:", qinfo.getReservedResources().toString()); int maxApps = qinfo.getMaxApplications(); if (maxApps < Integer.MAX_VALUE) { @@ -107,6 +109,8 @@ protected void render(Block html) { __("Used Resources:", qinfo.getUsedResources().toString()). __("Min Resources:", qinfo.getMinResources().toString()). __("Max Resources:", qinfo.getMaxResources().toString()). + __("Max Container Resources:", + qinfo.getMaxContainerResources().toString()). __("Reserved Resources:", qinfo.getReservedResources().toString()); int maxApps = qinfo.getMaxApplications(); if (maxApps < Integer.MAX_VALUE) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index 913513c52ae..4862b82c339 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -60,6 +60,7 @@ private ResourceInfo fairResources; private ResourceInfo clusterResources; private ResourceInfo reservedResources; + private ResourceInfo maxContainerResources; private long allocatedContainers; private long reservedContainers; @@ -99,6 +100,7 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { maxResources = new ResourceInfo( Resources.componentwiseMin(queue.getMaxShare(), scheduler.getClusterResource())); + maxContainerResources = new ResourceInfo(scheduler.getMaximumResourceCapability(queueName)); reservedResources = new ResourceInfo(queue.getReservedResource()); fractionMemSteadyFairShare = @@ -186,7 +188,11 @@ public ResourceInfo getMinResources() { public ResourceInfo getMaxResources() { return maxResources; } - + + public ResourceInfo getMaxContainerResources() { + return maxContainerResources; + } + public ResourceInfo getReservedResources() { return reservedResources; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 27e87bdcdda..9aee44daf10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,9 +19,26 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyString; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -68,12 +85,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.TestRMAppManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -82,35 +99,12 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - -import static java.util.stream.Collectors.toSet; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; -import static org.mockito.Matchers.matches; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * Testing applications being retired from RM. @@ -234,74 +228,10 @@ public void handle(RMAppEvent event) { setAppEventType(event.getType()); System.out.println("in handle routine " + getAppEventType().toString()); } - } - - - // Extend and make the functions we want to test public - public class TestRMAppManager extends RMAppManager { - private final RMStateStore stateStore; - - public TestRMAppManager(RMContext context, Configuration conf) { - super(context, null, null, new ApplicationACLsManager(conf), conf); - this.stateStore = context.getStateStore(); - } - - public TestRMAppManager(RMContext context, - ClientToAMTokenSecretManagerInRM clientToAMSecretManager, - YarnScheduler scheduler, ApplicationMasterService masterService, - ApplicationACLsManager applicationACLsManager, Configuration conf) { - super(context, scheduler, masterService, applicationACLsManager, conf); - this.stateStore = context.getStateStore(); - } - - public void checkAppNumCompletedLimit() { - super.checkAppNumCompletedLimit(); - } - - public void finishApplication(ApplicationId appId) { - super.finishApplication(appId); - } - - public int getCompletedAppsListSize() { - return super.getCompletedAppsListSize(); - } - - public int getNumberOfCompletedAppsInStateStore() { - return this.completedAppsInStateStore; - } - - List getCompletedApps() { - return completedApps; - } - - Set getFirstNCompletedApps(int n) { - return getCompletedApps().stream().limit(n).collect(toSet()); - } - - Set getCompletedAppsWithEvenIdsInRange(int n) { - return getCompletedApps().stream().limit(n) - .filter(app -> app.getId() % 2 == 0).collect(toSet()); - } - - Set getRemovedAppsFromStateStore(int numRemoves) { - ArgumentCaptor argumentCaptor = - ArgumentCaptor.forClass(RMApp.class); - verify(stateStore, times(numRemoves)) - .removeApplication(argumentCaptor.capture()); - return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) - .collect(toSet()); - } - - public void submitApplication( - ApplicationSubmissionContext submissionContext, String user) - throws YarnException, IOException { - super.submitApplication(submissionContext, System.currentTimeMillis(), - user); - } } private void addToCompletedApps(TestRMAppManager appMonitor, - RMContext rmContext) { + RMContext rmContext) { // ensure applications are finished in order by their IDs List sortedApps = new ArrayList<>(rmContext.getRMApps().values()); sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId())); @@ -1210,6 +1140,10 @@ private static ResourceScheduler mockResourceScheduler() { Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + when(scheduler.getMaximumResourceCapability(anyString())).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + ResourceCalculator rs = mock(ResourceCalculator.class); when(scheduler.getResourceCalculator()).thenReturn(rs); @@ -1222,6 +1156,15 @@ public Resource answer(InvocationOnMock invocationOnMock) } }); + when(scheduler.getNormalizedResource(any(), any())) + .thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocationOnMock) + throws Throwable { + return (Resource) invocationOnMock.getArguments()[0]; + } + }); + return scheduler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 3ac3849cf73..81a5e4d9138 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -77,6 +77,7 @@ public static final float TEST_RESERVATION_THRESHOLD = 0.09f; private static final int SLEEP_DURATION = 10; private static final int SLEEP_RETRIES = 1000; + protected static final int RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE = 10240; final static ContainerUpdates NULL_UPDATE_REQUESTS = new ContainerUpdates(); @@ -93,7 +94,7 @@ public Configuration createConfiguration() { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1024); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE); conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 50a003ecd11..4fecba2aa7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -23,21 +23,21 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; + +import java.io.*; import java.net.URISyntaxException; import java.net.URL; import java.nio.charset.StandardCharsets; @@ -53,6 +53,28 @@ public class TestAllocationFileLoaderService { + private static final String A_CUSTOM_RESOURCE = "a-custom-resource"; + + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream(("\n" + " \n" + + " yarn.resource-types\n" + " " + + A_CUSTOM_RESOURCE + "\n" + " \n" + + " \n" + + " yarn.resource-types.a-custom-resource.units\n" + + " k\n" + " \n" + "\n") + .getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); @@ -204,6 +226,11 @@ public void testReload() throws Exception { @Test public void testAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); + + conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(conf); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); @@ -246,6 +273,8 @@ public void testAllocationFileParsing() throws Exception { .fairSharePreemptionTimeout(120) .minSharePreemptionTimeout(50) .fairSharePreemptionThreshold(0.6) + .maxContainerResources( + "vcores=16, memory-mb=512, " + A_CUSTOM_RESOURCE + "=10") // Create hierarchical queues G,H, with different min/fair // share preemption timeouts and preemption thresholds. // Also add a child default to make sure it doesn't impact queue H. @@ -253,6 +282,7 @@ public void testAllocationFileParsing() throws Exception { .fairSharePreemptionTimeout(180) .minSharePreemptionTimeout(40) .fairSharePreemptionThreshold(0.7) + .maxContainerResources("1024mb,8vcores") .buildSubQueue() .buildQueue() // Set default limit of apps per queue to 15 @@ -286,8 +316,6 @@ public void testAllocationFileParsing() throws Exception { assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size()); assertEquals(Resources.createResource(0), queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(Resources.createResource(0), - queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(2048, 10), queueConf.getMaxResources("root.queueA").getResource()); @@ -358,6 +386,28 @@ public void testAllocationFileParsing() throws Exception { assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + Resource expectedResourceWithCustomType = Resources.createResource(512, 16); + expectedResourceWithCustomType.setResourceValue(A_CUSTOM_RESOURCE, 10); + + assertEquals(Resources.none(), queueConf.getQueueMaxContainerAllocation( + "root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueA")); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueB")); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueC")); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueD")); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueE")); + assertEquals(Resources.none(), + queueConf.getQueueMaxContainerAllocation("root.queueF")); + assertEquals(expectedResourceWithCustomType, + queueConf.getQueueMaxContainerAllocation("root.queueG")); + assertEquals(Resources.createResource(1024, 8), + queueConf.getQueueMaxContainerAllocation("root.queueG.queueH")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9120d3a6cc1..cfa4922bf44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -198,8 +200,6 @@ public void testConfValidation() throws Exception { } } - // TESTS - @SuppressWarnings("deprecation") @Test(timeout=2000) public void testLoadConfigurationOnInitialize() throws IOException { @@ -338,6 +338,102 @@ public void testSimpleFairShareCalculation() throws IOException { } } + @Test + public void testQueueMaximumCapacityAllocations() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println( + " 512 mb 1 vcores"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println( + " 2048 mb 3 vcores"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.close(); + + scheduler.init(conf); + + Assert.assertEquals(1, scheduler.getMaximumResourceCapability("root.queueA") + .getVirtualCores()); + Assert.assertEquals(512, + scheduler.getMaximumResourceCapability("root.queueA").getMemorySize()); + + Assert.assertEquals(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + scheduler.getMaximumResourceCapability("root.queueB") + .getVirtualCores()); + Assert.assertEquals(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, + scheduler.getMaximumResourceCapability("root.queueB").getMemorySize()); + + Assert.assertEquals(3, scheduler.getMaximumResourceCapability("root.queueC") + .getVirtualCores()); + Assert.assertEquals(2048, + scheduler.getMaximumResourceCapability("root.queueC").getMemorySize()); + + Assert.assertEquals(3, scheduler + .getMaximumResourceCapability("root.queueC.queueD").getVirtualCores()); + Assert.assertEquals(2048, scheduler + .getMaximumResourceCapability("root.queueC.queueD").getMemorySize()); + } + + @Test + public void testNormalizationUsingQueueMaximumAllocation() + throws IOException { + + int queueMaxAllocation = 4096; + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" " + queueMaxAllocation + + " mb 1 vcores" + ""); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + allocateAppAttempt("root.queueA", 1, queueMaxAllocation + 1024); + allocateAppAttempt("root.queueB", 2, + RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE + 1024); + + scheduler.update(); + FSQueue queueToCheckA = scheduler.getQueueManager().getQueue("root.queueA"); + FSQueue queueToCheckB = scheduler.getQueueManager().getQueue("root.queueB"); + + assertEquals(queueMaxAllocation, queueToCheckA.getDemand().getMemorySize()); + assertEquals(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, + queueToCheckB.getDemand().getMemorySize()); + } + + private void allocateAppAttempt(String queueName, int id, int memorySize) { + ApplicationAttemptId id11 = createAppAttemptId(id, id); + createMockRMApp(id11); + scheduler.addApplication(id11.getApplicationId(), queueName, "user1", + false); + scheduler.addApplicationAttempt(id11, false, false); + List ask1 = new ArrayList(); + ResourceRequest request1 = + createResourceRequest(memorySize, ResourceRequest.ANY, 1, 1, true); + ask1.add(request1); + scheduler.allocate(id11, ask1, null, new ArrayList(), null, + null, NULL_UPDATE_REQUESTS); + } + /** * Test fair shares when max resources are set but are too high to impact * the shares. @@ -1316,8 +1412,9 @@ public void testRackLocalAppReservationThreshold() throws Exception { // New node satisfies resource request scheduler.update(); scheduler.handle(new NodeUpdateSchedulerEvent(node4)); - assertEquals(10240, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemorySize()); + assertEquals(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, + scheduler.getQueueManager().getQueue("queue1").getResourceUsage() + .getMemorySize()); scheduler.handle(new NodeUpdateSchedulerEvent(node1)); scheduler.handle(new NodeUpdateSchedulerEvent(node2)); @@ -4099,12 +4196,12 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), - 1, "127.0.0.1"); - RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), - 2, "127.0.0.2"); + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 10), + 1, "127.0.0.1"); + RMNode node2 = MockNodes.newNodeInfo(1, + Resources.createResource(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 10), + 2, "127.0.0.2"); RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(5120, 5), 3, "127.0.0.3"); @@ -4122,10 +4219,12 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { true); Resource amResource1 = Resource.newInstance(1024, 1); Resource amResource2 = Resource.newInstance(1024, 1); - Resource amResource3 = Resource.newInstance(10240, 1); + Resource amResource3 = + Resource.newInstance(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 1); Resource amResource4 = Resource.newInstance(5120, 1); Resource amResource5 = Resource.newInstance(1024, 1); - Resource amResource6 = Resource.newInstance(10240, 1); + Resource amResource6 = + Resource.newInstance(RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 1); Resource amResource7 = Resource.newInstance(1024, 1); Resource amResource8 = Resource.newInstance(1024, 1); int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); @@ -4159,7 +4258,8 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { ApplicationAttemptId attId3 = createAppAttemptId(3, 1); createApplicationWithAMResource(attId3, "queue1", "user1", amResource3); - createSchedulingRequestExistingApplication(10240, 1, amPriority, attId3); + createSchedulingRequestExistingApplication( + RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 1, amPriority, attId3); FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); // app3 reserves a container on node1 because node1's available resource @@ -4233,7 +4333,8 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource6); - createSchedulingRequestExistingApplication(10240, 1, amPriority, attId6); + createSchedulingRequestExistingApplication( + RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, 1, amPriority, attId6); FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); // app6 can't reserve a container on node1 because @@ -4322,7 +4423,8 @@ public void testQueueMaxAMShareWithContainerReservation() throws Exception { // app6 turns the reservation into an allocation on node2. scheduler.handle(updateE2); assertEquals("Application6's AM requests 10240 MB memory", - 10240, app6.getAMResource().getMemorySize()); + RM_SCHEDULER_MAXIMUM_ALLOCATION_MB_VALUE, + app6.getAMResource().getMemorySize()); assertEquals("Application6's AM should be running", 1, app6.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 11264 MB memory", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java index f1afe6979fc..28b96647c95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java @@ -60,9 +60,13 @@ String render() { () -> AllocationFileWriter .createNumberSupplier(properties.getFairSharePreemptionTimeout())); AllocationFileWriter.addIfPresent(pw, "fairSharePreemptionThreshold", - () -> AllocationFileWriter - .createNumberSupplier( - properties.getFairSharePreemptionThreshold())); + () -> AllocationFileWriter + .createNumberSupplier( + properties.getFairSharePreemptionThreshold())); + AllocationFileWriter.addIfPresent(pw, "maxContainerAllocation", + () -> AllocationFileWriter + .createNumberSupplier( + properties.getMaxContainerResources())); printEndTag(pw); pw.close(); return sw.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java index a2faf1da318..3f4d029b5c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java @@ -94,6 +94,11 @@ public AllocationFileQueueBuilder fairSharePreemptionThreshold( return this; } + public AllocationFileQueueBuilder maxContainerResources(String maxContainerResources){ + this.queuePropertiesBuilder.maxContainerResources(maxContainerResources); + return this; + } + public AllocationFileQueueBuilder subQueue(String queueName) { if (this instanceof AllocationFileSimpleQueueBuilder) { return new AllocationFileSubQueueBuilder( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java index 2c01144a152..b923cca79c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java @@ -33,6 +33,7 @@ private final String maxChildResources; private final Integer fairSharePreemptionTimeout; private final Double fairSharePreemptionThreshold; + private final String maxContainerResources; AllocationFileQueueProperties(Builder builder) { this.queueName = builder.queueName; @@ -48,6 +49,7 @@ this.maxChildResources = builder.maxChildResources; this.fairSharePreemptionTimeout = builder.fairSharePreemptionTimeout; this.fairSharePreemptionThreshold = builder.fairSharePreemptionThreshold; + this.maxContainerResources = builder.maxContainerResources; } public String getQueueName() { @@ -102,6 +104,8 @@ public Double getFairSharePreemptionThreshold() { return fairSharePreemptionThreshold; } + public String getMaxContainerResources() { return maxContainerResources; } + /** * Builder class for {@link AllocationFileQueueProperties}. */ @@ -119,6 +123,7 @@ public Double getFairSharePreemptionThreshold() { private String maxChildResources; private Integer fairSharePreemptionTimeout; private Double fairSharePreemptionThreshold; + private String maxContainerResources; Builder() { } @@ -167,6 +172,11 @@ public Builder maxAMShare(Double maxAMShare) { return this; } + public Builder maxContainerResources(String maxContainerResources) { + this.maxContainerResources = maxContainerResources; + return this; + } + public Builder minSharePreemptionTimeout( Integer minSharePreemptionTimeout) { this.minSharePreemptionTimeout = minSharePreemptionTimeout; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/TestContainerMaxAllocationCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/TestContainerMaxAllocationCalculator.java new file mode 100644 index 00000000000..79b77f7af8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/TestContainerMaxAllocationCalculator.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.ContainerMaxAllocationCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; +import java.util.Arrays; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; + + +public class TestContainerMaxAllocationCalculator { + + private ContainerMaxAllocationCalculator target; + + private YarnConfiguration configuration = new YarnConfiguration(); + private int maxVCores = 2; + private int maxMemory = 1000; + private FairScheduler scheduler = new FairScheduler(); + private FSQueue queue; + + @Before + public void setup() throws Exception { + scheduler.serviceInit(configuration); + configuration.setInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, maxVCores); + configuration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + maxMemory); + queue = new FSLeafQueue("name", scheduler, null); + + target = new ContainerMaxAllocationCalculator(configuration); + } + + @Test + public void testTooMuchVCores(){ + Resource resource = Resource.newInstance(maxMemory, maxVCores + 1); + target.correctQueueMaxResurceCapability(resource, queue); + + Assert.assertEquals(maxVCores, resource.getVirtualCores()); + Assert.assertEquals(maxMemory, resource.getMemorySize()); + } + + @Test + public void testTooMuchMemory(){ + Resource resource = Resource.newInstance(maxMemory + 1, maxVCores); + target.correctQueueMaxResurceCapability(resource, queue); + + Assert.assertEquals(maxVCores, resource.getVirtualCores()); + Assert.assertEquals(maxMemory, resource.getMemorySize()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestAppManagerWithFairScheduler.java new file mode 100644 index 00000000000..084b65cd4ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestAppManagerWithFairScheduler.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.TestAppManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashMap; + +import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class TestAppManagerWithFairScheduler { + + @Test + public void testQueueSubmitWithHighQueueContainerSize() throws IOException, YarnException { + + ApplicationId appId = MockApps.newAppID(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + Resource resource = Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + + ApplicationSubmissionContext asContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + asContext.setApplicationId(appId); + asContext.setResource(resource); + asContext.setPriority(Priority.newInstance(0)); + asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + asContext.setQueue("queueA"); + QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class); + + String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")) + .getAbsolutePath(); + + String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); + + int queueMaxAllocation = 512; + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" " + queueMaxAllocation + + " mb 1 vcores" + ""); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.close(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + // Setup a PlacementManager returns a new queue + PlacementManager placementMgr = mock(PlacementManager.class); + doAnswer(new Answer() { + + @Override + public ApplicationPlacementContext answer(InvocationOnMock invocation) + throws Throwable { + return new ApplicationPlacementContext("queueA"); + } + + }).when(placementMgr).placeApplication( + any(ApplicationSubmissionContext.class), matches("test1")); + doAnswer(new Answer() { + + @Override + public ApplicationPlacementContext answer(InvocationOnMock invocation) + throws Throwable { + return new ApplicationPlacementContext("queueB"); + } + + }).when(placementMgr).placeApplication( + any(ApplicationSubmissionContext.class), matches("test2")); + + MockRM newMockRM = new MockRM(conf); + RMContext newMockRMContext = newMockRM.getRMContext(); + newMockRMContext.setQueuePlacementManager(placementMgr); + ApplicationMasterService masterService = new ApplicationMasterService( + newMockRMContext, newMockRMContext.getScheduler()); + + TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext, + new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(), + masterService, new ApplicationACLsManager(conf), conf); + + // only user test has permission to submit to 'test' queue + + try { + newAppMonitor.submitApplication(asContext, "test1"); + Assert.fail("Test should fail on too high allocation!"); + } catch (InvalidResourceRequestException e) { + Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION, + e.getInvalidResourceType()); + } + + // Should not throw exception + newAppMonitor.submitApplication(asContext, "test2"); + } + + private static ContainerLaunchContext mockContainerLaunchContext( + RecordFactory recordFactory) { + ContainerLaunchContext amContainer = recordFactory.newRecordInstance( + ContainerLaunchContext.class); + amContainer.setApplicationACLs(new HashMap());; + return amContainer; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMAppManager.java new file mode 100644 index 00000000000..7a92981c561 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMAppManager.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.mockito.ArgumentCaptor; + +import java.util.List; +import java.util.Set; + +import static java.util.stream.Collectors.toSet; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +// Extend and make the functions we want to test public +public class TestRMAppManager extends RMAppManager { + private final RMStateStore stateStore; + + public TestRMAppManager(RMContext context, Configuration conf) { + super(context, null, null, new ApplicationACLsManager(conf), conf); + this.stateStore = context.getStateStore(); + } + + public TestRMAppManager(RMContext context, + ClientToAMTokenSecretManagerInRM clientToAMSecretManager, + YarnScheduler scheduler, ApplicationMasterService masterService, + ApplicationACLsManager applicationACLsManager, Configuration conf) { + super(context, scheduler, masterService, applicationACLsManager, conf); + this.stateStore = context.getStateStore(); + } + + public void checkAppNumCompletedLimit() { + super.checkAppNumCompletedLimit(); + } + + public void finishApplication(ApplicationId appId) { + super.finishApplication(appId); + } + + public int getCompletedAppsListSize() { + return super.getCompletedAppsListSize(); + } + + public int getNumberOfCompletedAppsInStateStore() { + return this.completedAppsInStateStore; + } + + public List getCompletedApps() { + return completedApps; + } + + public Set getFirstNCompletedApps(int n) { + return getCompletedApps().stream().limit(n).collect(toSet()); + } + + public Set getCompletedAppsWithEvenIdsInRange(int n) { + return getCompletedApps().stream().limit(n) + .filter(app -> app.getId() % 2 == 0).collect(toSet()); + } + + public Set getRemovedAppsFromStateStore(int numRemoves) { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(RMApp.class); + verify(stateStore, times(numRemoves)) + .removeApplication(argumentCaptor.capture()); + return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) + .collect(toSet()); + } + + public void submitApplication(ApplicationSubmissionContext submissionContext, + String user) throws YarnException { + super.submitApplication(submissionContext, System.currentTimeMillis(), + user); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md index b5bcbf5c8e9..d246b85b6d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md @@ -90,6 +90,8 @@ The allocation file must be in XML format. The format contains five types of ele * **maxResources**: maximum resources a queue will allocated, expressed in the form of "X%", "X% cpu, Y% memory", "X mb, Y vcores", or "vcores=X, memory-mb=Y". The last form is required when specifying resources other than memory and CPU. In the last form, X and Y can either be a percentage or an integer resource value without units. In the latter case the units will be inferred from the default units configured for that resource. A queue will not be assigned a container that would put its aggregate usage over this limit. + * **maxContainerResources**: maximum resources a queue can allocate for a single container, expressed in the form of "X mb, Y vcores" or "vcores=X, memory-mb=Y". The latter form is required when specifying resources other than memory and CPU. If the property is not set it's value is inherited from a parent queue. It's default value is **yarn.scheduler.maximum-allocation-mb**. Cannot be higher than **maxResources**. This property is invalid for root queue. + * **maxChildResources**: maximum resources an ad hoc child queue will allocated, expressed in the form of "X%", "X% cpu, Y% memory", "X mb, Y vcores", or "vcores=X, memory-mb=Y". The last form is required when specifying resources other than memory and CPU. In the last form, X and Y can either be a percentage or an integer resource value without units. In the latter case the units will be inferred from the default units configured for that resource. An ad hoc child queue will not be assigned a container that would put its aggregate usage over this limit. * **maxRunningApps**: limit the number of apps from the queue to run at once