diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 1e62b44..8b30f71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.StringTokenizer; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -959,4 +961,16 @@ public Integer getDefaultApplicationPriorityConfPerQueue(String queue) { DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); return defaultPriority; } + + @VisibleForTesting + public void setOrderingPolicy(String queue, String policy) { + set(getQueuePrefix(queue) + ORDERING_POLICY, policy); + } + + @VisibleForTesting + public void setOrderingPolicyParameter(String queue, + String parameterKey, String parameterValue) { + set(getQueuePrefix(queue) + ORDERING_POLICY + "." + + parameterKey, parameterValue); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index 7bec03a..7db1a94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -70,19 +70,17 @@ protected void reorderSchedulableEntity(S schedulableEntity) { schedulableEntity.getSchedulingResourceUsage()); schedulableEntities.add(schedulableEntity); } - - protected void reorderScheduleEntities() { - synchronized (entitiesToReorder) { - for (Map.Entry entry : - entitiesToReorder.entrySet()) { - reorderSchedulableEntity(entry.getValue()); - } - entitiesToReorder.clear(); + + protected synchronized void reorderScheduleEntities() { + for (Map.Entry entry : entitiesToReorder.entrySet()) { + reorderSchedulableEntity(entry.getValue()); } + entitiesToReorder.clear(); } - protected void entityRequiresReordering(S schedulableEntity) { - synchronized (entitiesToReorder) { + protected synchronized void entityRequiresReordering(S schedulableEntity) { + if (schedulableEntities.contains(schedulableEntity)) { + // Only reorder the schedulable entity if it's in schedulabl-entities map entitiesToReorder.put(schedulableEntity.getId(), schedulableEntity); } } @@ -99,16 +97,14 @@ public void addSchedulableEntity(S s) { } schedulableEntities.add(s); } - + @Override - public boolean removeSchedulableEntity(S s) { + public synchronized boolean removeSchedulableEntity(S s) { if (null == s) { return false; } - synchronized (entitiesToReorder) { - entitiesToReorder.remove(s.getId()); - } - return schedulableEntities.remove(s); + entitiesToReorder.remove(s.getId()); + return schedulableEntities.remove(s); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java index ffb9d93..d129c63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -20,6 +20,18 @@ import java.util.*; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.junit.Assert; import org.junit.Test; @@ -138,6 +150,48 @@ public void testIterators() { checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"}); } + @Test + public void testSizeBasedWeightNotAffectAppActivation() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + + // Define top-level queues + String queuePath = CapacitySchedulerConfiguration.ROOT + ".default"; + csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY); + csConf.setOrderingPolicyParameter(queuePath, + FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true"); + csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f); + + // inject node label manager + MockRM rm = new MockRM(csConf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Get LeafQueue + LeafQueue lq = (LeafQueue) cs.getQueue("default"); + OrderingPolicy policy = lq.getOrderingPolicy(); + Assert.assertTrue(policy instanceof FairOrderingPolicy); + Assert.assertTrue(((FairOrderingPolicy)policy).getSizeBasedWeight()); + + MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x + + // Submit 4 apps + rm.submitApp(1 * GB, "app", "user", null, "default"); + rm.submitApp(1 * GB, "app", "user", null, "default"); + rm.submitApp(1 * GB, "app", "user", null, "default"); + rm.submitApp(1 * GB, "app", "user", null, "default"); + + Assert.assertEquals(1, lq.getNumActiveApplications()); + Assert.assertEquals(3, lq.getNumPendingApplications()); + + // Try allocate once, #active-apps and #pending-apps should be still correct + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(NodeId.newInstance("h1", 1234)))); + Assert.assertEquals(1, lq.getNumActiveApplications()); + Assert.assertEquals(3, lq.getNumPendingApplications()); + } + public void checkIds(Iterator si, String[] ids) { for (int i = 0;i < ids.length;i++) {