diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java index 1a3ebca..45406d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java @@ -54,8 +54,8 @@ public Resource getAvailableResources() { memoryOverAllocationThresholdBytes - memoryUtilizationBytes; int vcoreAvailable = Math.round( - (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) * - containersMonitor.getVCoresAllocatedForContainers()); + containersMonitor.getVCoresAllocatedForContainers() * + overAllocationThresholds.getCpuThreshold() - utilization.getCPU()); return (memoryAvailable <= 0 || vcoreAvailable <= 0) ? Resources.none() : Resource.newInstance(memoryAvailable >> 20, vcoreAvailable); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java index e4665bb..f8a8a24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java @@ -62,7 +62,9 @@ public ResourceUtilization getResourcesToReclaim() { int memoryOverLimit = utilization.getPhysicalMemory() - absoluteMemoryPreemptionThresholdMb; - float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold; + float vcoreOverLimit = utilization.getCPU() - + getContainersMonitor().getVCoresAllocatedForContainers() * + cpuPreemptionThreshold; if (vcoreOverLimit > 0) { timesCpuOverPreemption++; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java index a2d4aa8..7dcc15e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java @@ -179,7 +179,7 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() // the current containers utilization is low setContainerResourceUtilization( - ResourceUtilization.newInstance(512, 0, 1.0f/8)); + ResourceUtilization.newInstance(512, 0, 0.5f)); // start a container that requests more than what's left unallocated // 512 + 1024 + 824 > 2048 @@ -251,9 +251,9 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization() BaseContainerManagerTest.waitForContainerSubState( containerManager, createContainerId(1), ContainerSubState.RUNNING); - // the containers utilization is high + // the containers memory utilization is high setContainerResourceUtilization( - ResourceUtilization.newInstance(1500, 0, 1.0f/8)); + ResourceUtilization.newInstance(1500, 0, 0.5f)); // start a container that requests more than what's left unallocated // 512 + 1024 + 824 > 2048 @@ -315,7 +315,7 @@ public void testStartOppContainersWithOverallocationLowUtilization() // the current containers utilization is low setContainerResourceUtilization( - ResourceUtilization.newInstance(800, 0, 1.0f/8)); + ResourceUtilization.newInstance(800, 0, 0.5f)); // start a container when there is no resources left unallocated. containerManager.startContainers(StartContainersRequest.newInstance( @@ -385,7 +385,7 @@ public void testQueueOppContainersWithFullUtilization() throws Exception { // the containers are fully utilizing their resources setContainerResourceUtilization( - ResourceUtilization.newInstance(2048, 0, 1.0f/8)); + ResourceUtilization.newInstance(2048, 0, 0.5f)); // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container // queue can hold when there is no unallocated resource left. 
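Note on the unit change (an illustrative sketch, not part of the patch): the production-side changes above in SnapshotBasedOverAllocationPolicy and SnapshotBasedOverAllocationPreemptionPolicy switch the CPU math from fractions of node capacity to absolute vcores, so utilization.getCPU() is now read as vcores in use and the over-allocation headroom becomes getVCoresAllocatedForContainers() * getCpuThreshold() - utilization.getCPU(). The standalone snippet below walks through that arithmetic with assumed values (a 4-vcore allocation, a 0.75 threshold, 2.0 vcores in use) and hypothetical names; it is independent of the YARN classes.

    public final class VcoreHeadroomSketch {
      public static void main(String[] args) {
        long vcoresAllocated = 4L;     // assumed vcores allocated to containers
        float cpuThreshold = 0.75f;    // assumed CPU over-allocation threshold
        float cpuUsedVcores = 2.0f;    // snapshot of container CPU usage, in vcores

        // Same shape as the patched formula: headroom = allocated * threshold - used.
        int vcoreAvailable = Math.round(vcoresAllocated * cpuThreshold - cpuUsedVcores);

        // 4 * 0.75 - 2.0 = 1.0, so 1 vcore is available for OPPORTUNISTIC containers.
        System.out.println("vcores available for over-allocation: " + vcoreAvailable);
      }
    }

Under the old fraction-based units the same snapshot would have been reported as 0.5f, which is why the utilization values in the tests below are scaled by the node's vcore count (for example 1.0f/2 becomes 2.0f on a 4-vcore node).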
@@ -453,7 +453,7 @@ public void testStartOppContainerWithHighUtilizationNoOverallocation() // containers utilization is above the over-allocation threshold setContainerResourceUtilization( - ResourceUtilization.newInstance(1600, 0, 1.0f/2)); + ResourceUtilization.newInstance(1600, 0, 2.0f)); // start a container that can just fit in the remaining unallocated space containerManager.startContainers(StartContainersRequest.newInstance( @@ -508,7 +508,7 @@ public void testKillNoOppContainersWithPartialOverallocationLowUtilization() // containers utilization is low setContainerResourceUtilization( - ResourceUtilization.newInstance(512, 0, 1.0f/8)); + ResourceUtilization.newInstance(512, 0, 0.5f)); // start a GUARANTEED container that requests more than what's left // unallocated on the node: (512 + 1024 + 824) > 2048 @@ -564,7 +564,7 @@ public void testKillOppContainersWithPartialOverallocationHighUtilization() // the containers utilization is very high setContainerResourceUtilization( - ResourceUtilization.newInstance(1800, 0, 1.0f/8)); + ResourceUtilization.newInstance(1800, 0, 0.5f)); // start a GUARANTEED container that requests more than what's left // unallocated on the node 512 + 1024 + 824 > 2048 @@ -656,7 +656,7 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() // the containers utilization is low setContainerResourceUtilization( - ResourceUtilization.newInstance(1024, 0, 1.0f/8)); + ResourceUtilization.newInstance(1024, 0, 0.5f)); // start a GUARANTEED container that requests more than what's left // unallocated on the node: (512 + 1024 + 824) > 2048 @@ -717,9 +717,9 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.RUNNING); - // the container utilization is at the overallocation threshold + // the container memory utilization is at the overallocation threshold setContainerResourceUtilization( - ResourceUtilization.newInstance(1536, 0, 1.0f/2)); + ResourceUtilization.newInstance(1536, 0, 2.0f)); containerManager.startContainers(StartContainersRequest.newInstance( new ArrayList() { @@ -741,7 +741,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { // the GUARANTEED container is completed releasing resources setContainerResourceUtilization( - ResourceUtilization.newInstance(100, 0, 1.0f/5)); + ResourceUtilization.newInstance(100, 0, 0.8f)); allowContainerToSucceed(2); BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.DONE); @@ -802,7 +802,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { // the container is fully utilizing its resources setContainerResourceUtilization( - ResourceUtilization.newInstance(2048, 0, 1.0f)); + ResourceUtilization.newInstance(2048, 0, 4.0f)); // send two OPPORTUNISTIC container requests that are expected to be queued containerManager.startContainers(StartContainersRequest.newInstance( @@ -822,7 +822,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { // the containers utilization dropped to the overallocation threshold setContainerResourceUtilization( - ResourceUtilization.newInstance(1536, 0, 1.0f/2)); + ResourceUtilization.newInstance(1536, 0, 2.0f)); // try to start opportunistic containers out of band. 
((ContainerManagerForTest) containerManager) @@ -840,11 +840,11 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { // the GUARANTEED container is completed releasing resources setContainerResourceUtilization( - ResourceUtilization.newInstance(100, 0, 1.0f/5)); + ResourceUtilization.newInstance(100, 0, 0.8f)); // the containers utilization dropped way below the overallocation threshold setContainerResourceUtilization( - ResourceUtilization.newInstance(512, 0, 1.0f/8)); + ResourceUtilization.newInstance(512, 0, 0.5f)); ((ContainerManagerForTest) containerManager) .checkNodeResourceUtilization(); @@ -925,7 +925,7 @@ public void testPreemptOpportunisticContainersUponHighMemoryUtilization() // the containers memory utilization is over the preemption threshold // (2048 > 2048 * 0.8 = 1638.4) setContainerResourceUtilization( - ResourceUtilization.newInstance(2048, 0, 0.5f)); + ResourceUtilization.newInstance(2048, 0, 2.0f)); ((ContainerManagerForTest) containerManager) .checkNodeResourceUtilization(); @@ -990,7 +990,7 @@ public void testNoPreemptionUponHighMemoryUtilizationButNoOverallocation() // the node is being fully utilized, which is above the preemption // threshold (2048 * 0.75 = 1536 MB, 1.0f) setContainerResourceUtilization( - ResourceUtilization.newInstance(2048, 0, 1.0f)); + ResourceUtilization.newInstance(2048, 0, 4.0f)); ((ContainerManagerForTest) containerManager) .checkNodeResourceUtilization(); @@ -1064,10 +1064,10 @@ public void testPreemptionUponHighCPUUtilization() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(3), ContainerSubState.RUNNING); - final float fullCpuUtilization = 1.0f; + final float fullCpuUtilization = 1.0f * 4; - // the containers CPU utilization is over its preemption threshold (0.8f) - // for the first time + // the containers CPU utilization is over its preemption threshold + // (0.8f * 4) for the first time setContainerResourceUtilization( ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); ((ContainerManagerForTest) containerManager) @@ -1084,8 +1084,8 @@ public void testPreemptionUponHighCPUUtilization() throws Exception { } }); - // the containers CPU utilization is over its preemption threshold (0.8f) - // for the second time + // the containers CPU utilization is over its preemption threshold + // (0.8f * 4) for the second time setContainerResourceUtilization( ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); ((ContainerManagerForTest) containerManager) @@ -1103,8 +1103,8 @@ public void testPreemptionUponHighCPUUtilization() throws Exception { } }); - // the containers CPU utilization is over the preemption threshold (0.8f) - // for the third time + // the containers CPU utilization is over the preemption threshold + // (0.8f * 4) for the third time setContainerResourceUtilization( ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); ((ContainerManagerForTest) containerManager) @@ -1127,8 +1127,8 @@ public void testPreemptionUponHighCPUUtilization() throws Exception { }); // again, the containers CPU utilization is over the preemption threshold - // (0.8f) for the first time (the cpu over-limit count is reset every time - // a preemption is triggered) + // (0.8f * 4) for the first time (the cpu over-limit count is reset every + // time a preemption is triggered) setContainerResourceUtilization( ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); ((ContainerManagerForTest) containerManager) @@ -1145,8 +1145,8 @@ public void 
testPreemptionUponHighCPUUtilization() throws Exception { } }); - // the containers CPU utilization is over the preemption threshold (0.9f) - // for the second time + // the containers CPU utilization is over the preemption threshold + // (0.8f * 4) for the second time setContainerResourceUtilization( ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); ((ContainerManagerForTest) containerManager) @@ -1172,8 +1172,8 @@ public void testPreemptionUponHighCPUUtilization() throws Exception { // because CPU utilization is over its preemption threshold three times // consecutively, the amount of cpu utilization over the preemption - // threshold, that is, 1.0 - 0.8 = 0.2f CPU needs to be reclaimed and - // as a result, the other OPPORTUNISTIC container should be killed + // threshold, that is, (1.0 - 0.8) * 4 = 0.8f CPU needs to be reclaimed + // and as a result, the other OPPORTUNISTIC container should be killed BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(1), ContainerSubState.DONE); verifyContainerStatuses(new HashMap() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPolicy.java index 7900a61..afc05af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPolicy.java @@ -60,6 +60,10 @@ private static int getMemoryMBs(double memoryUtilization) { return (int) (Math.round(memoryUtilization * MEMORY_CAPACITY_BYTE) >> 20); } + private static float getVcores(float cpuUtilization) { + return VCORE_CAPACITY * cpuUtilization; + } + @Test public void testNoVcoresAvailable() { SnapshotBasedOverAllocationPolicy overAllocationPolicy = @@ -69,7 +73,7 @@ public void testNoVcoresAvailable() { // the current cpu utilization is over the threshold, 0.8 when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(0, 0, 0.9f), + ResourceUtilization.newInstance(0, 0, getVcores(0.9f)), System.currentTimeMillis())); Resource available = overAllocationPolicy.getAvailableResources(); Assert.assertEquals( @@ -105,7 +109,7 @@ public void testNoMemoryVcoreAvailable() { when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( ResourceUtilization.newInstance( - getMemoryMBs(0.9), 0, 0.9f), + getMemoryMBs(0.9), 0, getVcores(0.9f)), System.currentTimeMillis())); Resource available = overAllocationPolicy.getAvailableResources(); Assert.assertEquals( @@ -123,8 +127,8 @@ public void testResourcesAvailable() { int memoryUtilizationMBs = getMemoryMBs(0.6); when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(memoryUtilizationMBs, 0, 0.6f), - System.currentTimeMillis())); + 
ResourceUtilization.newInstance(memoryUtilizationMBs, 0, + getVcores(0.6f)), System.currentTimeMillis())); Resource available = overAllocationPolicy.getAvailableResources(); Assert.assertEquals("Unexpected resources available for overallocation", Resource.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java index bbc7c49..fc8a977 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java @@ -47,9 +47,11 @@ @Before public void setUp() { - // the node has an allocation of 2048 MB of memory + // the node has an allocation of 2048 MB of memory and 4 vcores when(containersMonitor.getPmemAllocatedForContainers()). thenReturn(2048 * 1024 * 1024L); + when(containersMonitor.getVCoresAllocatedForContainers()). + thenReturn(4L); } /** @@ -86,36 +88,36 @@ public void testCpuOverPreemptionThreshold() { new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); - // the current CPU utilization, 1.0f, is over the preemption threshold, - // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the current CPU utilization, 4.0f, is over the preemption threshold, + // 0.75f * 4, for the first time. The memory utilization, 1000 MB is below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed Assert.assertEquals( ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 0.5f, is below the preemption threshold, - // 0.75f. In the meantime the memory utilization, 1000 MB is also below + // the current CPU utilization, 2.0f, is below the preemption threshold, + // 0.75f * 4. In the meantime the memory utilization, 1000 MB is also below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 0.5f), + ResourceUtilization.newInstance(1000, 0, 2.0f), Time.now())); // no resources shall be reclaimed Assert.assertEquals( ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is over the preemption threshold, - // 0.75f. In the meantime the memory utilization, 1000 MB is below + // the current CPU utilization, 4.0f, is over the preemption threshold, + // 0.75f * 4. 
In the meantime the memory utilization, 1000 MB is below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed because the cpu utilization is allowed // to go over the preemption threshold at most two times in a row. It is @@ -124,12 +126,12 @@ public void testCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is again over the preemption - // threshold, 0.75f. In the meantime the memory utilization, 1000 MB + // the current CPU utilization, 4.0f, is again over the preemption + // threshold, 0.75f * 4. In the meantime the memory utilization, 1000 MB // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed because the cpu utilization is allowed // to go over the preemption threshold at most two times in a row. It is @@ -138,17 +140,17 @@ public void testCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is over the preemption threshold, + // the current CPU utilization, 4.0f, is over the preemption threshold, // the third time in a row. In the meantime the memory utilization, 1000 MB // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // the amount of cpu utilization over the preemption threshold, that is, - // 1.0 - 0.75f = 0.25, shall be reclaimed. + // (1.0 - 0.75)*4 = 1.0f, shall be reclaimed. Assert.assertEquals( - ResourceUtilization.newInstance(0, 0, 0.25f), + ResourceUtilization.newInstance(0, 0, 1.0f), preemptionPolicy.getResourcesToReclaim()); } @@ -162,12 +164,12 @@ public void testMemoryCpuOverPreemptionThreshold() { new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); - // the current CPU utilization, 1.0f, is over the preemption threshold, - // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the current CPU utilization, 4.0f, is over the preemption threshold, + // 0.75f * 4, for the first time. The memory utilization, 1000 MB is below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed because the cpu utilization is allowed // to go over the preemption threshold at most two times in a row. 
It is @@ -176,12 +178,12 @@ public void testMemoryCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 0.5f, is below the preemption threshold, - // 0.75f. The memory utilization, 2000 MB, however, is above the memory + // the current CPU utilization, 2.0f, is below the preemption threshold, + // 0.75f * 4. The memory utilization, 2000 MB, however, is above the memory // preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(2000, 0, 0.5f), + ResourceUtilization.newInstance(2000, 0, 2.0f), Time.now())); // the amount of memory utilization over the preemption threshold, that is, // 2000 - (2048 * 0.75) = 464 MB of memory, shall be reclaimed. @@ -189,12 +191,12 @@ public void testMemoryCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(464, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is over the preemption threshold, - // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the current CPU utilization, 4.0f, is over the preemption threshold, + // 0.75f * 4, for the first time. The memory utilization, 1000 MB is below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed because the cpu utilization is allowed // to go over the preemption threshold at most two times in a row. It is @@ -203,12 +205,12 @@ public void testMemoryCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is again over the preemption - // threshold, 0.75f. In the meantime the memory utilization, 1000 MB + // the current CPU utilization, 4.0f, is again over the preemption + // threshold, 0.75f * 4. In the meantime the memory utilization, 1000 MB // is still below the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 1.0f), + ResourceUtilization.newInstance(1000, 0, 4.0f), Time.now())); // no resources shall be reclaimed because the cpu utilization is allowed // to go over the preemption threshold at most two times in a row. It is @@ -217,20 +219,20 @@ public void testMemoryCpuOverPreemptionThreshold() { ResourceUtilization.newInstance(0, 0, 0.0f), preemptionPolicy.getResourcesToReclaim()); - // the current CPU utilization, 1.0f, is over the CPU preemption threshold, - // 0.75f, the third time in a row. In the meantime, the memory utilization, - // 2000 MB, is also over the memory preemption threshold, + // the current CPU utilization, 4.0f, is over the CPU preemption threshold, + // 0.75f * 4, the third time in a row. In the meantime, the memory + // utilization, 2000 MB, is also over the memory preemption threshold, // 2048 * 0.75 = 1536 MB. 
when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(2000, 0, 1.0f), + ResourceUtilization.newInstance(2000, 0, 4.0f), Time.now())); // the amount of memory utilization over the preemption threshold, that is, // 2000 - (2048 * 0.75) = 464 MB of memory, and the amount of cpu - // utilization over the preemption threshold, that is, 1.0f - 0.75f = 0.25f, + // utilization over the preemption threshold, that is, 4.0 - 0.75f*4 = 1.0f, // shall be reclaimed. Assert.assertEquals( - ResourceUtilization.newInstance(464, 0, 0.25f), + ResourceUtilization.newInstance(464, 0, 1.0f), preemptionPolicy.getResourcesToReclaim()); } @@ -243,12 +245,12 @@ public void testBothMemoryAndCpuUnderPreemptionThreshold() { new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); - // the current CPU utilization, 0.5f, is below the preemption threshold, + // the current CPU utilization, 2.0f, is below the preemption threshold, // 0.75f. In the meantime the memory utilization, 1000 MB is also below // the memory preemption threshold, 2048 * 0.75 = 1536 MB. when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( new ContainersMonitor.ContainersResourceUtilization( - ResourceUtilization.newInstance(1000, 0, 0.5f), + ResourceUtilization.newInstance(1000, 0, 2.0f), Time.now())); // no resources shall be reclaimed because both CPU and memory utilization // are under the preemption threshold diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 8d47f34..8074114 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -744,8 +744,7 @@ public synchronized Resource allowedResourceForOverAllocation() { // account for resources allocated in this heartbeat projectedNodeUtilization.addTo( (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0, - (float) resourceAllocatedPendingLaunch.getVirtualCores() / - capacity.getVirtualCores()); + (float) resourceAllocatedPendingLaunch.getVirtualCores()); ResourceThresholds thresholds = overAllocationInfo.getOverAllocationThresholds(); @@ -756,7 +755,7 @@ public synchronized Resource allowedResourceForOverAllocation() { - projectedNodeUtilization.getPhysicalMemory()); int allowedCpu = Math.max(0, (int) (overAllocationThreshold.getVirtualCores() - - projectedNodeUtilization.getCPU() * capacity.getVirtualCores())); + projectedNodeUtilization.getCPU())); Resource resourceAllowedForOpportunisticContainers = Resources.createResource(allowedMemory, allowedCpu); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a0522d3..5b2e00c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2894,11 +2894,11 @@ public void testQueueOptOutOfOversubscription() throws IOException { } /** - * Test that NO OPPORTUNISTIC containers can be allocated on a node that - * is fully allocated and with a very high utilization. + * Test that NO OPPORTUNISTIC containers can be allocated on a node where + * the memory is fully allocated and with a very high utilization. */ @Test - public void testAllocateNoOpportunisticContainersOnBusyNode() + public void testAllocateNoOpportunisticContainersOnMemoryBusyNode() throws IOException { conf.setBoolean( YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); @@ -2918,7 +2918,7 @@ public void testAllocateNoOpportunisticContainersOnBusyNode() scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - // Add a node with 4G of memory and 4 vcores and an overallocation + // Add a node with 2G of memory and 2 vcores and an overallocation // threshold of 0.75f and 0.75f for memory and cpu respectively OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( ResourceThresholds.newInstance(0.75f, 0.75f)); @@ -2928,7 +2928,7 @@ public void testAllocateNoOpportunisticContainersOnBusyNode() // create a scheduling request that takes up the node's full memory ApplicationAttemptId appAttempt1 = - createSchedulingRequest(2048, "queue1", "user1", 1); + createSchedulingRequest(2048, 1, "queue1", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). getGuaranteedResourceUsage().getMemorySize()); @@ -2939,18 +2939,18 @@ public void testAllocateNoOpportunisticContainersOnBusyNode() ExecutionType.GUARANTEED, allocatedContainers1.get(0).getExecutionType()); - // node utilization shoots up after the container runs on the node + // memory utilization shoots up after the container runs on the node ContainerStatus containerStatus = ContainerStatus.newInstance( allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS); node.updateContainersInfoAndUtilization( new UpdatedContainerInfo(Collections.singletonList(containerStatus), Collections.emptyList()), - ResourceUtilization.newInstance(2000, 0, 0.8f)); + ResourceUtilization.newInstance(2000, 0, 0.0f)); // create another scheduling request ApplicationAttemptId appAttempt2 - = createSchedulingRequest(100, "queue2", "user1", 1); + = createSchedulingRequest(100, 1, "queue2", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); List allocatedContainers2 = scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); @@ -2976,12 +2976,98 @@ public void testAllocateNoOpportunisticContainersOnBusyNode() } /** + * Test that NO OPPORTUNISTIC containers can be allocated on a node where + * the CPU is fully allocated and with a very high CPU utilization. 
+ */ + @Test + public void testAllocateNoOpportunisticContainersOnCPUBusyNode() + throws Exception { + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up the node's full CPU + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(1024, 4, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + assertEquals(4, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getVirtualCores()); + + // node utilization shoots up after the container runs on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersInfoAndUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 3.5f)); + + // create another scheduling request that should NOT be satisfied + // immediately because the node cpu utilization is over its + // overallocation threshold + ApplicationAttemptId appAttempt3 + = createSchedulingRequest(1024, 1, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + List allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue("Expecting no containers allocated", + allocatedContainers3.size() == 0); + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getVirtualCores()); + + // verify that a reservation is made for the second resource request + Resource reserved = scheduler.getNode(node.getNodeID()). 
+ getReservedContainer().getReservedResource(); + assertTrue("Expect a reservation made for the second resource request", + reserved.equals(Resource.newInstance(1024, 1))); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** * Test that OPPORTUNISTIC containers can be allocated on a node with low - * utilization even though there is not enough unallocated resource on the - * node to accommodate the request. + * memory utilization even though there is not enough unallocated resource + * on the node to accommodate the request. */ @Test - public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() + public void + testAllocateOpportunisticContainersOnMemoryPartiallyOverAllocatedNode() throws IOException { conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); @@ -3010,9 +3096,9 @@ public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() Resources.createResource(4096, 4), overAllocationInfo); scheduler.handle(new NodeAddedSchedulerEvent(node)); - // create a scheduling request that leaves some unallocated resources + // create a scheduling request that leaves some unallocated memory ApplicationAttemptId appAttempt1 = - createSchedulingRequest(3600, "queue1", "user1", 1); + createSchedulingRequest(3600, 1, "queue1", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(3600, scheduler.getQueueManager().getQueue("queue1"). getGuaranteedResourceUsage().getMemorySize()); @@ -3023,19 +3109,19 @@ public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() ExecutionType.GUARANTEED, allocatedContainers1.get(0).getExecutionType()); - // node utilization is low after the container is launched on the node + // memory utilization is low after the container is launched on the node ContainerStatus containerStatus = ContainerStatus.newInstance( allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS); node.updateContainersInfoAndUtilization( new UpdatedContainerInfo(Collections.singletonList(containerStatus), Collections.emptyList()), - ResourceUtilization.newInstance(1800, 0, 0.5f)); + ResourceUtilization.newInstance(1800, 0, 0.0f)); - // create another scheduling request that asks for more than what's left + // create another scheduling request that asks for more than the memory // unallocated on the node but can be served with overallocation. ApplicationAttemptId appAttempt2 = - createSchedulingRequest(1024, "queue2", "user1", 1); + createSchedulingRequest(1024, 1, "queue2", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). getOpportunisticResourceUsage().getMemorySize()); @@ -3063,11 +3149,98 @@ public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode() } /** - * Test opportunistic containers can be allocated on a node that is fully - * allocated but whose utilization is very low. + * Test that OPPORTUNISTIC containers can be allocated on a node with low + * cpu utilization even though there is not enough unallocated resource + * on the node to accommodate the request. 
+ */ + @Test + public void + testAllocateOpportunisticContainersOnCPUPartiallyOverAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that leaves some unallocated CPU resources + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(1024, 3, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(3, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getVirtualCores()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // cpu utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersInfoAndUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 1.0f)); + + // create another scheduling request that asks for more than what's left + // unallocated on the node but can be served with overallocation. + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1024, 2, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getVirtualCores()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. 
+ assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + /** + * Test opportunistic containers can be allocated on a node where the memory + * is fully allocated but whose utilization is very low. */ @Test - public void testAllocateOpportunisticContainersOnFullyAllocatedNode() + public void testAllocateOpportunisticContainersOnMemoryFullyAllocatedNode() throws IOException { conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); @@ -3096,9 +3269,9 @@ public void testAllocateOpportunisticContainersOnFullyAllocatedNode() Resources.createResource(4096, 4), overAllocationInfo); scheduler.handle(new NodeAddedSchedulerEvent(node)); - // create a scheduling request that takes up the whole node + // create a scheduling request that takes up all memory ApplicationAttemptId appAttempt1 = createSchedulingRequest( - 4096, "queue1", "user1", 4); + 4096, 1, "queue1", "user1", 4); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). getGuaranteedResourceUsage().getMemorySize()); @@ -3109,20 +3282,20 @@ public void testAllocateOpportunisticContainersOnFullyAllocatedNode() ExecutionType.GUARANTEED, allocatedContainers1.get(0).getExecutionType()); - // node utilization is low after the container is launched on the node + // memory utilization is low after the container is launched on the node ContainerStatus containerStatus = ContainerStatus.newInstance( allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", ContainerExitStatus.SUCCESS); node.updateContainersInfoAndUtilization( new UpdatedContainerInfo(Collections.singletonList(containerStatus), Collections.emptyList()), - ResourceUtilization.newInstance(1800, 0, 0.5f)); + ResourceUtilization.newInstance(1800, 0, 0.0f)); // create another scheduling request now that there is no unallocated - // resources left on the node, the request should be served with an + // memory resources left on the node, the request should be served with an // allocation of an opportunistic container ApplicationAttemptId appAttempt2 = createSchedulingRequest( - 1024, "queue2", "user1", 1); + 1024, 1, "queue2", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). getOpportunisticResourceUsage().getMemorySize()); @@ -3150,6 +3323,93 @@ public void testAllocateOpportunisticContainersOnFullyAllocatedNode() } /** + * Test opportunistic containers can be allocated on a node where the CPU + * is fully allocated but whose utilization is very low. + */ + @Test + public void testAllocateOpportunisticContainersOnCPUFullyAllocatedNode() + throws IOException { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 4 vcores and an overallocation + // threshold of 0.75f and 0.75f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(0.75f, 0.75f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 4), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up all vcores + ApplicationAttemptId appAttempt1 = createSchedulingRequest( + 1024, 4, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(4, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getVirtualCores()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // cpu utilization is low after the container is launched on the node + ContainerStatus containerStatus = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersInfoAndUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 1.0f)); + + // create another scheduling request now that there is no unallocated + // cpu resources left on the node, the request should be served with an + // allocation of an opportunistic container + ApplicationAttemptId appAttempt2 = createSchedulingRequest( + 1024, 2, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getVirtualCores()); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + + // verify that no reservation is made for the second request given + // that it's satisfied by an OPPORTUNISTIC container allocation. + assertTrue("No reservation should be made because we have satisfied" + + " the second request with an OPPORTUNISTIC container allocation", + scheduler.getNode(node.getNodeID()).getReservedContainer() == null); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + } + } + + /** * Test opportunistic containers can be allocated on a node with a low * utilization even though there are GUARANTEED containers allocated. 
*/ @@ -3277,7 +3537,7 @@ public void testAllocateOpportunisticContainersWithGuaranteedOnes() * @throws Exception */ @Test - public void testMaxOverallocationPerNode() throws Exception { + public void testMaxMemoryOverallocationPerNode() throws Exception { conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true); // disable resource request normalization in fair scheduler @@ -3309,9 +3569,9 @@ public void testMaxOverallocationPerNode() throws Exception { Resources.createResource(1024, 1), overAllocationInfo); scheduler.handle(new NodeAddedSchedulerEvent(node)); - // create a scheduling request that takes up the whole node + // create a scheduling request that takes up all memory on the node ApplicationAttemptId appAttempt1 = - createSchedulingRequest(1024, "queue1", "user1", 1); + createSchedulingRequest(1024, 1, "queue1", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). getGuaranteedResourceUsage().getMemorySize()); @@ -3332,7 +3592,7 @@ public void testMaxOverallocationPerNode() throws Exception { ResourceUtilization.newInstance(0, 0, 0.0f)); // create a scheduling request that should get allocated an OPPORTUNISTIC - // container because the node utilization is zero + // container because the memory utilization is zero ApplicationAttemptId appAttempt2 = createSchedulingRequest(1024, "queue2", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); @@ -3355,7 +3615,7 @@ public void testMaxOverallocationPerNode() throws Exception { ResourceUtilization.newInstance(0, 0, 0.0f)); // create another scheduling request that should not get any allocation - // because of the max overallocation on the node will be exceeded. + // because of the max memory overallocation on the node will be exceeded. ApplicationAttemptId appAttempt3 = createSchedulingRequest(1024, "queue3", "user1", 1); scheduler.handle(new NodeUpdateSchedulerEvent(node)); @@ -3380,6 +3640,112 @@ public void testMaxOverallocationPerNode() throws Exception { } /** + * Test that max CPU overallocation per node is enforced by Fair Scheduler. + */ + @Test + public void testMaxCPUOverallocationPerNode() throws Exception { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + true); + // disable resource request normalization in fair scheduler + int memoryAllocationIncrement = conf.getInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + FairSchedulerConfiguration. 
+ DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1); + int memoryAllocationMinimum = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1); + float maxOverallocationRatio = conf.getFloat( + YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, + YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO); + conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.0f); + + try { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node with 4G of memory and 2 vcores and an overallocation + // threshold of 1.0f and 1.0f for memory and cpu respectively + OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(1f, 1f)); + MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1, + Resources.createResource(4096, 2), overAllocationInfo); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + + // create a scheduling request that takes up all CPU on the node + ApplicationAttemptId appAttempt1 = + createSchedulingRequest(1024, 2, "queue1", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(2, scheduler.getQueueManager().getQueue("queue1"). + getGuaranteedResourceUsage().getVirtualCores()); + List allocatedContainers1 = + scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers1.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.GUARANTEED, + allocatedContainers1.get(0).getExecutionType()); + + // cpu utilization is zero after the container runs + ContainerStatus containerStatus1 = ContainerStatus.newInstance( + allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersInfoAndUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus1), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 0.0f)); + + // create a scheduling request that should get allocated an OPPORTUNISTIC + // container because the cpu utilization is zero + ApplicationAttemptId appAttempt2 = + createSchedulingRequest(1024, 2, "queue2", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + List allocatedContainers2 = + scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers2.size() == 1); + assertEquals("unexpected container execution type", + ExecutionType.OPPORTUNISTIC, + allocatedContainers2.get(0).getExecutionType()); + assertEquals(2, scheduler.getQueueManager().getQueue("queue2"). + getOpportunisticResourceUsage().getVirtualCores()); + + // node utilization is still zero after the container runs + ContainerStatus containerStatus2 = ContainerStatus.newInstance( + allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "", + ContainerExitStatus.SUCCESS); + node.updateContainersInfoAndUtilization( + new UpdatedContainerInfo(Collections.singletonList(containerStatus2), + Collections.emptyList()), + ResourceUtilization.newInstance(0, 0, 0.0f)); + + // create another scheduling request that should not get any allocation + // because of the max CPU overallocation on the node will be exceeded. 
+ ApplicationAttemptId appAttempt3 = + createSchedulingRequest(1024, 1, "queue3", "user1", 1); + scheduler.handle(new NodeUpdateSchedulerEvent(node)); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getVirtualCores()); + List allocatedContainers3 = + scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers(); + assertTrue(allocatedContainers3.size() == 0); + assertEquals(0, scheduler.getQueueManager().getQueue("queue3"). + getOpportunisticResourceUsage().getVirtualCores()); + } finally { + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, + false); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + memoryAllocationMinimum); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + memoryAllocationIncrement); + conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, + maxOverallocationRatio); + } + } + + /** * Test promotion of a single OPPORTUNISTIC container when no resources are * reserved on the node where the container is allocated. */
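The reclaim amounts asserted in the preemption-policy tests above use the same vcore-based units. The standalone snippet below reproduces that calculation with assumed values matching the test scenario (2048 MB and 4 vcores allocated to containers, 0.75 preemption thresholds, a snapshot of 2000 MB and 4.0 vcores in use) and hypothetical names, independent of the YARN classes.

    public final class ReclaimAmountSketch {
      public static void main(String[] args) {
        long memoryAllocatedMb = 2048L;  // assumed memory allocated to containers
        long vcoresAllocated = 4L;       // assumed vcores allocated to containers
        float memoryThreshold = 0.75f;   // assumed memory preemption threshold
        float cpuThreshold = 0.75f;      // assumed CPU preemption threshold
        int memoryUsedMb = 2000;         // snapshot of memory in use
        float cpuUsedVcores = 4.0f;      // snapshot of CPU in use, in vcores

        // Memory over the preemption threshold: 2000 - 2048 * 0.75 = 464 MB.
        int memoryOverLimitMb = memoryUsedMb - (int) (memoryAllocatedMb * memoryThreshold);

        // CPU over the preemption threshold, in vcores: 4.0 - 4 * 0.75 = 1.0.
        float vcoreOverLimit = cpuUsedVcores - vcoresAllocated * cpuThreshold;

        System.out.println("memory to reclaim (MB): " + Math.max(0, memoryOverLimitMb));
        System.out.println("vcores to reclaim: " + Math.max(0.0f, vcoreOverLimit));
      }
    }

These values line up with the ResourceUtilization.newInstance(464, 0, 1.0f) expectation asserted in TestSnapshotBasedOverAllocationPreemptionPolicy.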