diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index f0d1ed1..c47adc7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -182,18 +182,21 @@ public Resource getHeadroom() { Resource clusterResource = this.scheduler.getClusterResource(); Resource clusterUsage = this.scheduler.getRootQueueMetrics() .getAllocatedResources(); - Resource clusterAvailableResource = Resources.subtract(clusterResource, - clusterUsage); + + // Max available resources for this queue correspond to the minimum + // of resources available on the cluster and queue's maxResources + Resource maxAvailableResource = + Resources.subtract(clusterResource, clusterUsage); + maxAvailableResource = + Resources.componentwiseMin(maxAvailableResource, queue.getMaxShare()); Resource headroom = policy.getHeadroom(queueFairShare, - queueUsage, clusterAvailableResource); + queueUsage, maxAvailableResource); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for " + this.getName() + ":" + "Min(" + "(queueFairShare=" + queueFairShare + " - queueUsage=" + queueUsage + ")," + - " clusterAvailableResource=" + clusterAvailableResource + - "(clusterResource=" + clusterResource + - " - clusterUsage=" + clusterUsage + ")" + + " maxAvailableResource=" + maxAvailableResource + "Headroom=" + headroom); } return headroom; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index bf2a25b..abdc834 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -185,10 +185,10 @@ public abstract boolean checkIfAMResourceUsageOverLimit( * * @param queueFairShare fairshare in the queue * @param queueUsage resources used in the queue - * @param clusterAvailable available resource in cluster + * @param maxAvailable available resource in cluster for this queue * @return calculated headroom */ public abstract Resource getHeadroom(Resource queueFairShare, - Resource queueUsage, Resource clusterAvailable); + Resource queueUsage, Resource maxAvailable); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 3f6cbd1..86d503b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -90,15 +90,15 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes @Override public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, - Resource clusterAvailable) { + Resource maxAvailable) { int queueAvailableMemory = Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0); int queueAvailableCPU = Math.max(queueFairShare.getVirtualCores() - queueUsage .getVirtualCores(), 0); Resource headroom = Resources.createResource( - Math.min(clusterAvailable.getMemory(), queueAvailableMemory), - Math.min(clusterAvailable.getVirtualCores(), + Math.min(maxAvailable.getMemory(), queueAvailableMemory), + Math.min(maxAvailable.getVirtualCores(), queueAvailableCPU)); return headroom; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 97669cb..918db9d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -116,12 +116,12 @@ else if (s1Needy && s2Needy) @Override public Resource getHeadroom(Resource queueFairShare, - Resource queueUsage, Resource clusterAvailable) { + Resource queueUsage, Resource maxAvailable) { int queueAvailableMemory = Math.max( queueFairShare.getMemory() - queueUsage.getMemory(), 0); Resource headroom = Resources.createResource( - Math.min(clusterAvailable.getMemory(), queueAvailableMemory), - clusterAvailable.getVirtualCores()); + Math.min(maxAvailable.getMemory(), queueAvailableMemory), + maxAvailable.getVirtualCores()); return headroom; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index a2e17ec..7d88933 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -109,12 +109,12 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes @Override public Resource getHeadroom(Resource queueFairShare, - Resource queueUsage, Resource clusterAvailable) { + Resource queueUsage, Resource maxAvailable) { int queueAvailableMemory = Math.max( queueFairShare.getMemory() - queueUsage.getMemory(), 0); Resource headroom = Resources.createResource( - Math.min(clusterAvailable.getMemory(), queueAvailableMemory), - clusterAvailable.getVirtualCores()); + Math.min(maxAvailable.getMemory(), queueAvailableMemory), + maxAvailable.getVirtualCores()); return headroom; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index f560690..9f46475 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -198,18 +198,21 @@ public void testHeadroom() { Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock()); final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class); + final Resource queueMaxResources = Resource.newInstance(5 * 1024, 5); + final Resource queueFairShare = Resources.createResource(4096, 4); - final Resource queueUsage = Resource.newInstance(1024, 1); + final Resource queueUsage = Resource.newInstance(2048, 2); + final Resource queueStarvation = + Resources.subtract(queueFairShare, queueUsage); + final Resource clusterResource = Resources.createResource(8192, 8); - final Resource clusterUsage = Resources.createResource(6144, 2); - final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class); + final Resource clusterUsage = Resources.createResource(2048, 2); + final Resource clusterAvailable = + Resources.subtract(clusterResource, clusterUsage); - ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); - RMContext rmContext = resourceManager.getRMContext(); - FSAppAttempt schedulerApp = - new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue , - null, rmContext); + final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class); + Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources); Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare); Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage); Mockito.when(mockScheduler.getClusterResource()).thenReturn @@ -219,27 +222,51 @@ public void testHeadroom() { Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn (fakeRootQueueMetrics); - int minClusterAvailableMemory = 2048; - int minClusterAvailableCPU = 6; - int minQueueAvailableCPU = 3; + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + FSAppAttempt schedulerApp = + new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", mockQueue , + null, rmContext); // Min of Memory and CPU across cluster and queue is used in // DominantResourceFairnessPolicy Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy .getInstance(DominantResourceFairnessPolicy.class)); - verifyHeadroom(schedulerApp, minClusterAvailableMemory, - minQueueAvailableCPU); + verifyHeadroom(schedulerApp, + min(queueStarvation.getMemory(), + clusterAvailable.getMemory(), + queueMaxResources.getMemory()), + min(queueStarvation.getVirtualCores(), + clusterAvailable.getVirtualCores(), + queueMaxResources.getVirtualCores()) + ); // Fair and Fifo ignore CPU of queue, so use cluster available CPU Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy .getInstance(FairSharePolicy.class)); - verifyHeadroom(schedulerApp, minClusterAvailableMemory, - minClusterAvailableCPU); + verifyHeadroom(schedulerApp, + min(queueStarvation.getMemory(), + clusterAvailable.getMemory(), + queueMaxResources.getMemory()), + Math.min( + clusterAvailable.getVirtualCores(), + queueMaxResources.getVirtualCores()) + ); Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy .getInstance(FifoPolicy.class)); - verifyHeadroom(schedulerApp, minClusterAvailableMemory, - minClusterAvailableCPU); + verifyHeadroom(schedulerApp, + min(queueStarvation.getMemory(), + clusterAvailable.getMemory(), + queueMaxResources.getMemory()), + Math.min( + clusterAvailable.getVirtualCores(), + queueMaxResources.getVirtualCores()) + ); + } + + private static int min(int value1, int value2, int value3) { + return Math.min( Math.min(value1, value2), value3); } protected void verifyHeadroom(FSAppAttempt schedulerApp,