diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 134b69871ce..4389e05b3d2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3737,6 +3737,12 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
       CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
 
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
+      = "exclusive-enforced-partitions";
+
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+      + EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
+
   public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
       YARN_PREFIX + "cluster.max-application-priority";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 6763d66eecc..4d5cb13cf6e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -76,6 +76,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -236,6 +237,11 @@ public void allocate(ApplicationAttemptId appAttemptId,
           && ResourceRequest.ANY.equals(req.getResourceName())) {
         req.setNodeLabelExpression(asc.getNodeLabelExpression());
       }
+      if (ResourceRequest.ANY.equals(req.getResourceName())) {
+        SchedulerUtils.enforcePartitionExclusivity(req,
+            getRmContext().getExclusiveEnforcedPartitions(),
+            asc.getNodeLabelExpression());
+      }
     }
 
     Resource maximumCapacity =
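Note on the two sections above: the YarnConfiguration constants compose the
cluster-level key for this feature, and the DefaultAMSProcessor hook applies
it to every ANY request on the allocate path. A minimal sketch of resolving
and reading the property, assuming NODE_LABELS_PREFIX is "yarn.node-labels."
(its value elsewhere in YarnConfiguration):

    // Sketch only, not part of the patch.
    Configuration conf = new YarnConfiguration();
    conf.set("yarn.node-labels.exclusive-enforced-partitions", "x,y");
    String[] partitions =
        conf.getStrings(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
    // partitions == {"x", "y"}; RMContextImpl.getExclusiveEnforcedPartitions()
    // below copies these into a Set<String>.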
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index bdc68acf3a8..0c9f89e5003 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -100,6 +101,7 @@
   private YarnAuthorizationProvider authorizer;
   private boolean timelineServiceV2Enabled;
   private boolean nodeLabelsEnabled;
+  private Set<String> exclusiveEnforcedPartitions;
 
   public RMAppManager(RMContext context,
       YarnScheduler scheduler, ApplicationMasterService masterService,
@@ -124,6 +126,7 @@ public RMAppManager(RMContext context,
         timelineServiceV2Enabled(conf);
     this.nodeLabelsEnabled = YarnConfiguration
         .areNodeLabelsEnabled(rmContext.getYarnConfiguration());
+    this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
   }
 
   /**
@@ -577,6 +580,9 @@ private RMAppImpl createAndPopulateNewRMApp(
       throw new InvalidResourceRequestException("Invalid resource request, "
           + "no resource request specified with " + ResourceRequest.ANY);
     }
+    SchedulerUtils.enforcePartitionExclusivity(anyReq,
+        exclusiveEnforcedPartitions,
+        submissionContext.getNodeLabelExpression());
 
     // Make sure that all of the requests agree with the ANY request
     // and have correct values
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 55420bd9270..d4949566cf2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -203,4 +204,7 @@ void setMultiNodeSortingManager(
   long getTokenSequenceNo();
 
   void incrTokenSequenceNo();
+
+  Set<String> getExclusiveEnforcedPartitions();
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 54e0281f7e8..6231425a987 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -20,6 +20,8 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -678,4 +681,16 @@ public long getTokenSequenceNo() {
   public void incrTokenSequenceNo() {
     this.activeServiceContext.incrTokenSequenceNo();
   }
+
+  public Set<String> getExclusiveEnforcedPartitions() {
+    String[] configuredPartitions = getYarnConfiguration().getStrings(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
+    Set<String> exclusiveEnforcedPartitions = new HashSet<>();
+    if (configuredPartitions != null) {
+      for (String partition : configuredPartitions) {
+        exclusiveEnforcedPartitions.add(partition);
+      }
+    }
+    return exclusiveEnforcedPartitions;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index cc7f5852d63..41b8944312e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -205,6 +205,8 @@
   private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
   private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
 
+  protected String nodeLabelExpression;
+
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
@@ -226,6 +228,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
       unmanagedAM = appSubmissionContext.getUnmanagedAM();
       this.logAggregationContext =
           appSubmissionContext.getLogAggregationContext();
+      this.nodeLabelExpression = appSubmissionContext.getNodeLabelExpression();
     }
     applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs();
   }
@@ -1469,4 +1472,9 @@ public String getDiagnosticMessage() {
   public Map<String, String> getApplicationSchedulingEnvs() {
     return this.applicationSchedulingEnvs;
   }
+
+  @Override
+  public String getPartition() {
+    return nodeLabelExpression == null ? "" : nodeLabelExpression;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index c82bf058bee..1fa591663f0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -301,6 +301,28 @@ public static void normalizeAndValidateRequest(ResourceRequest resReq,
         rmContext, queueInfo, nodeLabelsEnabled);
   }
 
+  /**
+   * If the RM should enforce partition exclusivity for an enforced
+   * partition "x":
+   * 1) If the request's label is "x" but the app's label is not "x",
+   *    override the request's label with the app's label.
+   * 2) If the app's label is "x", ensure the request's label is "x".
+   * @param resReq resource request
+   * @param enforcedPartitions set of exclusive enforced partitions
+   * @param appLabel app's node label expression
+   */
+  public static void enforcePartitionExclusivity(ResourceRequest resReq,
+      Set<String> enforcedPartitions, String appLabel) {
+    if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
+      return;
+    }
+    if (!enforcedPartitions.contains(appLabel)
+        && enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+    if (enforcedPartitions.contains(appLabel)) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+  }
+
   /**
    * Utility method to validate a resource request, by ensuring that the
    * requested memory/vcore is non-negative and not greater than max
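The two clauses above give an enforced partition a both-ways guarantee:
requests cannot opt in to an enforced partition unless the app was submitted
to it, and an app submitted to one cannot place ANY requests anywhere else.
A short sketch of the resulting rewrites, assuming a single enforced
partition "x" (the labels "y" and "z" are illustrative, and the surrounding
imports come from the classes touched by this patch):

    Set<String> enforced = Collections.singleton("x");
    ResourceRequest req = ResourceRequest.newInstance(
        Priority.newInstance(0), ResourceRequest.ANY,
        Resource.newInstance(1024, 1), 1);

    req.setNodeLabelExpression("x");
    SchedulerUtils.enforcePartitionExclusivity(req, enforced, "z");
    // App label "z" is not enforced, so the request may not target "x":
    // its label is overridden to "z".

    req.setNodeLabelExpression("y");
    SchedulerUtils.enforcePartitionExclusivity(req, enforced, "x");
    // App label "x" is enforced: every ANY request is forced back to "x".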
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index a88beef8c97..c082b8c8bce 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@@ -162,6 +163,9 @@
   public static final String FAIR_APP_ORDERING_POLICY = "fair";
 
+  public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
+      = "fifo-with-partitions";
+
   public static final String DEFAULT_APP_ORDERING_POLICY =
       FIFO_APP_ORDERING_POLICY;
@@ -561,6 +565,9 @@ public int getUserLimit(String queue) {
     if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
       policyType = FairOrderingPolicy.class.getName();
     }
+    if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
+      policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
+    }
     try {
       orderingPolicy = (OrderingPolicy<S>) Class.forName(policyType).newInstance();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a178f9e9a0b..991b2c5bc86 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -803,7 +804,8 @@ protected void activateApplications() {
     }
 
     for (Iterator<FiCaSchedulerApp> fsApp =
-        getPendingAppsOrderingPolicy().getAssignmentIterator();
+        getPendingAppsOrderingPolicy()
+            .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
         fsApp.hasNext(); ) {
       FiCaSchedulerApp application = fsApp.next();
       ApplicationId applicationId = application.getApplicationId();
@@ -1097,8 +1099,10 @@ public CSAssignment assignContainers(Resource clusterResource,
     Map<String, CachedUserLimit> userLimits = new HashMap<>();
     boolean needAssignToQueueCheck = true;
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition(candidates.getPartition());
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator();
+        orderingPolicy.getAssignmentIterator(sel);
         assignmentIterator.hasNext(); ) {
       FiCaSchedulerApp application = assignmentIterator.next();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index b83b2aeb67d..4ee44506510 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -47,13 +47,13 @@ public AbstractComparatorOrderingPolicy() { }
   public Collection<S> getSchedulableEntities() {
     return schedulableEntities;
   }
-  
+
   @Override
-  public Iterator<S> getAssignmentIterator() {
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
     reorderScheduleEntities();
     return schedulableEntities.iterator();
   }
-  
+
   @Override
   public Iterator<S> getPreemptionIterator() {
     reorderScheduleEntities();
@@ -138,5 +138,5 @@ public abstract void containerReleased(S schedulableEntity,
   @Override
   public abstract String getInfo();
-  
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 00000000000..2e97d608e6c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+/**
+ * Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
+ * for each partition in
+ * {@code yarn.scheduler.capacity.<queue-path>
+ * .ordering-policy.exclusive-enforced-partitions}.
+ */
+public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
+    implements OrderingPolicy<S> {
+
+  private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
+
+  protected Map<String, OrderingPolicy<S>> orderingPolicies;
+
+  public FifoOrderingPolicyWithExclusivePartitions() {
+    this.orderingPolicies = new HashMap<>();
+    this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy<S>());
+  }
+
+  public Collection<S> getSchedulableEntities() {
+    return unionOrderingPolicies().getSchedulableEntities();
+  }
+
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
+    // Return schedulable entities only from the filtered partition
+    return getPartitionOrderingPolicy(sel.getPartition())
+        .getAssignmentIterator(sel);
+  }
+
+  public Iterator<S> getPreemptionIterator() {
+    // Entities from all partitions should be preemptible
+    return unionOrderingPolicies().getPreemptionIterator();
+  }
+
+  /**
+   * Union all schedulable entities from all ordering policies.
+   * @return ordering policy containing all schedulable entities
+   */
+  private OrderingPolicy<S> unionOrderingPolicies() {
+    OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
+    }
+    return ret;
+  }
+
+  public void addSchedulableEntity(S s) {
+    getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
+  }
+
+  public boolean removeSchedulableEntity(S s) {
+    return getPartitionOrderingPolicy(s.getPartition())
+        .removeSchedulableEntity(s);
+  }
+
+  public void addAllSchedulableEntities(Collection<S> sc) {
+    for (S entity : sc) {
+      getPartitionOrderingPolicy(entity.getPartition())
+          .addSchedulableEntity(entity);
+    }
+  }
+
+  public int getNumSchedulableEntities() {
+    // Return the total number of schedulable entities, to maintain parity
+    // with the existing FifoOrderingPolicy, e.g. when determining if a
+    // queue has reached its max app limit
+    int ret = 0;
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret += entry.getValue().getNumSchedulableEntities();
+    }
+    return ret;
+  }
+
+  public void containerAllocated(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerAllocated(schedulableEntity, r);
+  }
+
+  public void containerReleased(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerReleased(schedulableEntity, r);
+  }
+
+  public void demandUpdated(S schedulableEntity) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .demandUpdated(schedulableEntity);
+  }
+
+  @Override
+  public void configure(Map<String, String> conf) {
+    if (conf == null) {
+      return;
+    }
+    String partitions =
+        conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
+    if (partitions != null) {
+      for (String partition : partitions.split(",")) {
+        partition = partition.trim();
+        if (!partition.isEmpty()) {
+          this.orderingPolicies.put(partition, new FifoOrderingPolicy<S>());
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getInfo() {
+    return "FifoOrderingPolicyWithExclusivePartitions";
+  }
+
+  private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
+    String keyPartition = orderingPolicies.containsKey(partition)
+        ? partition : DEFAULT_PARTITION;
+    return orderingPolicies.get(keyPartition);
+  }
+}
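Since configure() receives per-queue keys with their prefix already stripped,
the policy looks up only the key suffix. A sketch of the per-partition
routing, using the MockSchedulableEntity test helper extended later in this
patch (the "x" partition name is illustrative):

    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
        new FifoOrderingPolicyWithExclusivePartitions<>();
    policy.configure(Collections.singletonMap(
        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, "x"));

    MockSchedulableEntity inX = new MockSchedulableEntity();
    inX.setPartition("x");                   // routed to the "x" sub-policy
    MockSchedulableEntity unlabeled = new MockSchedulableEntity();
    policy.addSchedulableEntity(inX);
    policy.addSchedulableEntity(unlabeled);  // DEFAULT_PARTITION sub-policy

    IteratorSelector sel = new IteratorSelector();
    sel.setPartition("x");
    policy.getAssignmentIterator(sel);       // yields only inX
    policy.getAssignmentIterator(
        IteratorSelector.EMPTY_ITERATOR_SELECTOR);  // yields only unlabeled
    policy.getPreemptionIterator();          // yields entities from all
                                             // partitions

A queue would select this policy through its ordering-policy setting using
the "fifo-with-partitions" alias registered in CapacitySchedulerConfiguration
above, with the exclusive-enforced-partitions suffix under the same prefix,
per the class javadoc.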
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
new file mode 100644
index 00000000000..4694c0b2d74
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+/**
+ * IteratorSelector contains information needed to tell an
+ * {@link OrderingPolicy} what to return in an iterator.
+ */
+public class IteratorSelector {
+
+  public static final IteratorSelector EMPTY_ITERATOR_SELECTOR = new IteratorSelector();
+
+  private String partition;
+
+  /**
+   * The partition for this iterator selector.
+   * @return partition
+   */
+  public String getPartition() {
+    return this.partition;
+  }
+
+  /**
+   * Set the partition for this iterator selector.
+   * @param p partition
+   */
+  public void setPartition(String p) {
+    this.partition = p;
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
index 9aacc7e79a4..ccec974ba8a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -45,10 +45,11 @@
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
    * objects which orders them for container assignment.
+   * @param sel the {@link IteratorSelector} to filter with
    * @return an iterator over the collection of {@link SchedulableEntity}
    * objects
    */
-  public Iterator<S> getAssignmentIterator();
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel);
 
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
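The signature change ripples to every getAssignmentIterator call site:
callers that should keep the old behavior pass the shared empty selector,
while partition-aware callers build a selector per node heartbeat, mirroring
the LeafQueue changes above (orderingPolicy and node stand in for the
surrounding scheduler state):

    // Legacy behavior: no partition filtering.
    Iterator<FiCaSchedulerApp> all = orderingPolicy
        .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);

    // Partition-aware: iterate only apps in the node's partition.
    IteratorSelector sel = new IteratorSelector();
    sel.setPartition(node.getPartition());
    Iterator<FiCaSchedulerApp> filtered =
        orderingPolicy.getAssignmentIterator(sel);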
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
index 41b83ce7162..be835560ade 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
@@ -55,4 +55,9 @@
    */
   public boolean isRecovering();
 
+  /**
+   * Get the partition corresponding to this entity.
+   * @return partition
+   */
+  String getPartition();
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index 623cdfb2342..c3f3e7c0188 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -38,6 +38,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -978,6 +979,118 @@ public static void waitSchedulerApplicationAttemptStopped(
     System.err.println("Failed to wait scheduler application attempt stopped.");
   }
 
+  @Test
+  public void testEnforcePartitionExclusivity() {
+    String enforcedExclusiveLabel = "x";
+    Set<String> enforcedExclusiveLabelSet =
+        Collections.singleton(enforcedExclusiveLabel);
+    String dummyLabel = "y";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to the app label.
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set to
+    // the app label.
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be
+    // set to the app label.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label and app label match. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // Unconfigured label: nothing should happen.
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null, appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null, appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null, appLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+  }
+
+  @Test
+  public void testEnforcePartitionExclusivityMultipleLabels() {
+    String enforcedLabel1 = "x";
+    String enforcedLabel2 = "y";
+    Set<String> enforcedExclusiveLabelSet = new HashSet<>();
+    enforcedExclusiveLabelSet.add(enforcedLabel1);
+    enforcedExclusiveLabelSet.add(enforcedLabel2);
+    String dummyLabel = "dummyLabel";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to the app label.
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set to
+    // the app label.
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be
+    // set to the app label.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel2);
+    Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
+
+    // RR label and app label match. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label and app label don't match, but they're both enforced labels.
+    // RR label should be set to the app label.
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+  }
+
   public static SchedulerApplication<SchedulerApplicationAttempt>
       verifyAppAddedAndRemovedFromScheduler(
       Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 855c793edc5..47c81f9d813 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -162,6 +162,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -1299,7 +1300,7 @@ public void testAllocateReorder() throws Exception {
     //This happens because app2 has no demand/a magnitude of NaN, which
     //results in app1 and app2 being equal in the fairness comparison and
     //failling back to fifo (start) ordering
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
       appId1.toString());
 
     //Now, allocate for app2 (this would be the first/AM allocation)
@@ -1312,7 +1313,7 @@
 
     //verify re-ordering based on the allocation alone
     //Now, the first app for assignment is app2
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
       appId2.toString());
 
     rm.stop();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 6a4391ab211..0714a3412db 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -87,6 +87,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -133,6 +134,8 @@
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
+  private final static String LABEL = "test";
+
   private final ResourceCalculator resourceCalculator =
       new DefaultResourceCalculator();
 
@@ -141,14 +144,19 @@
 
   @Before
   public void setUp() throws Exception {
-    setUpInternal(resourceCalculator);
+    setUpInternal(resourceCalculator, false);
   }
 
   private void setUpWithDominantResourceCalculator() throws Exception {
-    setUpInternal(dominantResourceCalculator);
+    setUpInternal(dominantResourceCalculator, false);
+  }
+
+  private void setUpWithNodeLabels() throws Exception {
+    setUpInternal(resourceCalculator, true);
   }
 
-  private void setUpInternal(ResourceCalculator rC) throws Exception {
+  private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
+      throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
     queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
@@ -174,7 +182,7 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
     csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
         false);
     final String newRoot = "root" + System.currentTimeMillis();
-    setupQueueConfiguration(csConf, newRoot);
+    setupQueueConfiguration(csConf, newRoot, withNodeLabels);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
     when(spyRMContext.getYarnConfiguration()).thenReturn(conf);
@@ -231,24 +239,39 @@ private static final String E = "e";
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf,
-      final String newRoot) {
+      final String newRoot, boolean withNodeLabels) {
 
     // Define top-level queues
     conf.setQueues(ROOT, new String[] {newRoot});
     conf.setMaximumCapacity(ROOT, 100);
     conf.setAcl(ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
+      conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+          LABEL, 100);
+    }
 
     final String Q_newRoot = ROOT + "." + newRoot;
     conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
    conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
+    }
 
     final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 8.5f);
     conf.setMaximumCapacity(Q_A, 20);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_A, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
+    }
 
     final String Q_B = Q_newRoot + "." + B;
     conf.setCapacity(Q_B, 80);
@@ -3100,7 +3123,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
     final String newRootName = "root" + System.currentTimeMillis();
-    setupQueueConfiguration(csConf, newRootName);
+    setupQueueConfiguration(csConf, newRootName, false);
 
     Resource clusterResource = Resources.createResource(100 * 16 * GB,
         100 * 32);
@@ -3292,6 +3315,116 @@ public void testFifoAssignment() throws Exception {
     Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
   }
 
+  @Test
+  public void testFifoWithPartitionsAssignment() throws Exception {
+    setUpWithNodeLabels();
+
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    OrderingPolicy<FiCaSchedulerApp> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
+    a.setOrderingPolicy(policy);
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0,
+        16 * GB);
+    when(node_0_0.getPartition()).thenReturn(LABEL);
+    String host_0_1 = "127.0.0.2";
+    FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0,
+        16 * GB);
+    when(node_0_1.getPartition()).thenReturn("");
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+        numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user_0 = "user_0";
+
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0,
+        a, mock(ActiveUsersManager.class), spyRMContext,
+        Priority.newInstance(5), false));
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0,
+        a, mock(ActiveUsersManager.class), spyRMContext,
+        Priority.newInstance(3), false));
+    when(app_1.getPartition()).thenReturn(LABEL);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(
+        node_0_0.getNodeID(), node_0_0, node_0_1.getNodeID(), node_0_1);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+
+    app_0_requests_0.clear();
+    app_0_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+            recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    app_1_requests_0.clear();
+    app_1_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    // app_1 should get containers since it is in the enforced partition
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(1 * GB, app_1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+    // app_0 should not get resources from node_0_0 since the labels
+    // don't match
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(0 * GB, app_0.getCurrentConsumption().getMemorySize());
+
+    app_1_requests_0.clear();
+    app_1_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    // When node_0_1 heartbeats, app_0 should get containers
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(1 * GB, app_1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+
+    app_0_requests_0.clear();
+    app_0_requests_0.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // When node_0_0 heartbeats, app_1 should get containers again
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(2 * GB, app_1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+  }
+
   @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
index 4f251bf4e38..62f7a4956b9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
@@ -18,21 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
-import java.util.*;
-
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 
 public class MockSchedulableEntity implements SchedulableEntity {
-  
+
   private String id;
   private long serial = 0;
   private Priority priority;
   private boolean isRecovering;
+  private String partition = "";
 
   public MockSchedulableEntity() { }
 
@@ -101,4 +99,13 @@ public boolean isRecovering() {
   protected void setRecovering(boolean entityRecovering) {
     this.isRecovering = entityRecovering;
   }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 683173af709..b4f207a2176 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -126,19 +126,19 @@ public void testIterators() {
 
     //Assignment, least to greatest consumption
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new String[]{"3", "2", "1"});
 
     //Preemption, greatest to least
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Change value without inform, should see no change
     msp2.setUsed(Resources.createResource(6));
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new String[]{"3", "2", "1"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Do inform, will reorder
     schedOrder.containerAllocated(msp2, null);
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+    checkIds(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new String[]{"3", "1", "2"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
index 776f6c6962b..7ec2c01ec25 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
@@ -63,7 +63,7 @@ public void testIterators() {
     schedOrder.addSchedulableEntity(msp3);
 
     //Assignment, oldest to youngest
-    checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
+    checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
 
     //Preemption, youngest to oldest
     checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
index 5fa9a1d0a91..4efeb88c709 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
@@ -74,7 +74,7 @@ public void testIterators() {
     schedOrder.addSchedulableEntity(msp7);
 
     // Assignment with serial id's are 3,2,4,1,6,5,7
-    checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
+    checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] { 3, 2, 4, 1,
         6, 5, 7 });
 
     //Preemption, youngest to oldest
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 00000000000..e6558ce68fd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestFifoOrderingPolicyWithExclusivePartitions {
+
+  private static final String PARTITION = "test";
+  private static final String PARTITION2 = "test2";
+
+  @Test
+  public void testNoConfiguredExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.EMPTY_MAP);
+
+    MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
+    r2.setId("r2");
+
+    policy.addSchedulableEntity(p1);
+    policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // Should behave like FifoOrderingPolicy, regardless of partition
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+  }
+
+  @Test
+  public void testSingleExclusiveEnforcedPartition() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
+
+    // PARTITION iterator should return p2, p1, p3
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+    MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
+    p3.setPartition(PARTITION);
+    p3.setId("p3");
+
+    // non-PARTITION iterator should return r3, r2, r1
+    MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setId("r2");
+    MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
+    r3.setId("r3");
+
+    policy.addSchedulableEntity(r1);
+    Assert.assertEquals(1, policy.getNumSchedulableEntities());
+    Assert.assertEquals("r1", policy.getSchedulableEntities()
+        .iterator().next().getId());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
+    verifyPreemptionIteratorOrder(policy, "r1");
+
+    List<MockSchedulableEntity> entities = Arrays.asList(r2, r3, p1, p2);
+    policy.addAllSchedulableEntities(entities);
+    policy.addSchedulableEntity(p3);
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    // Assignment iterator should return non-PARTITION entities,
+    // in order based on FifoOrderingPolicy
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
+    // Preemption iterator should return all entities, in global order
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Same thing as above, but with a non-empty partition
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition("dummy");
+    verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Should return PARTITION entities, in order based on FifoOrderingPolicy
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+
+    policy.removeSchedulableEntity(p1);
+    policy.removeSchedulableEntity(p3);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+  }
+
+  @Test
+  public void testMultipleExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
+        PARTITION + "," + PARTITION2));
+
+    // PARTITION iterator should return p2, p1
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    // PARTITION2 iterator should return r1, r2
+    MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
+    r1.setPartition(PARTITION2);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setPartition(PARTITION2);
+    r2.setId("r2");
+
+    // default iterator should return s2, s1
+    MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
+    s1.setId("s1");
+    MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
+    s2.setId("s2");
+
+    policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // assignment iterator returns only default (non-partitioned) entities
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1");
+
+    policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
+    verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r1);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
+  }
+
+  private void verifyAssignmentIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      IteratorSelector sel, String... ids) {
+    verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
+  }
+
+  private void verifyPreemptionIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      String... ids) {
+    verifyIteratorOrder(policy.getPreemptionIterator(), ids);
+  }
+
+  private void verifyIteratorOrder(Iterator<MockSchedulableEntity> itr,
+      String... ids) {
+    for (String id : ids) {
+      Assert.assertEquals(id, itr.next().getId());
+    }
+    Assert.assertFalse(itr.hasNext());
+  }
+}