diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
index ceff6f6881d..fcb5867638d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
@@ -32,7 +32,9 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -279,4 +281,5 @@ public int getNumGlobalConstraints() {
       readLock.unlock();
     }
   }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
index 7725d0d1a69..ec824a0d506 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 
 /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
new file mode 100644
index 00000000000..8682bc82d30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+
+/**
+ * This class contains various static methods used by the placement algorithms
+ * to simplify constrained placement (see also {@link DefaultPlacementAlgorithm}).
+ */
+@Public
+@Unstable
+public final class PlacementConstraintsUtil {
+
+  // Suppresses default constructor, ensuring non-instantiability.
+  private PlacementConstraintsUtil() {
+  }
+
+  // Checks whether an allocation (with the associated allocation tags) on a
+  // specific node can satisfy the given SingleConstraint.
+  private static boolean canSatisfySingleConstraintExpression(
+      ApplicationId appId, SingleConstraint sc, TargetExpression te,
+      SchedulerNode node, AllocationTagsManager tm)
+      throws InvalidAllocationTagsQueryException {
+    long minScopeCardinality = 0;
+    long maxScopeCardinality = 0;
+    if (sc.getScope() == PlacementConstraints.NODE) {
+      minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+          te.getTargetValues(), Long::min);
+      maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
+          te.getTargetValues(), Long::max);
+    } else if (sc.getScope() == PlacementConstraints.RACK) {
+      minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+          te.getTargetValues(), Long::min);
+      maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
+          te.getTargetValues(), Long::max);
+    }
+    // Make sure anti-affinity (max cardinality 0) meets the hard upper limit
+    maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+        : maxScopeCardinality;
+
+    return (minScopeCardinality >= sc.getMinCardinality()
+        && maxScopeCardinality < sc.getMaxCardinality());
+  }
+
+  /**
+   * Returns true if the given node is currently a valid placement for the
+   * given set of allocation tags.
+   * To determine this, the method retrieves the application's constraint
+   * expressions and checks whether the cardinality of the allocation, within
+   * the constraint's scope, falls inside the allowed min/max range.
+   *
+   * @param appId the application id
+   * @param allocationTags the allocation tags of the request
+   * @param node the candidate scheduler node
+   * @param pcm the placement constraint manager
+   * @param tagsManager the allocation tags manager
+   * @return true if the node is a valid placement for the allocation tags
+   * @throws InvalidAllocationTagsQueryException if the cardinality query fails
+   */
+  public static boolean canSatisfyConstraints(ApplicationId appId,
+      Set<String> allocationTags, SchedulerNode node,
+      PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
+    if (constraint == null) {
+      return true;
+    }
+    // Transform to a SingleConstraint
+    SingleConstraintTransformer singleTransformer =
+        new SingleConstraintTransformer(constraint);
+    constraint = singleTransformer.transform();
+    AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
+    SingleConstraint single = (SingleConstraint) sConstraintExpr;
+    // Iterate through the target expressions
+    Iterator<TargetExpression> expIt = single.getTargetExpressions().iterator();
+    while (expIt.hasNext()) {
+      TargetExpression currentExp = expIt.next();
+      // Only allocation tag target expressions are supported for now
+      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+        // Check if the cardinality conditions are met
+        if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
+            node, tagsManager)) {
+          return false;
+        }
+      }
+    }
+    // Return true only if all target expressions are satisfied
+    return true;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 395c1560c40..9ed9ab13b91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -19,19 +19,16 @@
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
@@ -65,58 +62,14 @@ public void init(RMContext rmContext) {
         .getNodes(filter);
   }
 
