diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 825c398..b9966e7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -171,6 +171,33 @@ private synchronized void unreserveInternal(
         + priority + "; currentReservation " + currentReservation);
   }
 
+  @Override
+  public synchronized Resource getHeadroom() {
+    final FSQueue queue = (FSQueue) this.queue;
+    SchedulingPolicy policy = queue.getPolicy();
+
+    Resource queueFairShare = queue.getFairShare();
+    Resource queueUsage = queue.getResourceUsage();
+    Resource clusterResource = this.scheduler.getClusterResource();
+    Resource clusterUsage = this.scheduler.getRootQueueMetrics()
+        .getAllocatedResources();
+    Resource clusterAvailableResource = Resources.subtract(clusterResource,
+        clusterUsage);
+    Resource headroom = policy.getHeadroom(queueFairShare,
+        queueUsage, clusterAvailableResource);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Headroom calculation for " + this.getName() + ": " +
+          "Min(" +
+          "(queueFairShare=" + queueFairShare +
+          " - queueUsage=" + queueUsage + "), " +
+          "clusterAvailableResource=" + clusterAvailableResource +
+          " (clusterResource=" + clusterResource +
+          " - clusterUsage=" + clusterUsage + ")); " +
+          "Headroom=" + headroom);
+    }
+    return headroom;
+  }
+
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index ca006c5..4f3123d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -175,4 +175,19 @@ public abstract boolean checkIfUsageOverFairShare(
    */
   public abstract boolean checkIfAMResourceUsageOverLimit(
       Resource usage, Resource maxAMResource);
+
+  /**
+   * Get headroom by calculating the min of clusterAvailable and
+   * (queueFairShare - queueUsage) for the resources that are
+   * applicable to this policy. For example, if only memory is applicable,
+   * other resources such as CPU are left at the clusterAvailable value.
+   *
+   * @param queueFairShare fair share in the queue
+   * @param queueUsage resources used in the queue
+   * @param clusterAvailable available resources in the cluster
+   * @return calculated headroom
+   */
+  public abstract Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable);
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 42044bc..3f6cbd1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -77,7 +77,7 @@ public void computeSteadyShares(Collection<? extends FSQueue> queues,
       ComputeFairShares.computeSteadyShares(queues, totalResources, type);
     }
   }
-  
+
   @Override
   public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
     return !Resources.fitsIn(usage, fairShare);
@@ -89,6 +89,21 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+      Resource clusterAvailable) {
+    int queueAvailableMemory =
+        Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    int queueAvailableCPU =
+        Math.max(queueFairShare.getVirtualCores() - queueUsage
+            .getVirtualCores(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        Math.min(clusterAvailable.getVirtualCores(),
+            queueAvailableCPU));
+    return headroom;
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 66bb88b..97669cb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -115,6 +115,17 @@ else if (s1Needy && s2Needy)
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable) {
+    int queueAvailableMemory = Math.max(
+        queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        clusterAvailable.getVirtualCores());
+    return headroom;
+  }
+
+  @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
     ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 591ee49..a2e17ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -108,6 +108,18 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable) {
+    int queueAvailableMemory = Math.max(
+        queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    Resource headroom = Resources.createResource(
+        Math.min(clusterAvailable.getMemory(), queueAvailableMemory),
+        clusterAvailable.getVirtualCores());
+    return headroom;
+  }
+
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 0ab1f70..f560690 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 
 import static org.junit.Assert.assertEquals;
@@ -26,7 +27,12 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public void testLocalityLevelWithoutDelays() {
     assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
         prio, 10, -1.0, -1.0));
   }
+
+  @Test
+  public void testHeadroom() {
+    final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
+    Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
+
+    final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
+    final Resource queueFairShare = Resources.createResource(4096, 4);
+    final Resource queueUsage = Resource.newInstance(1024, 1);
+    final Resource clusterResource = Resources.createResource(8192, 8);
+    final Resource clusterUsage = Resources.createResource(6144, 2);
+    final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    RMContext rmContext = resourceManager.getRMContext();
+    FSAppAttempt schedulerApp =
+        new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue,
+            null, rmContext);
+
+    Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
+    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockScheduler.getClusterResource()).thenReturn(
+        clusterResource);
+    Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn(
+        clusterUsage);
+    Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn(
+        fakeRootQueueMetrics);
+
+    int minClusterAvailableMemory = 2048;
+    int minClusterAvailableCPU = 6;
+    int minQueueAvailableCPU = 3;
+
+    // Min of memory and CPU across cluster and queue is used in
+    // DominantResourceFairnessPolicy
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(DominantResourceFairnessPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minQueueAvailableCPU);
+
+    // Fair and Fifo ignore CPU of queue, so use cluster available CPU
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FairSharePolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FifoPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+  }
+
+  protected void verifyHeadroom(FSAppAttempt schedulerApp,
+      int expectedMemory, int expectedCPU) {
+    Resource headroom = schedulerApp.getHeadroom();
+    assertEquals(expectedMemory, headroom.getMemory());
+    assertEquals(expectedCPU, headroom.getVirtualCores());
+  }
 }
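
Reviewer note (not part of the patch): the listing below is a minimal, standalone sketch of the arithmetic the new getHeadroom() implementations perform, using the same numbers as testHeadroom() above. It is plain Java with no Hadoop dependencies; the class and method names are illustrative only and do not appear in the patch.

// HeadroomSketch.java -- illustrative only; mirrors the min() logic added by this patch.
public class HeadroomSketch {

  // DominantResourceFairnessPolicy: both memory and vcores are capped by the
  // queue's unused fair share and by what the cluster has free.
  static int[] drfHeadroom(int[] fairShare, int[] usage, int[] clusterAvail) {
    int mem = Math.min(clusterAvail[0], Math.max(fairShare[0] - usage[0], 0));
    int cpu = Math.min(clusterAvail[1], Math.max(fairShare[1] - usage[1], 0));
    return new int[] {mem, cpu};
  }

  // FairSharePolicy / FifoPolicy: only memory is governed by the fair share;
  // vcores fall back to whatever the cluster has free.
  static int[] memoryOnlyHeadroom(int[] fairShare, int[] usage, int[] clusterAvail) {
    int mem = Math.min(clusterAvail[0], Math.max(fairShare[0] - usage[0], 0));
    return new int[] {mem, clusterAvail[1]};
  }

  public static void main(String[] args) {
    // Same numbers as testHeadroom(): {memoryMB, vcores}.
    int[] queueFairShare = {4096, 4};
    int[] queueUsage = {1024, 1};
    int[] clusterResource = {8192, 8};
    int[] clusterUsage = {6144, 2};
    int[] clusterAvail = {clusterResource[0] - clusterUsage[0],   // 2048 MB
                          clusterResource[1] - clusterUsage[1]};  // 6 vcores

    int[] drf = drfHeadroom(queueFairShare, queueUsage, clusterAvail);
    int[] memOnly = memoryOnlyHeadroom(queueFairShare, queueUsage, clusterAvail);

    // Expected: DRF -> 2048 MB, 3 vcores; Fair/Fifo -> 2048 MB, 6 vcores,
    // matching the assertions made through verifyHeadroom().
    System.out.println("DRF headroom:       " + drf[0] + " MB, " + drf[1] + " vcores");
    System.out.println("Fair/Fifo headroom: " + memOnly[0] + " MB, " + memOnly[1] + " vcores");
  }
}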