diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
index ceff6f6881d..fcb5867638d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
@@ -32,7 +32,9 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,4 +281,5 @@ public int getNumGlobalConstraints() {
readLock.unlock();
}
}
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
index 7725d0d1a69..ec824a0d506 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
new file mode 100644
index 00000000000..e3712832882
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+
+/**
+ * This class contains various static methods used by the Placement Algorithms
+ * to simplify constrained placement (see also {@link DefaultPlacementAlgorithm}).
+ */
+@Public
+@Unstable
+public final class PlacementConstraintsUtil {
+
+ // Suppresses default constructor, ensuring non-instantiability.
+ private PlacementConstraintsUtil() {
+ }
+
+ /**
+ * Return if a SingleConstraint of a specific TargetExpression is satisfied
+ * on a specific Node
+ * @param appId
+ * @param sc
+ * @param te
+ * @param node
+ * @param tm
+ * @return boolean
+ * @throws InvalidAllocationTagsQueryException
+ */
+ private static boolean canSatisfySingleConstraintExpression(
+ ApplicationId appId, SingleConstraint sc, TargetExpression te,
+ SchedulerNode node, AllocationTagsManager tm)
+ throws InvalidAllocationTagsQueryException {
+ long minScopeCardinality = 0;
+ long maxScopeCardinality = 0;
+ if (sc.getScope() == "node") {
+ minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+ te.getTargetValues(), Long::min);
+ maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+ te.getTargetValues(), Long::max);
+ } else if (sc.getScope() == "rack") {
+ minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+ te.getTargetValues(), Long::min);
+ maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+ te.getTargetValues(), Long::max);
+ }
+
+ // Affinity case - first application allocation should go through
+ if (minScopeCardinality == 0 && sc.getMinCardinality() == 1) {
+ minScopeCardinality = 1;
+ }
+
+ return (minScopeCardinality >= sc.getMinCardinality()
+ && maxScopeCardinality <= sc.getMaxCardinality());
+ }
+
+ /**
+ * Returns true if a specific Node is **currently** a valid placement
+ * for a Set of allocationTags.
+ * To do so the method should retrieves and goes through application constraint
+ * expressions and checks if the specific allocation is between the allowed
+ * min-max cardinality values of a specific scope
+ * @param appId
+ * @param allocationTags
+ * @param node
+ * @param pcm
+ * @param tagsManager
+ * @return boolean
+ * @throws InvalidAllocationTagsQueryException
+ */
+ public static boolean canSatisfyConstraints(ApplicationId appId,
+ Set allocationTags, SchedulerNode node,
+ PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
+ if (constraint == null) {
+ return true;
+ }
+ // Transform to SimpleConstraint
+ SingleConstraintTransformer singleTransformer =
+ new SingleConstraintTransformer(constraint);
+ constraint = singleTransformer.transform();
+ AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
+ SingleConstraint single = (SingleConstraint) sConstraintExpr;
+ // Iterate through TargetExpressions
+ Iterator expIt = single.getTargetExpressions().iterator();
+ while (expIt.hasNext()) {
+ TargetExpression currentExp = expIt.next();
+ // Supporting AllocationTag Expressions for now
+ if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+ // Check if conditions are met
+ if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
+ node, tagsManager)) {
+ return false;
+ }
+ }
+ }
+ // return true if all targetExpressions are satisfied
+ return true;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 395c1560c40..9ed9ab13b91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -19,19 +19,16 @@
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
@@ -65,58 +62,14 @@ public void init(RMContext rmContext) {
.getNodes(filter);
}
- /**
- * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
- * @param applicationId
- * @param allocationTags
- * @param nodeId
- * @param tagsManager
- * @return boolean
- * @throws InvalidAllocationTagsQueryException
- */
- public boolean canAssign(ApplicationId applicationId,
- Set allocationTags, NodeId nodeId,
- AllocationTagsManager tagsManager)
- throws InvalidAllocationTagsQueryException {
- PlacementConstraint constraint =
- constraintManager.getConstraint(applicationId, allocationTags);
- if (constraint == null) {
- return true;
- }
- // TODO: proper transformations
- // Currently works only for simple anti-affinity
- // NODE scope target expressions
- PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
- new PlacementConstraintTransformations.SpecializedConstraintTransformer(
- constraint);
- PlacementConstraint transform = transformer.transform();
- PlacementConstraint.TargetConstraint targetConstraint =
- (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
- // Assume a single target expression tag;
- // The Sample Algorithm assumes a constraint will always be a simple
- // Target Constraint with a single entry in the target set.
- // As mentioned in the class javadoc - This algorithm should be
- // used mostly for testing and validating end-2-end workflow.
- String targetTag = targetConstraint.getTargetExpressions().iterator().next()
- .getTargetValues().iterator().next();
- // TODO: Assuming anti-affinity constraint
- long nodeCardinality =
- tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
- if (nodeCardinality != 0) {
- return false;
- }
- // return true if it is a valid placement
- return true;
- }
-
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
- if (canAssign(appId,
- schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
- tagsManager)) {
+ if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+ schedulingRequest.getAllocationTags(), schedulerNode,
+ constraintManager, tagsManager)) {
return true;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 87dd5b71117..8e60a6be99b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -104,7 +104,7 @@ public void stopRM() {
}
@Test(timeout = 300000)
- public void testPlacement() throws Exception {
+ public void testAntiAffinityPlacement() throws Exception {
HashMap nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
@@ -138,7 +138,6 @@ public void testPlacement() throws Exception {
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
// kick the scheduler
-
while (allocatedContainers.size() < 4) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
@@ -153,11 +152,64 @@ public void testPlacement() throws Exception {
Assert.assertEquals(4, allocatedContainers.size());
Set nodeIds = allocatedContainers.stream()
.map(x -> x.getNodeId()).collect(Collectors.toSet());
- // Ensure unique nodes
+ // Ensure unique nodes (antiaffinity)
Assert.assertEquals(4, nodeIds.size());
}
@Test(timeout = 300000)
+ public void testAffinityPlacement() throws Exception {
+ HashMap nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+ Collections.singletonMap(
+ Collections.singleton("foo"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetIn(NODE, allocationTag("foo")))
+ ));
+ am1.addSchedulingRequest(
+ Arrays.asList(
+ schedulingRequest(1, 1, 1, 512, "foo"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 5, 1, 512, "foo"))
+ );
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List allocatedContainers = new ArrayList<>();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+ // kick the scheduler
+ while (allocatedContainers.size() < 4) {
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ nm4.nodeHeartbeat(true);
+ LOG.info("Waiting for containers to be created for app 1...");
+ sleep(1000);
+ allocResponse = am1.schedule();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ }
+
+ Assert.assertEquals(4, allocatedContainers.size());
+ Set nodeIds = allocatedContainers.stream()
+ .map(x -> x.getNodeId()).collect(Collectors.toSet());
+ // Ensure all containers end up on the same node (affinity)
+ Assert.assertEquals(1, nodeIds.size());
+ }
+
+ @Test(timeout = 300000)
public void testSchedulerRejection() throws Exception {
HashMap nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());