diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java index ceff6f6881d..fcb5867638d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -32,7 +32,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,4 +281,5 @@ public int getNumGlobalConstraints() { readLock.unlock(); } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java index 7725d0d1a69..ec824a0d506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java new file mode 100644 index 00000000000..e3712832882 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import java.util.Iterator; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; + +/** + * This class contains various static methods used by the Placement Algorithms + * to simplify constrained placement (see also {@link DefaultPlacementAlgorithm}). + */ +@Public +@Unstable +public final class PlacementConstraintsUtil { + + // Suppresses default constructor, ensuring non-instantiability. + private PlacementConstraintsUtil() { + } + + /** + * Return if a SingleConstraint of a specific TargetExpression is satisfied + * on a specific Node + * @param appId + * @param sc + * @param te + * @param node + * @param tm + * @return boolean + * @throws InvalidAllocationTagsQueryException + */ + private static boolean canSatisfySingleConstraintExpression( + ApplicationId appId, SingleConstraint sc, TargetExpression te, + SchedulerNode node, AllocationTagsManager tm) + throws InvalidAllocationTagsQueryException { + long minScopeCardinality = 0; + long maxScopeCardinality = 0; + if (sc.getScope() == "node") { + minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, + te.getTargetValues(), Long::min); + maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId, + te.getTargetValues(), Long::max); + } else if (sc.getScope() == "rack") { + minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId, + te.getTargetValues(), Long::min); + maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId, + te.getTargetValues(), Long::max); + } + + // Affinity case - first application allocation should go through + if (minScopeCardinality == 0 && sc.getMinCardinality() == 1) { + minScopeCardinality = 1; + } + + return (minScopeCardinality >= sc.getMinCardinality() + && maxScopeCardinality <= sc.getMaxCardinality()); + } + + /** + * Returns true if a specific Node is **currently** a valid placement + * for a Set of allocationTags. + * To do so the method should retrieves and goes through application constraint + * expressions and checks if the specific allocation is between the allowed + * min-max cardinality values of a specific scope + * @param appId + * @param allocationTags + * @param node + * @param pcm + * @param tagsManager + * @return boolean + * @throws InvalidAllocationTagsQueryException + */ + public static boolean canSatisfyConstraints(ApplicationId appId, + Set allocationTags, SchedulerNode node, + PlacementConstraintManager pcm, AllocationTagsManager tagsManager) + throws InvalidAllocationTagsQueryException { + PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags); + if (constraint == null) { + return true; + } + // Transform to SimpleConstraint + SingleConstraintTransformer singleTransformer = + new SingleConstraintTransformer(constraint); + constraint = singleTransformer.transform(); + AbstractConstraint sConstraintExpr = constraint.getConstraintExpr(); + SingleConstraint single = (SingleConstraint) sConstraintExpr; + // Iterate through TargetExpressions + Iterator expIt = single.getTargetExpressions().iterator(); + while (expIt.hasNext()) { + TargetExpression currentExp = expIt.next(); + // Supporting AllocationTag Expressions for now + if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) { + // Check if conditions are met + if (!canSatisfySingleConstraintExpression(appId, single, currentExp, + node, tagsManager)) { + return false; + } + } + } + // return true if all targetExpressions are satisfied + return true; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index 395c1560c40..9ed9ab13b91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -19,19 +19,16 @@ import java.util.Iterator; import java.util.List; -import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; @@ -65,58 +62,14 @@ public void init(RMContext rmContext) { .getNodes(filter); } - /** - * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682) - * @param applicationId - * @param allocationTags - * @param nodeId - * @param tagsManager - * @return boolean - * @throws InvalidAllocationTagsQueryException - */ - public boolean canAssign(ApplicationId applicationId, - Set allocationTags, NodeId nodeId, - AllocationTagsManager tagsManager) - throws InvalidAllocationTagsQueryException { - PlacementConstraint constraint = - constraintManager.getConstraint(applicationId, allocationTags); - if (constraint == null) { - return true; - } - // TODO: proper transformations - // Currently works only for simple anti-affinity - // NODE scope target expressions - PlacementConstraintTransformations.SpecializedConstraintTransformer transformer = - new PlacementConstraintTransformations.SpecializedConstraintTransformer( - constraint); - PlacementConstraint transform = transformer.transform(); - PlacementConstraint.TargetConstraint targetConstraint = - (PlacementConstraint.TargetConstraint) transform.getConstraintExpr(); - // Assume a single target expression tag; - // The Sample Algorithm assumes a constraint will always be a simple - // Target Constraint with a single entry in the target set. - // As mentioned in the class javadoc - This algorithm should be - // used mostly for testing and validating end-2-end workflow. - String targetTag = targetConstraint.getTargetExpressions().iterator().next() - .getTargetValues().iterator().next(); - // TODO: Assuming anti-affinity constraint - long nodeCardinality = - tagsManager.getNodeCardinality(nodeId, applicationId, targetTag); - if (nodeCardinality != 0) { - return false; - } - // return true if it is a valid placement - return true; - } - public boolean attemptPlacementOnNode(ApplicationId appId, SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) throws InvalidAllocationTagsQueryException { int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); if (numAllocs > 0) { - if (canAssign(appId, - schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(), - tagsManager)) { + if (PlacementConstraintsUtil.canSatisfyConstraints(appId, + schedulingRequest.getAllocationTags(), schedulerNode, + constraintManager, tagsManager)) { return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index 87dd5b71117..8e60a6be99b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -104,7 +104,7 @@ public void stopRM() { } @Test(timeout = 300000) - public void testPlacement() throws Exception { + public void testAntiAffinityPlacement() throws Exception { HashMap nodes = new HashMap<>(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); nodes.put(nm1.getNodeId(), nm1); @@ -138,7 +138,6 @@ public void testPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - while (allocatedContainers.size() < 4) { nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); @@ -153,11 +152,64 @@ public void testPlacement() throws Exception { Assert.assertEquals(4, allocatedContainers.size()); Set nodeIds = allocatedContainers.stream() .map(x -> x.getNodeId()).collect(Collectors.toSet()); - // Ensure unique nodes + // Ensure unique nodes (antiaffinity) Assert.assertEquals(4, nodeIds.size()); } @Test(timeout = 300000) + public void testAffinityPlacement() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 5, 1, 512, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List allocatedContainers = new ArrayList<>(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + + // kick the scheduler + while (allocatedContainers.size() < 4) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + } + + Assert.assertEquals(4, allocatedContainers.size()); + Set nodeIds = allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure all containers end up on the same node (affinity) + Assert.assertEquals(1, nodeIds.size()); + } + + @Test(timeout = 300000) public void testSchedulerRejection() throws Exception { HashMap nodes = new HashMap<>(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());