Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java (date 1513986680000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java (revision ) @@ -17,6 +17,10 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint; @@ -41,6 +45,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** * Sample Test algorithm. Assumes anti-affinity always @@ -67,71 +72,84 @@ } @Override - public void place(ConstraintPlacementAlgorithmInput input, - ConstraintPlacementAlgorithmOutputCollector collector) { - BatchedRequests requests = (BatchedRequests)input; - ConstraintPlacementAlgorithmOutput resp = - new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); - List allNodes = nodeSelector.selectNodes(null); - Map> tagIndexedRequests = new HashMap<>(); - requests.getSchedulingRequests() - .stream() - .filter(r -> r.getAllocationTags() != null) - .forEach( - req -> req.getAllocationTags().forEach( - tag -> tagIndexedRequests.computeIfAbsent(tag, - k -> new ArrayList<>()).add(req)) - ); - for (Map.Entry> entry : - tagIndexedRequests.entrySet()) { - String tag = entry.getKey(); - PlacementConstraint constraint = - constraintManager.getConstraint(requests.getApplicationId(), - Collections.singleton(tag)); - if (constraint != null) { - // Currently works only for simple anti-affinity - // NODE scope target expressions - SpecializedConstraintTransformer transformer = - new SpecializedConstraintTransformer(constraint); - PlacementConstraint transform = transformer.transform(); - TargetConstraint targetConstraint = - (TargetConstraint) transform.getConstraintExpr(); - // Assume a single target expression tag; - // The Sample Algorithm assumes a constraint will always be a simple - // Target Constraint with a single entry in the target set. - // As mentioned in the class javadoc - This algorithm should be - // used mostly for testing and validating end-2-end workflow. - String targetTag = - targetConstraint.getTargetExpressions().iterator().next() + public boolean canAssign(ApplicationId applicationId, NodeId nodeId, + SchedulingRequest schedulingRequest) + throws InvalidAllocationTagsQueryException { + Set requestTags = schedulingRequest.getAllocationTags(); + PlacementConstraint constraint = + constraintManager.getConstraint(applicationId, requestTags); + if (constraint == null) { + return true; + } + // TODO: proper transformations + // Currently works only for simple anti-affinity + // NODE scope target expressions + SpecializedConstraintTransformer transformer = + new SpecializedConstraintTransformer(constraint); + PlacementConstraint transform = transformer.transform(); + TargetConstraint targetConstraint = + (TargetConstraint) transform.getConstraintExpr(); + // Assume a single target expression tag; + // The Sample Algorithm assumes a constraint will always be a simple + // Target Constraint with a single entry in the target set. + // As mentioned in the class javadoc - This algorithm should be + // used mostly for testing and validating end-2-end workflow. + String targetTag = + targetConstraint.getTargetExpressions().iterator().next() .getTargetValues().iterator().next(); - // iterate over all nodes - Iterator nodeIter = allNodes.iterator(); - List schedulingRequests = entry.getValue(); - Iterator reqIter = schedulingRequests.iterator(); - while (reqIter.hasNext()) { - SchedulingRequest sReq = reqIter.next(); - int numAllocs = sReq.getResourceSizing().getNumAllocations(); - while (numAllocs > 0 && nodeIter.hasNext()) { - SchedulerNode node = nodeIter.next(); - long nodeCardinality = 0; - try { - nodeCardinality = tagsManager.getNodeCardinality( - node.getNodeID(), requests.getApplicationId(), - targetTag); - if (nodeCardinality == 0 && - !requests.getBlacklist(tag).contains(node.getNodeID())) { - numAllocs--; - sReq.getResourceSizing().setNumAllocations(numAllocs); - PlacedSchedulingRequest placedReq = - new PlacedSchedulingRequest(sReq); - placedReq.setPlacementAttempt(requests.getPlacementAttempt()); - placedReq.getNodes().add(node); - resp.getPlacedRequests().add(placedReq); - } - } catch (InvalidAllocationTagsQueryException e) { - LOG.warn("Got exception from TagManager !", e); - } - } + //TODO: Assuming anti-affinity constraint + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + if (numAllocs > 0) { + long nodeCardinality = + tagsManager.getNodeCardinality(nodeId, applicationId, targetTag); + if (nodeCardinality != 0) { + return false; + } + } + // return true if it is a valid placement + return true; + } + + @Override + public void place(ConstraintPlacementAlgorithmInput input, + ConstraintPlacementAlgorithmOutputCollector collector) { + Map> tmpContainerTags = new HashMap<>(); + Map tmpContainerNodes = new HashMap<>(); + BatchedRequests requests = (BatchedRequests) input; + ConstraintPlacementAlgorithmOutput resp = + new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); + List allNodes = nodeSelector.selectNodes(null); + for (SchedulingRequest schedulingRequest : requests + .getSchedulingRequests()) { + Iterator nodeIter = allNodes.iterator(); + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + while (numAllocs > 0 && nodeIter.hasNext()) { + SchedulerNode node = nodeIter.next(); + try { + if (canAssign(requests.getApplicationId(), node.getNodeID(), + schedulingRequest)) { + schedulingRequest.getResourceSizing() + .setNumAllocations(--numAllocs); + PlacedSchedulingRequest placedReq = + new PlacedSchedulingRequest(schedulingRequest); + placedReq.setPlacementAttempt(requests.getPlacementAttempt()); + placedReq.getNodes().add(node); + resp.getPlacedRequests().add(placedReq); + numAllocs = + schedulingRequest.getResourceSizing().getNumAllocations(); + // Add temporary fake-container tag and keep track + ContainerId tmpContainer = + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + requests.getApplicationId(), 1), System.nanoTime()); + this.tagsManager.addContainer(node.getNodeID(), + requests.getApplicationId(), tmpContainer, + schedulingRequest.getAllocationTags()); + tmpContainerTags.put(tmpContainer, + schedulingRequest.getAllocationTags()); + tmpContainerNodes.put(tmpContainer, node.getNodeID()); + } + } catch (InvalidAllocationTagsQueryException e) { + LOG.warn("Got exception from TagManager !", e); } } } @@ -140,5 +158,10 @@ .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0) .forEach(rejReq -> resp.getRejectedRequests().add(rejReq)); collector.collect(resp); + // Clean temporary fake-container tags + for (Map.Entry> entry : tmpContainerTags + .entrySet()) + this.tagsManager.removeContainer(tmpContainerNodes.get(entry.getKey()), + requests.getApplicationId(), entry.getKey(), entry.getValue()); } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java (date 1513986680000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithm.java (revision ) @@ -17,7 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; /** * Marker interface for a Constraint Placement. The only contract is that it @@ -31,6 +35,21 @@ */ void init(RMContext rmContext); + /** + * Returns true if a specific Node is a valid placement of a schedulingRequest + * To do so the method should retrieve and go through application constraint + * expressions and check if the specific allocation is between the allowed + * min-max values + * @param applicationId + * @param nodeId + * @param schedulingRequest + * @return + * @throws InvalidAllocationTagsQueryException + */ + boolean canAssign(ApplicationId applicationId, NodeId nodeId, + SchedulingRequest schedulingRequest) + throws InvalidAllocationTagsQueryException; + /** * The Algorithm is expected to compute the placement of the provided * ConstraintPlacementAlgorithmInput and use the collector to aggregate