Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java (revision ) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java (revision ) @@ -0,0 +1,36 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +/** + * Traverse Scheduling Requests in the same order as they arrive + */ +public class SerialIterator implements Iterator { + + private final List schedulingRequestList; + private int cursor; + + public SerialIterator(Collection schedulingRequests) { + this.schedulingRequestList = new ArrayList<>(schedulingRequests); + this.cursor = 0; + } + + @Override + public boolean hasNext() { + return (cursor < schedulingRequestList.size()); + } + + @Override + public SchedulingRequest next() { + if (hasNext()) { + return schedulingRequestList.get(cursor++); + } + throw new NoSuchElementException(); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java (revision ) @@ -35,7 +35,7 @@ * to place as a batch. The placement algorithm tends to give more optimal * placements if more requests are batched together. */ -class BatchedRequests implements ConstraintPlacementAlgorithmInput { +public class BatchedRequests implements ConstraintPlacementAlgorithmInput { // PlacementAlgorithmOutput attempt - the number of times the requests in this // batch has been placed but was rejected by the scheduler. @@ -45,7 +45,7 @@ private final Collection requests; private final Map> blacklist = new HashMap<>(); - BatchedRequests(ApplicationId applicationId, + public BatchedRequests(ApplicationId applicationId, Collection requests, int attempt) { this.applicationId = applicationId; this.requests = requests; @@ -56,7 +56,7 @@ * Get Application Id. * @return Application Id. */ - ApplicationId getApplicationId() { + public ApplicationId getApplicationId() { return applicationId; } @@ -73,11 +73,11 @@ * Add a Scheduling request to the batch. * @param req Scheduling Request. */ - void addToBatch(SchedulingRequest req) { + public void addToBatch(SchedulingRequest req) { requests.add(req); } - void addToBlacklist(Set tags, SchedulerNode node) { + public void addToBlacklist(Set tags, SchedulerNode node) { if (tags != null && !tags.isEmpty()) { // We are currently assuming a single allocation tag // per scheduler request currently. @@ -90,7 +90,7 @@ * Get placement attempt. * @return PlacementAlgorithmOutput placement Attempt. */ - int getPlacementAttempt() { + public int getPlacementAttempt() { return placementAttempt; } @@ -99,7 +99,7 @@ * @param tag Tag. * @return Set of blacklisted Nodes. */ - Set getBlacklist(String tag) { + public Set getBlacklist(String tag) { return blacklist.getOrDefault(tag, Collections.EMPTY_SET); } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java (revision ) @@ -74,25 +74,20 @@ */ // 3 Containers from app1 - atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); - atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); - atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); // 1 Container from app2 - atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Get Node Cardinality of app1 on node1, with tag "mapper" @@ -170,24 +165,21 @@ // Finish all containers: atm.removeContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); atm.removeContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), - ImmutableSet.of("reducer")); + TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Expect all cardinality to be 0 // Get Cardinality of app1 on node1, with tag "mapper" @@ -270,25 +262,22 @@ // 3 Containers from app1 atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2), + TestUtils.getMockContainerId(2, 2), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4), - ImmutableSet.of("reducer")); + TestUtils.getMockContainerId(2, 4), ImmutableSet.of("reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); // 1 Container from app2 atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Get Rack Cardinality of app1 on rack0, with tag "mapper" Assert.assertEquals(1, atm.getRackCardinality("rack0", @@ -325,45 +314,39 @@ // Add a bunch of containers atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), - ImmutableSet.of("reducer")); + TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Remove all these containers atm.removeContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); atm.removeContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), - ImmutableSet.of("reducer")); + TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); atm.removeContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // Check internal data structure Assert.assertEquals(0, @@ -373,6 +356,92 @@ atm.getGlobalRackMapping().getTypeToTagsWithCount().size()); Assert.assertEquals(0, atm.getPerAppRackMappings().size()); } + + @Test + public void testTempContainerAllocations() + throws InvalidAllocationTagsQueryException { + + AllocationTagsManager atm = new AllocationTagsManager(rmContext); + + /** + * Construct both TEMP and normal containers: + * Node1: + * TEMP container_1_1 (mapper/reducer/app_1) + * container_1_2 (service/app_1) + * + * Node2: + * container_1_3 (reducer/app_1) + * TEMP container_2_1 (service/app_2) + */ + + // 3 Containers from app1 + atm.addTempContainer(NodeId.fromString("host1:123"), TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")); + + atm.addContainer(NodeId.fromString("host1:123"), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("service")); + + atm.addContainer(NodeId.fromString("host2:123"), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("reducer")); + + // 1 Container from app2 + atm.addTempContainer(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(2), + ImmutableSet.of("service")); + + // Expect tag mappings to be present including temp Tags + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + Long::sum)); + + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + + // Do a temp Tag cleanup on app2 + atm.cleanTempContainers(TestUtils.getMockApplicationId(2)); + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + // Expect app1 to be unaffected + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + // Do a cleanup on app1 as well + atm.cleanTempContainers(TestUtils.getMockApplicationId(1)); + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + + // Non temp-tags should be unaffected + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + Long::sum)); + + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + + // Expect app2 with no containers, and app1 with 2 containers across 2 nodes + Assert.assertEquals(2, + atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1)) + .getTypeToTagsWithCount().size()); + + Assert.assertNull( + atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2))); + + } @Test public void testQueryCardinalityWithIllegalParameters() @@ -385,24 +454,21 @@ // Add a bunch of containers atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + TestUtils.getMockContainerId(1, 1), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + TestUtils.getMockContainerId(1, 2), ImmutableSet.of("mapper", "reducer")); atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), - ImmutableSet.of("reducer")); + TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer")); atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), - ImmutableSet.of("service")); + TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service")); // No node-id boolean caughtException = false; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java (revision ) @@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -54,24 +55,27 @@ private final RMContext rmContext; // Application's tags to Node - private Map perAppNodeMappings = + private Map perAppNodeMappings = new HashMap<>(); // Application's tags to Rack - private Map perAppRackMappings = + private Map perAppRackMappings = + new HashMap<>(); + // Application's Temporary containers mapping + private Map>>> appTempMappings = new HashMap<>(); // Global tags to node mapping (used to fast return aggregated tags // cardinality across apps) - private NodeToCountedTags globalNodeMapping = new NodeToCountedTags(); + private TypeToCountedTags globalNodeMapping = new TypeToCountedTags(); // Global tags to Rack mapping - private NodeToCountedTags globalRackMapping = new NodeToCountedTags(); + private TypeToCountedTags globalRackMapping = new TypeToCountedTags(); /** * Generic store mapping type to counted tags. * Currently used both for NodeId to Tag, Count and Rack to Tag, Count */ @VisibleForTesting - static class NodeToCountedTags { + static class TypeToCountedTags { // Map> private Map> typeToTagsWithCount = new HashMap<>(); @@ -209,25 +213,31 @@ } @VisibleForTesting - Map getPerAppNodeMappings() { + Map getPerAppNodeMappings() { return perAppNodeMappings; } @VisibleForTesting - Map getPerAppRackMappings() { + Map getPerAppRackMappings() { return perAppRackMappings; } @VisibleForTesting - NodeToCountedTags getGlobalNodeMapping() { + TypeToCountedTags getGlobalNodeMapping() { return globalNodeMapping; } @VisibleForTesting - NodeToCountedTags getGlobalRackMapping() { + TypeToCountedTags getGlobalRackMapping() { return globalRackMapping; } + @VisibleForTesting + public Map>> getAppTempMappings( + ApplicationId applicationId) { + return appTempMappings.get(applicationId); + } + public AllocationTagsManager(RMContext context) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); @@ -235,18 +245,52 @@ rmContext = context; } + // + + /** + * Method adds a temporary fake-container tag to Node mapping. + * Used by the constrained placement algorithm to keep track of containers + * that are currently placed on nodes but are not yet allocated. + * @param nodeId + * @param applicationId + * @param allocationTags + */ + public void addTempContainer(NodeId nodeId, ApplicationId applicationId, + Set allocationTags) { + ContainerId tmpContainer = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime()); + + writeLock.lock(); + try { + Map>> appTempMapping = + appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>()); + Map> containerTempMapping = + appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>()); + containerTempMapping.put(tmpContainer, allocationTags); + if (LOG.isDebugEnabled()) { + LOG.debug("Added TEMP container=" + tmpContainer + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + + addContainer(nodeId, tmpContainer, allocationTags); + } + /** * Notify container allocated on a node. * * @param nodeId allocated node. - * @param applicationId applicationId * @param containerId container id. * @param allocationTags allocation tags, see * {@link SchedulingRequest#getAllocationTags()} * application_id will be added to allocationTags. */ - public void addContainer(NodeId nodeId, ApplicationId applicationId, - ContainerId containerId, Set allocationTags) { + public void addContainer(NodeId nodeId, ContainerId containerId, + Set allocationTags) { + ApplicationId applicationId = + containerId.getApplicationAttemptId().getApplicationId(); String applicationIdTag = AllocationTagsNamespaces.APP_ID + applicationId.toString(); @@ -260,10 +304,10 @@ writeLock.lock(); try { - NodeToCountedTags perAppTagsMapping = perAppNodeMappings - .computeIfAbsent(applicationId, k -> new NodeToCountedTags()); - NodeToCountedTags perAppRackTagsMapping = perAppRackMappings - .computeIfAbsent(applicationId, k -> new NodeToCountedTags()); + TypeToCountedTags perAppTagsMapping = perAppNodeMappings + .computeIfAbsent(applicationId, k -> new TypeToCountedTags()); + TypeToCountedTags perAppRackTagsMapping = perAppRackMappings + .computeIfAbsent(applicationId, k -> new TypeToCountedTags()); // Covering test-cases where context is mocked String nodeRack = (rmContext.getRMNodes() != null && rmContext.getRMNodes().get(nodeId) != null) @@ -294,12 +338,13 @@ * Notify container removed. * * @param nodeId nodeId - * @param applicationId applicationId * @param containerId containerId. * @param allocationTags allocation tags for given container */ - public void removeContainer(NodeId nodeId, ApplicationId applicationId, + public void removeContainer(NodeId nodeId, ContainerId containerId, Set allocationTags) { + ApplicationId applicationId = + containerId.getApplicationAttemptId().getApplicationId(); String applicationIdTag = AllocationTagsNamespaces.APP_ID + applicationId.toString(); boolean useSet = false; @@ -313,9 +358,9 @@ writeLock.lock(); try { - NodeToCountedTags perAppTagsMapping = + TypeToCountedTags perAppTagsMapping = perAppNodeMappings.get(applicationId); - NodeToCountedTags perAppRackTagsMapping = + TypeToCountedTags perAppRackTagsMapping = perAppRackMappings.get(applicationId); if (perAppTagsMapping == null) { return; @@ -353,6 +398,34 @@ } } + /** + * Method removes temporary containers associated with an application + * Used by the placement algorithm to clean temporary tags at the end of + * a placement cycle + * @param applicationId + */ + public void cleanTempContainers(ApplicationId applicationId) { + + if (!appTempMappings.get(applicationId).isEmpty()) { + appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> { + nodeE.getValue().entrySet().stream().forEach(containerE -> { + removeContainer(nodeE.getKey(), containerE.getKey(), + containerE.getValue()); + }); + }); + writeLock.lock(); + try { + appTempMappings.remove(applicationId); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed TEMP containers of app=" + applicationId); + } + } finally { + writeLock.unlock(); + } + } + } + + /** * Get Node cardinality for a specific tag. * When applicationId is null, method returns aggregated cardinality @@ -378,7 +451,7 @@ "Must specify nodeId/tag to query cardinality"); } - NodeToCountedTags mapping; + TypeToCountedTags mapping; if (applicationId != null) { mapping = perAppNodeMappings.get(applicationId); } else { @@ -419,7 +492,7 @@ "Must specify rack/tag to query cardinality"); } - NodeToCountedTags mapping; + TypeToCountedTags mapping; if (applicationId != null) { mapping = perAppRackMappings.get(applicationId); } else { @@ -492,7 +565,7 @@ "Must specify nodeId/tags/op to query cardinality"); } - NodeToCountedTags mapping; + TypeToCountedTags mapping; if (applicationId != null) { mapping = perAppNodeMappings.get(applicationId); } else { @@ -540,7 +613,7 @@ "Must specify rack/tags/op to query cardinality"); } - NodeToCountedTags mapping; + TypeToCountedTags mapping; if (applicationId != null) { mapping = perAppRackMappings.get(applicationId); } else { Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision ) @@ -534,6 +534,10 @@ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS = RM_PREFIX + "placement-constraints.algorithm.class"; + /** Used for BasicPlacementAlgorithm - default SERIAL **/ + public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR = + RM_PREFIX + "placement-constraints.algorithm.iterator"; + public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED = RM_PREFIX + "placement-constraints.enabled"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SchedulingRequestWrapper.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SchedulingRequestWrapper.java (revision ) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SchedulingRequestWrapper.java (revision ) @@ -0,0 +1,41 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; + +import java.util.Collection; +import java.util.Iterator; + +public class SchedulingRequestWrapper implements Iterable{ + + private Collection schedulingRequests; + private PlacementConstraintManager constraintManager; + private AllocationTagsManager tagsManager; + private IteratorType iteratorType; + + public enum IteratorType { + SERIAL, + POPULAR_TAGS + } + + public SchedulingRequestWrapper(PlacementConstraintManager constraintManager, + AllocationTagsManager tagsManager, IteratorType type, + Collection schedulingRequests) { + this.schedulingRequests = schedulingRequests; + this.constraintManager = constraintManager; + this.tagsManager = tagsManager; + this.iteratorType = type; + } + + @Override public Iterator iterator() { + switch (this.iteratorType){ + case SERIAL: + return new SerialIterator(schedulingRequests); + case POPULAR_TAGS: + return new PopularTagsIterator(schedulingRequests); + default: + return null; + } + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java (revision ) @@ -17,11 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; @@ -30,18 +33,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - /** * Sample Test algorithm. Assumes anti-affinity always * It also assumes the numAllocations in resource sizing is always = 1 @@ -66,72 +60,52 @@ .getScheduler()).getNodes(filter); } + public boolean attemptAllocationOnNode(ApplicationId appId, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) + throws InvalidAllocationTagsQueryException { + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + if (numAllocs > 0) { + if (constraintManager.canAssign(appId, + schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(), + tagsManager)) { + return true; + } + } + return false; + } + @Override public void place(ConstraintPlacementAlgorithmInput input, ConstraintPlacementAlgorithmOutputCollector collector) { - BatchedRequests requests = (BatchedRequests)input; + BatchedRequests requests = (BatchedRequests) input; ConstraintPlacementAlgorithmOutput resp = new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); List allNodes = nodeSelector.selectNodes(null); - Map> tagIndexedRequests = new HashMap<>(); - requests.getSchedulingRequests() - .stream() - .filter(r -> r.getAllocationTags() != null) - .forEach( - req -> req.getAllocationTags().forEach( - tag -> tagIndexedRequests.computeIfAbsent(tag, - k -> new ArrayList<>()).add(req)) - ); - for (Map.Entry> entry : - tagIndexedRequests.entrySet()) { - String tag = entry.getKey(); - PlacementConstraint constraint = - constraintManager.getConstraint(requests.getApplicationId(), - Collections.singleton(tag)); - if (constraint != null) { - // Currently works only for simple anti-affinity - // NODE scope target expressions - SpecializedConstraintTransformer transformer = - new SpecializedConstraintTransformer(constraint); - PlacementConstraint transform = transformer.transform(); - TargetConstraint targetConstraint = - (TargetConstraint) transform.getConstraintExpr(); - // Assume a single target expression tag; - // The Sample Algorithm assumes a constraint will always be a simple - // Target Constraint with a single entry in the target set. - // As mentioned in the class javadoc - This algorithm should be - // used mostly for testing and validating end-2-end workflow. - String targetTag = - targetConstraint.getTargetExpressions().iterator().next() - .getTargetValues().iterator().next(); - // iterate over all nodes - Iterator nodeIter = allNodes.iterator(); - List schedulingRequests = entry.getValue(); - Iterator reqIter = schedulingRequests.iterator(); - while (reqIter.hasNext()) { - SchedulingRequest sReq = reqIter.next(); - int numAllocs = sReq.getResourceSizing().getNumAllocations(); - while (numAllocs > 0 && nodeIter.hasNext()) { - SchedulerNode node = nodeIter.next(); - long nodeCardinality = 0; - try { - nodeCardinality = tagsManager.getNodeCardinality( - node.getNodeID(), requests.getApplicationId(), - targetTag); - if (nodeCardinality == 0 && - !requests.getBlacklist(tag).contains(node.getNodeID())) { - numAllocs--; - sReq.getResourceSizing().setNumAllocations(numAllocs); - PlacedSchedulingRequest placedReq = - new PlacedSchedulingRequest(sReq); - placedReq.setPlacementAttempt(requests.getPlacementAttempt()); - placedReq.getNodes().add(node); - resp.getPlacedRequests().add(placedReq); - } - } catch (InvalidAllocationTagsQueryException e) { - LOG.warn("Got exception from TagManager !", e); - } - } + for (SchedulingRequest schedulingRequest : requests + .getSchedulingRequests()) { + Iterator nodeIter = allNodes.iterator(); + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + while (numAllocs > 0 && nodeIter.hasNext()) { + SchedulerNode node = nodeIter.next(); + try { + if (attemptAllocationOnNode(requests.getApplicationId(), + schedulingRequest, node)) { + schedulingRequest.getResourceSizing() + .setNumAllocations(--numAllocs); + PlacedSchedulingRequest placedReq = + new PlacedSchedulingRequest(schedulingRequest); + placedReq.setPlacementAttempt(requests.getPlacementAttempt()); + placedReq.getNodes().add(node); + resp.getPlacedRequests().add(placedReq); + numAllocs = + schedulingRequest.getResourceSizing().getNumAllocations(); + // Add temp-container tags for current placement cycle + this.tagsManager.addTempContainer(node.getNodeID(), + requests.getApplicationId(), + schedulingRequest.getAllocationTags()); + } + } catch (InvalidAllocationTagsQueryException e) { + LOG.warn("Got exception from TagManager !", e); } } } @@ -140,5 +114,7 @@ .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0) .forEach(rejReq -> resp.getRejectedRequests().add(rejReq)); collector.collect(resp); + // Clean current temp-container tags + this.tagsManager.cleanTempContainers(requests.getApplicationId()); } } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestGreedyPlacementIterators.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestGreedyPlacementIterators.java (revision ) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestGreedyPlacementIterators.java (revision ) @@ -0,0 +1,62 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TestPlacementProcessor.schedulingRequest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.SchedulingRequestWrapper; +import org.junit.Assert; +import org.junit.Test; + +public class TestGreedyPlacementIterators { + + @Test + public void testSerialIterator() throws Exception { + List schedulingRequestList = + Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + schedulingRequest(1, 4, 1, 512, "foo")); + + SchedulingRequestWrapper requestWrapper = new SchedulingRequestWrapper(null, + null, SchedulingRequestWrapper.IteratorType.SERIAL, + schedulingRequestList); + + Iterator requestIterator = requestWrapper.iterator(); + long prevAllocId = 0; + while (requestIterator.hasNext()) { + SchedulingRequest request = requestIterator.next(); + Assert.assertTrue(request.getAllocationRequestId() > prevAllocId); + prevAllocId = request.getAllocationRequestId(); + } + } + + @Test + public void testPopularTagsIterator() throws Exception { + List schedulingRequestList = + Arrays.asList(schedulingRequest(1, 1, 1, 512, "pri", "foo"), + schedulingRequest(1, 2, 1, 512, "bar"), + schedulingRequest(1, 3, 1, 512, "foo", "pri"), + schedulingRequest(1, 4, 1, 512, "test"), + schedulingRequest(1, 5, 1, 512, "pri", "bar")); + + SchedulingRequestWrapper requestWrapper = new SchedulingRequestWrapper(null, + null, SchedulingRequestWrapper.IteratorType.POPULAR_TAGS, + schedulingRequestList); + + Iterator requestIterator = requestWrapper.iterator(); + long recCcount = 0; + while (requestIterator.hasNext()) { + SchedulingRequest request = requestIterator.next(); + if (recCcount < 3) + Assert.assertTrue(request.getAllocationTags().contains("pri")); + else + Assert.assertTrue(request.getAllocationTags().contains("bar") + || request.getAllocationTags().contains("test")); + recCcount++; + } + } +} \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (revision ) @@ -579,9 +579,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Notify placementManager container.rmContext.getAllocationTagsManager().addContainer( - container.getNodeId(), - container.getApplicationAttemptId().getApplicationId(), - container.getContainerId(), container.getAllocationTags()); + container.getNodeId(), container.getContainerId(), + container.getAllocationTags()); container.eventHandler.handle(new RMAppAttemptEvent( container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); @@ -696,9 +695,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Notify placementManager container.rmContext.getAllocationTagsManager().removeContainer( - container.getNodeId(), - container.getApplicationAttemptId().getApplicationId(), - container.getContainerId(), container.getAllocationTags()); + container.getNodeId(), container.getContainerId(), + container.getAllocationTags()); RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java (revision ) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java (revision ) @@ -0,0 +1,53 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Traverse Scheduling requests with the most popular tags (count) first + * Currently the count is per Batch but could use TagManager for global count + */ +public class PopularTagsIterator implements Iterator { + + private final List schedulingRequestList; + private int cursor; + + public PopularTagsIterator(Collection schedulingRequests) { + this.schedulingRequestList = new ArrayList<>(schedulingRequests); + // Most popular First + Collections.sort(schedulingRequestList, + (o1, o2) -> (int) getTagPopularity(o2) - (int) getTagPopularity(o1)); + + this.cursor = 0; + } + + private long getTagPopularity(SchedulingRequest o1) { + long max = 0; + for (String tag : o1.getAllocationTags()) { + long count = schedulingRequestList.stream() + .filter(req -> req.getAllocationTags().contains(tag)).count(); + if (count > max) + max = count; + } + return max; + } + + @Override + public boolean hasNext() { + return (cursor < schedulingRequestList.size()); + } + + @Override + public SchedulingRequest next() { + if (hasNext()) { + return schedulingRequestList.get(cursor++); + } + throw new NoSuchElementException(); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/BasicPlacementAlgorithm.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/BasicPlacementAlgorithm.java (revision ) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/BasicPlacementAlgorithm.java (revision ) @@ -0,0 +1,125 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.SchedulingRequestWrapper; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Basic placement algorithm. + * Supports different Iterators at SchedulingRequest level including: + * Serial, PopularTags + */ +public class BasicPlacementAlgorithm implements ConstraintPlacementAlgorithm { + + private static final Logger LOG = + LoggerFactory.getLogger(SamplePlacementAlgorithm.class); + + private AllocationTagsManager tagsManager; + private PlacementConstraintManager constraintManager; + private NodeCandidateSelector nodeSelector; + private SchedulingRequestWrapper.IteratorType iteratorType; + + @Override + public void init(RMContext rmContext) { + this.tagsManager = rmContext.getAllocationTagsManager(); + this.constraintManager = rmContext.getPlacementConstraintManager(); + this.nodeSelector = + filter -> ((AbstractYarnScheduler) (rmContext).getScheduler()) + .getNodes(filter); + String iteratorName = rmContext.getYarnConfiguration().get( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR, + SchedulingRequestWrapper.IteratorType.SERIAL.name()); + LOG.info("Planning Algorithm Iterator[{}]", iteratorName); + try { + iteratorType = + SchedulingRequestWrapper.IteratorType.valueOf(iteratorName); + } catch (IllegalArgumentException e) { + throw new YarnRuntimeException( + "Could not instantiate Basic Placement Algorithm: ", e); + } + } + + public boolean attemptAllocationOnNode(ApplicationId appId, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) + throws InvalidAllocationTagsQueryException { + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + if (numAllocs > 0) { + if (constraintManager.canAssign(appId, + schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(), + tagsManager)) { + return true; + } + } + return false; + } + + + @Override + public void place(ConstraintPlacementAlgorithmInput input, + ConstraintPlacementAlgorithmOutputCollector collector) { + BatchedRequests requests = (BatchedRequests) input; + ConstraintPlacementAlgorithmOutput resp = + new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); + List allNodes = nodeSelector.selectNodes(null); + + Iterator requestIterator = + new SchedulingRequestWrapper(constraintManager, tagsManager, + this.iteratorType, requests.getSchedulingRequests()).iterator(); + while (requestIterator.hasNext()) { + SchedulingRequest schedulingRequest = requestIterator.next(); + Iterator nodeIter = allNodes.iterator(); + int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); + while (nodeIter.hasNext() && numAllocs > 0) { + SchedulerNode node = nodeIter.next(); + try { + if (attemptAllocationOnNode(requests.getApplicationId(), + schedulingRequest, node)) { + schedulingRequest.getResourceSizing() + .setNumAllocations(--numAllocs); + PlacedSchedulingRequest placedReq = + new PlacedSchedulingRequest(schedulingRequest); + placedReq.setPlacementAttempt(requests.getPlacementAttempt()); + placedReq.getNodes().add(node); + resp.getPlacedRequests().add(placedReq); + numAllocs = + schedulingRequest.getResourceSizing().getNumAllocations(); + // Add temp-container tags for current placement cycle + this.tagsManager.addTempContainer(node.getNodeID(), + requests.getApplicationId(), + schedulingRequest.getAllocationTags()); + } + } catch (InvalidAllocationTagsQueryException e) { + LOG.warn("Got exception from TagManager !", e); + } + } + } + // Add all requests whose numAllocations still > 0 to rejected list. + requests.getSchedulingRequests().stream() + .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0) + .forEach(rejReq -> resp.getRejectedRequests().add(rejReq)); + collector.collect(resp); + // Clean current temp-container tags + this.tagsManager.cleanTempContainers(requests.getApplicationId()); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java (date 1514300057000) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java (revision ) @@ -373,13 +373,13 @@ rej.getReason()); } - private static SchedulingRequest schedulingRequest( + protected static SchedulingRequest schedulingRequest( int priority, long allocReqId, int cores, int mem, String... tags) { return schedulingRequest(priority, allocReqId, cores, mem, ExecutionType.GUARANTEED, tags); } - private static SchedulingRequest schedulingRequest( + protected static SchedulingRequest schedulingRequest( int priority, long allocReqId, int cores, int mem, ExecutionType execType, String... tags) { return SchedulingRequest.newBuilder()