diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index d5d1374..7a79530 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -30,7 +30,6 @@
 import java.util.Set;
 import java.util.StringTokenizer;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -57,6 +56,7 @@
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -75,7 +75,7 @@
   @Private
   public static final String MAXIMUM_APPLICATIONS_SUFFIX =
     "maximum-applications";
-  
+
   @Private
   public static final String MAXIMUM_SYSTEM_APPLICATIONS =
     PREFIX + MAXIMUM_APPLICATIONS_SUFFIX;
@@ -1077,4 +1077,23 @@ public boolean getLazyPreemptionEnabled() {
       PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
   public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
       false;
+
+  /**
+   * Maximum number of applications a queue can accept when
+   * maximum-applications is not set for that queue. To stay consistent with
+   * previous versions, the default value is UNDEFINED.
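+   * A positive value takes precedence over the limit derived from
+   * maximum-system-applications and the queue's absolute capacity; an
+   * explicit per-queue maximum-applications setting still overrides it, e.g.
+   * yarn.scheduler.capacity.global-queue-max-application=1000.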
+ */ + @Private + public static final String QUEUE_GLOBAL_MAX_APPLICATION = + PREFIX + "global-queue-max-application"; + + public int getGlobalMaximumApplicationsPerQueue() { + int maxApplicationsPerQueue = + getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED); + return maxApplicationsPerQueue; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index eecd4ba..a002d73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,7 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -56,16 +64,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -183,10 +192,16 @@ protected void setupQueueConfigs(Resource clusterResource) maxApplications = 
conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { - int maxSystemApps = conf.getMaximumSystemApplications(); - maxApplications = + int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); + if (maxGlobalPerQueueApps > 0) { + maxApplications = maxGlobalPerQueueApps; + } else { + int maxSystemApps = conf.getMaximumSystemApplications(); + maxApplications = (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); + } } + maxApplicationsPerUser = Math.min(maxApplications, (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 175f5bb..f92e37e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -31,14 +31,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; @@ -47,11 +50,17 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -61,11 +70,16 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import 
org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 public class TestApplicationLimits {
 
   private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
@@ -698,6 +712,140 @@ public void testHeadroom() throws Exception {
   }
 
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b", "c", "d"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    conf.setInt(
+        "yarn.scheduler.capacity.maximum-applications.accessible-node-labels.z",
+        8);
+    conf.setInt(CapacitySchedulerConfiguration.QUEUE_GLOBAL_MAX_APPLICATION,
+        20);
+    conf.setInt("yarn.scheduler.capacity.root.a.a1.maximum-applications", 1);
+    conf.setFloat("yarn.scheduler.capacity.root.d.user-limit-factor", 0.1f);
+
+    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+    final String c = CapacitySchedulerConfiguration.ROOT + ".c";
+    final String d = CapacitySchedulerConfiguration.ROOT + ".d";
+    final String aa1 = a + ".a1";
+    final String aa2 = a + ".a2";
+    final String aa3 = a + ".a3";
+
+    conf.setQueues(a, new String[]{"a1", "a2", "a3"});
+    conf.setCapacity(a, 50);
+    conf.setCapacity(b, 50);
+    conf.setCapacity(c, 0);
+    conf.setCapacity(d, 0);
+    conf.setCapacity(aa1, 50);
+    conf.setCapacity(aa2, 50);
+    conf.setCapacity(aa3, 0);
+
+    conf.setCapacityByLabel(a, "y", 25);
+    conf.setCapacityByLabel(b, "y", 50);
+    conf.setCapacityByLabel(c, "y", 25);
+    conf.setCapacityByLabel(d, "y", 0);
+
+    conf.setCapacityByLabel(a, "x", 50);
+    conf.setCapacityByLabel(b, "x", 50);
+
+    conf.setCapacityByLabel(a, "z", 50);
+    conf.setCapacityByLabel(b, "z", 50);
+
+    conf.setCapacityByLabel(aa1, "x", 100);
+    conf.setCapacityByLabel(aa2, "x", 0);
+
+    conf.setCapacityByLabel(aa1, "y", 25);
+    conf.setCapacityByLabel(aa2, "y", 75);
+
+    conf.setCapacityByLabel(aa2, "z", 75);
+    conf.setCapacityByLabel(aa3, "z", 25);
+    return conf;
+  }
+
+  private Set<String> toSet(String... 
elements) {
+    Set<String> set = Sets.newHashSet(elements);
+    return set;
+  }
+
+  @Test(timeout = 120000)
+  public void testApplicationLimitSubmit() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    // add cluster node labels x, y and z
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 4096);
+    MockNM nm2 = rm.registerNode("h2:1234", 4096);
+    MockNM nm3 = rm.registerNode("h3:1234", 4096);
+
+    // Submit application to queue c where the default partition capacity is
+    // zero
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "c", false);
+    rm.drainEvents();
+    rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app1.getState());
+    rm.killApp(app1.getApplicationId());
+
+    RMApp app2 = rm.submitApp(GB, "app", "user", null, "a1", false);
+    rm.drainEvents();
+    rm.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
+
+    // The second application submitted to a1 is rejected because the
+    // queue-level maximum-applications limit (1) is already reached
+    RMApp app3 = rm.submitApp(GB, "app", "user", null, "a1", false);
+    rm.drainEvents();
+    rm.waitForState(app3.getApplicationId(), RMAppState.FAILED);
+    Assert.assertEquals(RMAppState.FAILED, app3.getState());
+
+    // Queue d falls back to the global queue limit (20); user-limit-factor
+    // 0.1 caps a single user at 2 applications, so the third app is rejected
+    RMApp app11 = rm.submitApp(GB, "app", "user", null, "d", false);
+    rm.drainEvents();
+    rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app11.getState());
+    RMApp app12 = rm.submitApp(GB, "app", "user", null, "d", false);
+    rm.drainEvents();
+    rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app12.getState());
+    RMApp app13 = rm.submitApp(GB, "app", "user", null, "d", false);
+    rm.drainEvents();
+    rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
+    Assert.assertEquals(RMAppState.FAILED, app13.getState());
+
+    rm.killApp(app2.getApplicationId());
+    rm.killApp(app11.getApplicationId());
+    rm.killApp(app13.getApplicationId());
+    rm.stop();
+  }
 
   @After
   public void tearDown() {