diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..1cf1a6b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -55,6 +55,7 @@ private final String queueName; Queue queue; final String user; + private Resource amResource; private final AtomicInteger containerIdCounter = new AtomicInteger(0); final Set priorities = new TreeSet( @@ -95,6 +96,14 @@ public String getUser() { return user; } + public Resource getAMResource() { + return amResource; + } + + public void setAMResource(Resource amResource) { + this.amResource = amResource; + } + public synchronized boolean isPending() { return pending; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e047..23a152d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -168,6 +168,14 @@ public String getQueueName() { return appSchedulingInfo.getQueueName(); } + public Resource getAMResource() { + return appSchedulingInfo.getAMResource(); + } + + public void setAMResource(Resource amResource) { + appSchedulingInfo.setAMResource(amResource); + } + public synchronized RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0f9d906..237cad2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -53,6 +53,10 @@ private final int userMaxAppsDefault; private final int queueMaxAppsDefault; + // Maximum resource share for each leaf queue that can be used to run AMs + final Map queueMaxAMShares; + private final float queueMaxAMShareDefault; + // ACL's for each queue. Only specifies non-default ACL's from configuration. 
private final Map> queueAcls; @@ -84,8 +88,9 @@ public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, + Map queueWeights, + Map queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, float queueMaxAMShareDefault, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, @@ -97,9 +102,11 @@ public AllocationConfiguration(Map minQueueResources, this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; + this.queueMaxAMShares = queueMaxAMShares; this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; + this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; @@ -116,8 +123,10 @@ public AllocationConfiguration(Configuration conf) { queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); + queueMaxAMShares = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAMShareDefault = 1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; @@ -184,6 +193,11 @@ public int getQueueMaxApps(String queue) { return (maxApps == null) ? queueMaxAppsDefault : maxApps; } + public float getQueueMaxAMShare(String queue) { + Float maxAMShare = queueMaxAMShares.get(queue); + return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; + } + /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 6c35630..3a962a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -209,6 +209,7 @@ public synchronized void reloadAllocations() throws IOException, Map maxQueueResources = new HashMap(); Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); + Map queueMaxAMShares = new HashMap(); Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); @@ -216,6 +217,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; + float queueMaxAMShareDefault = 1.0f; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; @@ -282,6 +284,11 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + 
queueMaxAMShareDefault = val; } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -306,8 +313,8 @@ public synchronized void reloadAllocations() throws IOException, parent = null; } loadQueue(parent, element, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, configuredQueues); } @@ -322,8 +329,8 @@ public synchronized void reloadAllocations() throws IOException, } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, + queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, newPlacementPolicy, configuredQueues); @@ -338,7 +345,8 @@ public synchronized void reloadAllocations() throws IOException, */ private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, + Map userMaxApps, Map queueMaxAMShares, + Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map> queueAcls, @@ -370,6 +378,11 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = 
Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShares.put(queueName, val); } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); @@ -392,8 +405,9 @@ private void loadQueue(String parentName, Element element, Map } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, queueAcls, configuredQueues); + queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, + queuePolicies, minSharePreemptionTimeouts, queueAcls, + configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 4dc0bf4..8f26b33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -267,6 +268,11 @@ private Resource assignContainer(FSSchedulerNode node, node.allocateContainer(app.getApplicationId(), allocatedContainer); + // If this container is used to run AM, update the leaf queue's AM usage + if (request.getPriority().equals(RMAppAttemptImpl.AM_CONTAINER_PRIORITY)) { + queue.addAMResourceUsage(container.getResource()); + } + return container.getResource(); } else { // The desired container won't fit here, so reserve @@ -297,6 +303,14 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { app.addSchedulingOpportunity(priority); + // Check the AM resource usage for the leaf queue + if (priority.equals(RMAppAttemptImpl.AM_CONTAINER_PRIORITY) + && app.getAMResource() != null) { + if (! queue.canRunAppAM(app.getAMResource())) { + return Resources.none(); + } + } + ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = app.getResourceRequest(priority, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index fe738da..cecfbfc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -55,6 +55,9 @@ private long lastTimeAtMinShare; private long lastTimeAtHalfFairShare; + // Track the AM resource 
usage for this queue + private Resource amResourceUsage; + private final ActiveUsersManager activeUsersManager; public FSLeafQueue(String name, FairScheduler scheduler, @@ -63,6 +66,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); + amResourceUsage = Resource.newInstance(0, 0); } public void addApp(FSSchedulerApp app, boolean runnable) { @@ -86,6 +90,10 @@ void addAppSchedulable(AppSchedulable appSched) { */ public boolean removeApp(FSSchedulerApp app) { if (runnableAppScheds.remove(app.getAppSchedulable())) { + // Update AM resource usage + if (app.getAMResource() != null) { + Resources.subtractFrom(amResourceUsage, app.getAMResource()); + } return true; } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { return false; @@ -284,4 +292,26 @@ public int getNumRunnableApps() { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + /** + * Check whether this queue can run this application master under the + * maxAMShare limit. + * + * @param amResource the resource requested by the application master + * @return true if this queue can run the application master + */ + public boolean canRunAppAM(Resource amResource) { + float maxAMShare = + scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); + Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); + Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); + return !policy + .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource); + } + + public void addAMResourceUsage(Resource amResource) { + if (amResource != null) { + Resources.addTo(amResourceUsage, amResource); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1b7011a..4fbadda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -832,6 +833,13 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, return EMPTY_ALLOCATION; } + // Set the AM resource request for the given application + if (ask.size() == 1 + && ask.iterator().next().getPriority() + .equals(RMAppAttemptImpl.AM_CONTAINER_PRIORITY)) { + application.setAMResource(ask.iterator().next().getCapability()); + } + // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), clusterResource, minimumAllocation, maximumAllocation, incrAllocation); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1d77a43..1087c73 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -149,4 +149,15 @@ public abstract void computeShares( */ public abstract boolean checkIfUsageOverFairShare( Resource usage, Resource fairShare); + + /** + * Check if a leaf queue's AM resource usage is over its limit under this policy. + * + * @param usage {@link Resource} the resource used by application masters + * @param maxAMResource {@link Resource} the maximum allowed resource for + * application masters + * @return true if AM resource usage is over the limit + */ + public abstract boolean checkIfAMResourceUsageOverLimit( + Resource usage, Resource maxAMResource); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 4b663d9..af674b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -75,6 +75,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return !Resources.fitsIn(usage, maxAMResource); + } + + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index ca7297f..8f5f442 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -125,6 +125,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, maxAMResource); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index d996944..aafa058 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -36,6 +37,8 @@ @VisibleForTesting public static final String NAME = "FIFO"; private FifoComparator comparator = new FifoComparator(); + private static final DefaultResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); @Override public String getName() { @@ -95,6 +98,11 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, maxAMResource); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a725d8..2a4992c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -174,9 +174,10 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println("alice,bob admins"); out.println(""); - // Give queue D a limit of 3 running apps + // Give queue D a limit of 3 running apps and 0.4f maxAMShare out.println(""); out.println("3"); + out.println("0.4"); out.println(""); // Give queue E a preemption timeout of one minute out.println(""); @@ -194,6 +195,8 @@ public void testAllocationFileParsing() throws Exception { out.println("15"); // Set default limit of apps per user to 5 out.println("5"); + // Set default limit of AMResourceShare to 0.5f + out.println("0.5f"); // Give user1 a limit of 10 jobs out.println(""); out.println("10"); @@ -240,6 +243,13 @@ public void testAllocationFileParsing() throws Exception { assertEquals(10, queueConf.getUserMaxApps("user1")); assertEquals(5, queueConf.getUserMaxApps("user2")); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root." 
+ YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01); + assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01); + assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01); + // Root should get * ACL assertEquals("*", queueConf.getQueueAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4fd51a7..ff2c6bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -2330,6 +2331,114 @@ public void testUserAndQueueMaxRunningApps() throws Exception { } @Test + public void testQueueMaxAMShare() 
throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.2"); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + assertEquals("Queue queue1's fair share should be 10240", + 10240, scheduler.getQueueManager().getLeafQueue("queue1", true) + .getFairShare().getMemory()); + + // exceeds no limits + ApplicationAttemptId attId1 = createSchedulingRequest(1024, 1, "queue1", + "user1", 1, RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority()); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 1024 MB memory", + 1024, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + + // exceeds no limits + ApplicationAttemptId attId2 = createSchedulingRequest(1024, 1, "queue1", + "user1", 1, RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority()); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM requests 1024 MB memory", + 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + + // exceeds queue limit + ApplicationAttemptId attId3 = createSchedulingRequest(1024, 1, "queue1", + "user1", 1, RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority()); + 
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, app3.getLiveContainers().size()); + + // Still can run non-AM container + createSchedulingRequestExistingApplication(1024, 1, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1 should have two running containers", + 2, app1.getLiveContainers().size()); + + // Remove app1, app3's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + scheduler.update(); + scheduler.handle(appRemovedEvent1); + scheduler.handle(updateEvent); + assertEquals("Application1's AM should be finished", + 0, app1.getLiveContainers().size()); + assertEquals("Application3's AM should be running", + 1, app3.getLiveContainers().size()); + + // exceeds queue limit + ApplicationAttemptId attId4 = createSchedulingRequest(2048, 2, "queue1", + "user1", 1, RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority()); + FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application4's AM requests 2048 MB memory", + 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should not be running", + 0, app4.getLiveContainers().size()); + + // Remove app2 and app3, app4's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false); + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = + new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent2); + scheduler.handle(appRemovedEvent3); + scheduler.update(); + scheduler.handle(updateEvent); + 
assertEquals("Application2's AM should be finished", + 0, app2.getLiveContainers().size()); + assertEquals("Application3's AM should be finished", + 0, app3.getLiveContainers().size()); + assertEquals("Application4's AM should be running", + 1, app4.getLiveContainers().size()); + } + + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); MockClock clock = new MockClock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 54daf2d..07a3edd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -237,6 +237,12 @@ Allocation file format * maxRunningApps: limit the number of apps from the queue to run at once + * maxAMShare: limit the fraction of the leaf queue's fair share that can be + used to run application masters. maxAMShare * queue_fair_share is the maximum + amount of resources that can be used by AMs in that leaf queue. The default + value is 1.0f, which means AMs in that leaf queue can take up to 100% of the + queue's fair share. + * weight: to share the cluster non-proportionally with other queues. Weights default to 1, and a queue with weight 2 should receive approximately twice as many resources as a queue with the default weight. @@ -279,6 +285,9 @@ Allocation file format * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue. + * <<A queueMaxAMShareDefault element>>, which sets the default AM resource + limit for queues; overridden by the maxAMShare element in each queue. + * <>, which sets the default scheduling policy for queues; overriden by the schedulingPolicy element in each queue if specified. Defaults to "fair". 
@@ -328,6 +337,7 @@ Allocation file format 10000 mb,0vcores 90000 mb,0vcores 50 + 0.1 2.0 fair @@ -336,6 +346,8 @@ Allocation file format + 0.5 +