diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
index 62da092201a..aa92d7a3738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java
@@ -156,7 +156,7 @@ public void testCompositeConstraint() {
     SingleConstraintTransformer singleTransformer =
         new SingleConstraintTransformer(specConstraint);
     PlacementConstraint simConstraint = singleTransformer.transform();
-    Assert.assertTrue(constraintExpr instanceof Or);
+    Assert.assertTrue(simConstraint.getConstraintExpr() instanceof Or);
     Or simOrExpr = (Or) specConstraint.getConstraintExpr();
     for (AbstractConstraint child : simOrExpr.getChildren()) {
       Assert.assertTrue(child instanceof SingleConstraint);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 199dd62c21a..6625a199d12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
@@ -149,6 +151,59 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
     return true;
   }
 
+  /**
+   * Returns true if all child constraints are satisfied.
+   * @param appId application id
+   * @param constraint the And constraint
+   * @param node node
+   * @param atm allocation tags manager
+   * @return true if all child constraints are satisfied, false otherwise
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfyAndConstraint(ApplicationId appId,
+      And constraint, SchedulerNode node, AllocationTagsManager atm)
+      throws InvalidAllocationTagsQueryException {
+    for (AbstractConstraint child : constraint.getChildren()) {
+      // TODO: nested AND/OR constraints are not supported, validate in YARN-6621
+      if (child instanceof SingleConstraint) {
+        SingleConstraint single = (SingleConstraint) child;
+        if (!canSatisfySingleConstraint(appId, single, node, atm)) {
+          return false;
+        }
+      } else {
+        throw new InvalidAllocationTagsQueryException(
+            "Invalid request: Nested constraint inside AND is not supported.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns true if at least one child constraint is satisfied.
+   * @param appId application id
+   * @param constraint the Or constraint
+   * @param node node
+   * @param atm allocation tags manager
+   * @return true if any child constraint is satisfied, false otherwise
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfyOrConstraint(ApplicationId appId,
+      Or constraint, SchedulerNode node, AllocationTagsManager atm)
+      throws InvalidAllocationTagsQueryException {
+    for (AbstractConstraint child : constraint.getChildren()) {
+      if (child instanceof SingleConstraint) {
+        SingleConstraint single = (SingleConstraint) child;
+        if (canSatisfySingleConstraint(appId, single, node, atm)) {
+          return true;
+        }
+      } else {
+        throw new InvalidAllocationTagsQueryException(
+            "Invalid request: Nested constraint inside OR is not supported.");
+      }
+    }
+    return false;
+  }
+
   private static boolean canSatisfyConstraints(ApplicationId appId,
       PlacementConstraint constraint, SchedulerNode node,
       AllocationTagsManager atm)
@@ -167,7 +222,13 @@ private static boolean canSatisfyConstraints(ApplicationId appId,
     if (sConstraintExpr instanceof SingleConstraint) {
       SingleConstraint single = (SingleConstraint) sConstraintExpr;
       return canSatisfySingleConstraint(appId, single, node, atm);
-    } else {
+    } else if (sConstraintExpr instanceof And) {
+      And and = (And) sConstraintExpr;
+      return canSatisfyAndConstraint(appId, and, node, atm);
+    } else if (sConstraintExpr instanceof Or) {
+      Or or = (Or) sConstraintExpr;
+      return canSatisfyOrConstraint(appId, or, node, atm);
+    } else {
       throw new InvalidAllocationTagsQueryException(
           "Unsupported type of constraint.");
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
index a5460c2d7cd..d74a5d990c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
@@ -21,6 +21,9 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
 import java.util.AbstractMap;
@@ -66,9 +69,10 @@ private RMContext rmContext;
   private static final int GB = 1024;
   private ApplicationId appId1;
-  private PlacementConstraint c1, c2, c3, c4;
+  private PlacementConstraint c1, c2, c3, c4, c5, c6;
   private Set<String> sourceTag1, sourceTag2;
-  private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+  private Map<Set<String>, PlacementConstraint> constraintMap1,
+      constraintMap2, constraintMap3, constraintMap4;
   private AtomicLong requestID = new AtomicLong(0);
 
   @Before
@@ -92,6 +96,12 @@ public void setup() {
         .build(targetNotIn(NODE, allocationTag("hbase-m")));
     c4 = PlacementConstraints
         .build(targetNotIn(RACK, allocationTag("hbase-rs")));
+    c5 = PlacementConstraints
+        .build(and(targetNotIn(NODE, allocationTag("hbase-m")),
+            maxCardinality(NODE, 3, "spark")));
+    c6 = PlacementConstraints
+        .build(or(targetIn(NODE, allocationTag("hbase-m")),
+            targetIn(NODE, allocationTag("hbase-rs"))));
     sourceTag1 = new HashSet<>(Arrays.asList("spark"));
     sourceTag2 = new HashSet<>(Arrays.asList("zk"));
@@ -106,6 +116,14 @@
         new AbstractMap.SimpleEntry<>(sourceTag2, c4))
         .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
             AbstractMap.SimpleEntry::getValue));
+    constraintMap3 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c5))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
+    constraintMap4 = Stream
+        .of(new AbstractMap.SimpleEntry<>(sourceTag1, c6))
+        .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
+            AbstractMap.SimpleEntry::getValue));
   }
 
   private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
@@ -124,6 +142,11 @@ private SchedulingRequest createSchedulingRequest(Set<String>
     return createSchedulingRequest(allocationTags, null);
   }
 
+  private ContainerId newContainerId(ApplicationId appId) {
+    return ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(appId, 0), 0);
+  }
+
   @Test
   public void testNodeAffinityAssignment()
       throws InvalidAllocationTagsQueryException {
@@ -306,4 +329,142 @@ public void testRackAntiAffinityAssignment()
     Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
         createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
   }
+
+  @Test
+  public void testORConstraintAssignment()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    // Register App1 with the OR constraint map (affinity to hbase-m or hbase-rs).
+    pcm.registerApplication(appId1, constraintMap4);
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *   n0: hbase-m(1)
+     *   n1: ""
+     *   n2: hbase-rs(1)
+     *   n3: ""
+     */
+    tm.addContainer(n0_r1.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    tm.addContainer(n2_r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0_r1.getNodeID())
+        .get("hbase-m").longValue());
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2_r2.getNodeID())
+        .get("hbase-rs").longValue());
+
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+    // n0 and n2 should qualify for allocation because each of them has
+    // either the hbase-m or the hbase-rs tag.
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+    /**
+     * Place container:
+     *   n0: hbase-m(1)
+     *   n1: ""
+     *   n2: hbase-rs(1)
+     *   n3: hbase-rs(1)
+     */
+    tm.addContainer(n3_r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-rs"));
+    // n3 now qualifies because a container with the hbase-rs tag has been
+    // allocated on it.
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+  }
+
+  @Test
+  public void testANDConstraintAssignment()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    // Register App1 with the AND constraint map (anti-affinity with hbase-m
+    // and max cardinality 3 for spark).
+    pcm.registerApplication(appId1, constraintMap3);
+    RMNode n0_r1 = rmNodes.get(0);
+    RMNode n1_r1 = rmNodes.get(1);
+    RMNode n2_r2 = rmNodes.get(2);
+    RMNode n3_r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *   n0: hbase-m(1)
+     *   n1: ""
+     *   n2: hbase-m(1)
+     *   n3: ""
+     */
+    tm.addContainer(n0_r1.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    tm.addContainer(n2_r2.getNodeID(),
+        newContainerId(appId1), ImmutableSet.of("hbase-m"));
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0_r1.getNodeID())
+        .get("hbase-m").longValue());
+    Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2_r2.getNodeID())
+        .get("hbase-m").longValue());
+
+    FiCaSchedulerNode schedulerNode0 = TestUtils
+        .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode1 = TestUtils
+        .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode2 = TestUtils
+        .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB);
+    FiCaSchedulerNode schedulerNode3 = TestUtils
+        .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB);
+
+    // The constraint is anti-affine with hbase-m, so placement must fail on
+    // n0 and n2 because they already have hbase-m allocated.
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+
+    /**
+     * Place container:
+     *   n0: hbase-m(1)
+     *   n1: spark(4)
+     *   n2: hbase-m(1)
+     *   n3: ""
+     */
+    // FIXME: why are 4 containers needed here instead of 3?
+    for (int i = 0; i < 4; i++) {
+      tm.addContainer(n1_r1.getNodeID(),
+          newContainerId(appId1), ImmutableSet.of("spark"));
+    }
+    Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1_r1.getNodeID())
+        .get("spark").longValue());
+
+    // n1 now violates the max cardinality constraint as well.
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
+        createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
+  }
 }
\ No newline at end of file
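Usage sketch (not part of the patch): the snippet below shows how an application might compose the AND and OR constraints that these tests exercise, using only the PlacementConstraints builder calls that appear in the diff (and, or, targetIn, targetNotIn, maxCardinality, allocationTag). The class name CompositeConstraintExample is hypothetical and used purely for illustration.

import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;

import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;

// Hypothetical example class, not part of the patch.
public class CompositeConstraintExample {
  public static void main(String[] args) {
    // AND: anti-affinity with hbase-m and at most 3 "spark" allocations on the
    // node (mirrors constraint c5 in TestPlacementConstraintsUtil).
    PlacementConstraint andConstraint = PlacementConstraints
        .build(and(targetNotIn(NODE, allocationTag("hbase-m")),
            maxCardinality(NODE, 3, "spark")));

    // OR: affinity with either hbase-m or hbase-rs (mirrors constraint c6).
    PlacementConstraint orConstraint = PlacementConstraints
        .build(or(targetIn(NODE, allocationTag("hbase-m")),
            targetIn(NODE, allocationTag("hbase-rs"))));

    // PlacementConstraintsUtil.canSatisfyConstraints(...) in this patch walks
    // the And/Or children and requires all (AND) or at least one (OR) of them
    // to be satisfiable on the candidate node.
    System.out.println(andConstraint);
    System.out.println(orConstraint);
  }
}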