-  /**
-   * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
-   * @param applicationId
-   * @param allocationTags
-   * @param nodeId
-   * @param tagsManager
-   * @return boolean
-   * @throws InvalidAllocationTagsQueryException
-   */
-  public boolean canAssign(ApplicationId applicationId,
-      Set<String> allocationTags, NodeId nodeId,
-      AllocationTagsManager tagsManager)
-      throws InvalidAllocationTagsQueryException {
-    PlacementConstraint constraint =
-        constraintManager.getConstraint(applicationId, allocationTags);
-    if (constraint == null) {
-      return true;
-    }
-    // TODO: proper transformations
-    // Currently works only for simple anti-affinity
-    // NODE scope target expressions
-    PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
-        new PlacementConstraintTransformations.SpecializedConstraintTransformer(
-            constraint);
-    PlacementConstraint transform = transformer.transform();
-    PlacementConstraint.TargetConstraint targetConstraint =
-        (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
-    // Assume a single target expression tag;
-    // The Sample Algorithm assumes a constraint will always be a simple
-    // Target Constraint with a single entry in the target set.
-    // As mentioned in the class javadoc - This algorithm should be
-    // used mostly for testing and validating end-2-end workflow.
-    String targetTag = targetConstraint.getTargetExpressions().iterator().next()
-        .getTargetValues().iterator().next();
-    // TODO: Assuming anti-affinity constraint
-    long nodeCardinality =
-        tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
-    if (nodeCardinality != 0) {
-      return false;
-    }
-    // return true if it is a valid placement
-    return true;
-  }
-
   public boolean attemptPlacementOnNode(ApplicationId appId,
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
       throws InvalidAllocationTagsQueryException {
     int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
     if (numAllocs > 0) {
-      if (canAssign(appId,
-          schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
-          tagsManager)) {
+      if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+          schedulingRequest.getAllocationTags(), schedulerNode,
+          constraintManager, tagsManager)) {
         return true;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 87dd5b71117..6a347a9d9be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,16 +49,21 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
 
 /**
  * This tests end2end workflow of the constraint placement framework.
@@ -104,7 +110,7 @@ public void stopRM() {
   }
 
   @Test(timeout = 300000)
-  public void testPlacement() throws Exception {
+  public void testAntiAffinityPlacement() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     nodes.put(nm1.getNodeId(), nm1);
@@ -121,43 +127,164 @@
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
-        Collections.singletonMap(
-            Collections.singleton("foo"),
+        Collections.singletonMap(Collections.singleton("foo"),
             PlacementConstraints.build(
-                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
-        ));
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
     am1.addSchedulingRequest(
-        Arrays.asList(
-            schedulingRequest(1, 1, 1, 512, "foo"),
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
             schedulingRequest(1, 2, 1, 512, "foo"),
             schedulingRequest(1, 3, 1, 512, "foo"),
-            schedulingRequest(1, 5, 1, 512, "foo"))
-    );
+            schedulingRequest(1, 5, 1, 512, "foo")));
     AllocateResponse allocResponse = am1.schedule(); // send the request
     List<Container> allocatedContainers = new ArrayList<>();
     allocatedContainers.addAll(allocResponse.getAllocatedContainers());
 
     // kick the scheduler
-
-    while (allocatedContainers.size() < 4) {
-      nm1.nodeHeartbeat(true);
-      nm2.nodeHeartbeat(true);
-      nm3.nodeHeartbeat(true);
-      nm4.nodeHeartbeat(true);
-      LOG.info("Waiting for containers to be created for app 1...");
-      sleep(1000);
-      allocResponse = am1.schedule();
-      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
-    }
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4);
 
     Assert.assertEquals(4, allocatedContainers.size());
-    Set<NodeId> nodeIds = allocatedContainers.stream()
-        .map(x -> x.getNodeId()).collect(Collectors.toSet());
-    // Ensure unique nodes
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure unique nodes (anti-affinity)
     Assert.assertEquals(4, nodeIds.size());
   }
 
   @Test(timeout = 300000)
+  public void testCardinalityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(PlacementConstraints
+                .targetCardinality(NODE, 0, 4, allocationTag("foo")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"),
+            schedulingRequest(1, 6, 1, 512, "foo"),
+            schedulingRequest(1, 7, 1, 512, "foo"),
+            schedulingRequest(1, 8, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8);
+
+    Assert.assertEquals(8, allocatedContainers.size());
+    Map<NodeId, Long> nodeIdContainerIdMap =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+    // Ensure no more than 4 containers per node
+    for (NodeId n : nodeIdContainerIdMap.keySet()) {
+      Assert.assertTrue(nodeIdContainerIdMap.get(n) < 5);
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testAffinityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+    Assert.assertEquals(5, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure all containers end up on the same node (affinity)
+    Assert.assertEquals(1, nodeIds.size());
+  }
+
+  @Test(timeout = 300000)
+  public void testComplexPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
+    constraintMap.put(Collections.singleton("bar"),
+        PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar"))));
+    constraintMap.put(Collections.singleton("foo"),
+        PlacementConstraints.build(targetIn(NODE, allocationTag("bar"))));
+    constraintMap.put(Collections.singleton("foo"), PlacementConstraints
+        .build(targetCardinality(NODE, 0, 2, allocationTag("foo"))));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, constraintMap);
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "bar"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"),
+            schedulingRequest(1, 6, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6);
+
+    Assert.assertEquals(6, allocatedContainers.size());
+    Map<NodeId, Long> nodeIdContainerIdMap =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(c -> c.getNodeId(), Collectors.counting()));
+    // Ensure no more than 3 containers per node (1 bar, 2 foo)
+    for (NodeId n : nodeIdContainerIdMap.keySet()) {
+      Assert.assertTrue(nodeIdContainerIdMap.get(n) < 4);
+    }
+  }
+
+  @Test(timeout = 300000)
   public void testSchedulerRejection() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
@@ -373,6 +500,20 @@ public void testPlacementRejection() throws Exception {
         rej.getReason());
   }
 
+  private static void waitForContainerAllocation(Collection<MockNM> nodes,
+      MockAM am, List<Container> allocatedContainers, int containerNum)
+      throws Exception {
+    while (allocatedContainers.size() < containerNum) {
+      for (MockNM node : nodes) {
+        node.nodeHeartbeat(true);
+      }
+      LOG.info("Waiting for containers to be created for "
+          + am.getApplicationAttemptId().getApplicationId() + "...");
+      sleep(1000);
+      AllocateResponse allocResponse = am.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    }
+  }
+
   protected static SchedulingRequest schedulingRequest(
       int priority, long allocReqId, int cores, int mem, String... tags) {
     return schedulingRequest(priority, allocReqId, cores, mem,