diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java index ceff6f6881d..6852a460e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -32,7 +34,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -236,6 +240,42 @@ public PlacementConstraint getGlobalConstraint(Set sourceTags) { } } + @Override + public PlacementConstraint getRequestConstraint(ApplicationId applicationId, + SchedulingRequest request) { + List constraints = new ArrayList<>(); + // Add app-level constraint if appId is given. + if (applicationId != null) { + constraints.add(getConstraint(applicationId, + request.getAllocationTags())); + } + // Add request-level and global constraints + if (request != null) { + constraints.add(request.getPlacementConstraint()); + constraints.add(getGlobalConstraint(request.getAllocationTags())); + } + + // Remove all null or duplicate constraints + List allConstraints = + constraints.stream() + .filter(constraint -> constraint != null + && constraint.getConstraintExpr() != null) + .map(PlacementConstraint::getConstraintExpr) + .distinct() + .collect(Collectors.toList()); + + // Compose an AND constraint + // When merge request(RC), app(AC) and global constraint(GC), + // we do a merge on them with CC=AND(GC, AC, RC) and returns a + // composite AND constraint. Subsequently we check if CC could + // be satisfied. This ensures that every level of constraint + // is satisfied. + PlacementConstraint.And andConstraint = PlacementConstraints.and( + allConstraints.toArray(new PlacementConstraint + .AbstractConstraint[allConstraints.size()])); + return andConstraint.build(); + } + @Override public void unregisterApplication(ApplicationId appId) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java index 7725d0d1a69..205eee388dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; /** @@ -104,6 +105,21 @@ PlacementConstraint getConstraint(ApplicationId appId, */ PlacementConstraint getGlobalConstraint(Set sourceTags); + /** + * Retrieve the placement constraint of a given scheduling request from + * the placement constraint manager's point of view. Internally, PCM merges + * constraints associated with this request with respect to multiple level + * constraints, including request level shipped in {@link SchedulingRequest}, + * app level constraint added during AM registration and global constraint + * added by cluster admin. + * + * @param applicationId application ID + * @param request scheduling request + * @return a merged placement constraint + */ + PlacementConstraint getRequestConstraint(ApplicationId applicationId, + SchedulingRequest request); + /** * Remove the constraints that correspond to a given application. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 6396e5722ba..85818c238bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -248,22 +248,7 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId, SchedulingRequest request, SchedulerNode schedulerNode, PlacementConstraintManager pcm, AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { - // TODO do proper merge on different level of constraints, see YARN-7778. - - // Request level constraint - PlacementConstraint constraint = request.getPlacementConstraint(); - if (constraint == null) { - // Application level constraint - constraint = pcm.getConstraint(applicationId, - request.getAllocationTags()); - if (constraint == null) { - // Global level constraint - constraint = pcm.getGlobalConstraint(request.getAllocationTags()); - if (constraint == null) { - return true; - } - } - } - return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm); + return canSatisfyConstraints(applicationId, + pcm.getRequestConstraint(applicationId, request), schedulerNode, atm); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java index abcab1a3a05..cf9bfdd990a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java @@ -34,8 +34,15 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -88,6 +95,14 @@ public void before() { .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); } + private SchedulingRequest newSchedulingRequest(Set tags, + PlacementConstraint pc) { + return SchedulingRequest + .newInstance(1, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(), tags, + ResourceSizing.newInstance(Resource.newInstance(100, 10)), pc); + } + @Test public void testRegisterUnregisterApps() { Assert.assertEquals(0, pcm.getNumRegisteredApplications()); @@ -179,4 +194,86 @@ public void testValidateConstraint() { Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1)); Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1)); } + + @Test + public void testGetRequestConstraint() { + // Request Constraint(RC), App Constraint(AC), Global Constraint(GC) + PlacementConstraint constraint; + And mergedConstraint; + SchedulingRequest request; + + // RC = c1 + // AC = null + // GC = null + request = newSchedulingRequest(null, c1); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + + // RC = null + // AC = tag1->c1, tag2->c2 + // GC = null + pcm.registerApplication(appId1, constraintMap1); + // if the source tag in the request is not mapped to any existing + // registered constraint, we should get an empty AND constraint. + request = newSchedulingRequest(Sets.newHashSet("not_exist_tag"), null); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND() + Assert.assertEquals(0, mergedConstraint.getChildren().size()); + // if a mapping is found for a given source tag + request = newSchedulingRequest(sourceTag1, null); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.unregisterApplication(appId1); + + // RC = null + // AC = null + // GC = tag1->c1 + pcm.addGlobalConstraint(sourceTag1, c1, true); + request = newSchedulingRequest(Sets.newHashSet(sourceTag1), null); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.removeGlobalConstraint(sourceTag1); + + // RC = c2 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c3 + pcm.addGlobalConstraint(sourceTag1, c3, true); + pcm.registerApplication(appId1, constraintMap1); + // both RC, AC and GC should be respected + request = newSchedulingRequest(Sets.newHashSet(sourceTag1), c2); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2, c3) + Assert.assertEquals(3, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + + // RC = c1 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c2 + pcm.addGlobalConstraint(sourceTag1, c2, true); + pcm.registerApplication(appId1, constraintMap1); + request = newSchedulingRequest(Sets.newHashSet(sourceTag1), c1); + constraint = pcm.getRequestConstraint(appId1, request); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2) + Assert.assertEquals(2, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + } }