diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 068056d..7be9091 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -26,8 +26,11 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerOverallocationPreemptionEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPreemptionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPreemptionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +122,7 @@
   private NMAllocationPolicy overAllocationPolicy;
   private ResourceThresholds overAllocationPreemptionThresholds;
   private int overAlloctionPreemptionCpuCount = -1;
+  private NMAllocationPreemptionPolicy overAllocationPreemptionPolicy;
 
   private volatile boolean stopped = false;
 
@@ -373,6 +377,9 @@ private void initializeOverAllocation(Configuration conf) {
     this.overAllocationPolicy =
         createOverAllocationPolicy(resourceThresholds);
 
+    this.overAllocationPreemptionPolicy = createOverAllocationPreemptionPolicy(
+        overAllocationPreemptionThresholds, overAlloctionPreemptionCpuCount);
+
     LOG.info("NodeManager oversubscription enabled with overallocation "
         + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold
         + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption"
@@ -385,6 +392,12 @@ protected NMAllocationPolicy createOverAllocationPolicy(
     return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this);
   }
 
+  private NMAllocationPreemptionPolicy createOverAllocationPreemptionPolicy(
+      ResourceThresholds resourceThresholds, int maxTimesCpuOverLimit) {
+    return new SnapshotBasedOverAllocationPreemptionPolicy(
+        resourceThresholds, maxTimesCpuOverLimit, this);
+  }
+
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this
@@ -672,7 +685,7 @@ public void run() {
 
         // check opportunity to start containers if over-allocation is on
         if (context.isOverAllocationEnabled()) {
-          attemptToStartContainersUponLowUtilization();
+          checkUtilization();
         }
 
         // Publish the container utilization metrics to node manager
@@ -1054,13 +1067,22 @@ public NMAllocationPolicy getContainerOverAllocationPolicy() {
     return overAllocationPolicy;
   }
 
+  public NMAllocationPreemptionPolicy getOverAllocationPreemptionPolicy() {
+    return overAllocationPreemptionPolicy;
+  }
+
   private void setLatestContainersUtilization(ResourceUtilization utilization) {
     this.latestContainersUtilization = new ContainersResourceUtilization(
         utilization, System.currentTimeMillis());
   }
 
   @VisibleForTesting
-  public void attemptToStartContainersUponLowUtilization() {
+  public boolean checkUtilization() {
+    return checkLowUtilization() || checkHighUtilization();
+  }
+
+  private boolean checkLowUtilization() {
+    boolean opportunisticContainersToStart = false;
     if (getContainerOverAllocationPolicy() != null) {
       Resource available = getContainerOverAllocationPolicy()
           .getAvailableResources();
@@ -1069,8 +1091,28 @@ public void attemptToStartContainersUponLowUtilization() {
         eventDispatcher.getEventHandler().handle(
             new ContainerSchedulerEvent(null,
                 ContainerSchedulerEventType.SCHEDULE_CONTAINERS));
+        opportunisticContainersToStart = true;
+        LOG.info("Node utilization is below its allocation threshold. " +
+            "Inform container scheduler to launch opportunistic containers.");
       }
     }
+    return opportunisticContainersToStart;
+  }
+
+  public boolean checkHighUtilization() {
+    ResourceUtilization overLimit = getOverAllocationPreemptionPolicy()
+        .getResourcesToReclaim();
+
+    boolean opportunisticContainersToPreempt = false;
+
+    if (overLimit.getPhysicalMemory() > 0 || overLimit.getCPU() > 0) {
+      opportunisticContainersToPreempt = true;
+      eventDispatcher.getEventHandler().handle(
+          new ContainerSchedulerOverallocationPreemptionEvent(overLimit));
+      LOG.info("Node utilization is over the preemption threshold. " +
+          "Inform container scheduler to reclaim " + overLimit);
+    }
+    return opportunisticContainersToPreempt;
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
index a3e19eb..7840fef 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java
@@ -53,6 +53,11 @@
    */
   @Override
   public ResourceUtilization getCurrentUtilization() {
+    return getTotalAllocation();
+  }
+
+  @Override
+  public ResourceUtilization getTotalAllocation() {
     return this.containersAllocation;
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 304c2b8..f5513e5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -219,12 +219,61 @@ public void handle(ContainerSchedulerEvent event) {
       // node utilization is low.
       startOpportunisticContainers(utilizationTracker.getAvailableResources());
       break;
+    case PREEMPT_CONTAINERS:
+      if (event instanceof ContainerSchedulerOverallocationPreemptionEvent) {
+        preemptOpportunisticContainers(
+            (ContainerSchedulerOverallocationPreemptionEvent) event);
+      } else {
+        LOG.error(
+            "Unknown event type on Preempt containers: " + event.getType());
+      }
+      break;
     default:
       LOG.error("Unknown event arrived at ContainerScheduler: "
           + event.toString());
     }
   }
 
+  private void preemptOpportunisticContainers(
+      ContainerSchedulerOverallocationPreemptionEvent event) {
+    ResourceUtilization resourcesToReclaim =
+        getResourcesToReclaim(event.getResourcesOverPreemptionThresholds());
+
+    List<Container> oppContainersToReclaim =
+        pickOpportunisticContainersToReclaimResources(
+            resourcesToReclaim);
+
+    killOpportunisticContainers(oppContainersToReclaim);
+  }
+
+  /**
+   * Get the amount of resources that need to be reclaimed by preempting
+   * OPPORTUNISTIC containers, considering the amount of resources that
+   * are over the preemption thresholds and over the capacity of the node.
+   * When the node is not being over-allocated, its resource utilization
+   * can safely go to 100% without any OPPORTUNISTIC containers being killed.
+   */
+  private ResourceUtilization getResourcesToReclaim(
+      ResourceUtilization resourcesOverPreemptionThresholds) {
+    ResourceUtilization totalAllocation = ResourceUtilization.newInstance(
+        utilizationTracker.getTotalAllocation());
+    getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+        totalAllocation);
+    ResourceUtilization overAllocatedResources =
+        ResourceUtilization.newInstance(
+            Math.max(0, totalAllocation.getPhysicalMemory()),
+            Math.max(0, totalAllocation.getVirtualMemory()),
+            Math.max(0, totalAllocation.getCPU()));
+
+    return ResourceUtilization.newInstance(
+        Math.min(overAllocatedResources.getPhysicalMemory(),
+            resourcesOverPreemptionThresholds.getPhysicalMemory()),
+        Math.min(overAllocatedResources.getVirtualMemory(),
+            resourcesOverPreemptionThresholds.getVirtualMemory()),
+        Math.min(overAllocatedResources.getCPU(),
+            resourcesOverPreemptionThresholds.getCPU()));
+  }
+
   /**
    * We assume that the ContainerManager has already figured out what kind
    * of update this is.
@@ -593,8 +642,9 @@ protected void scheduleContainer(Container container) {
 
   @SuppressWarnings("unchecked")
   private void reclaimOpportunisticContainerResources() {
+    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
     List<Container> extraOppContainersToReclaim =
-        pickOpportunisticContainersToReclaimResources();
+        pickOpportunisticContainersToReclaimResources(resourcesToFreeUp);
     killOpportunisticContainers(extraOppContainersToReclaim);
   }
 
@@ -634,12 +684,12 @@ private void startContainer(Container container) {
     container.sendLaunchEvent();
   }
 
-  private List<Container> pickOpportunisticContainersToReclaimResources() {
+  private List<Container> pickOpportunisticContainersToReclaimResources(
+      ResourceUtilization resourcesToFreeUp) {
     // The opportunistic containers that need to be killed for the
     // given container to start.
     List<Container> extraOpportContainersToKill = new ArrayList<>();
     // Track resources that need to be freed.
-    ResourceUtilization resourcesToFreeUp = resourcesToFreeUp();
     // Go over the running opportunistic containers.
     // Use a descending iterator to kill more recently started containers.
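The arithmetic in getResourcesToReclaim is the crux of the patch: preemption is capped both by how far utilization is over the preemption threshold and by how far the node is over-allocated. A standalone sketch of that clamping, with plain ints standing in for `ResourceUtilization` and only the memory dimension shown:

```java
public class ReclaimClampSketch {
  /**
   * Sketch of the clamping in ContainerScheduler#getResourcesToReclaim:
   * reclaim no more than what is over the preemption threshold, and no
   * more than what the node is over-allocated by. All values in MB.
   */
  static int memoryToReclaimMb(int totalAllocationMb, int nodeCapacityMb,
      int overPreemptionThresholdMb) {
    // over-allocated amount: how much the summed allocations exceed capacity
    int overAllocatedMb = Math.max(0, totalAllocationMb - nodeCapacityMb);
    // never reclaim more than either bound
    return Math.min(overAllocatedMb, Math.max(0, overPreemptionThresholdMb));
  }

  public static void main(String[] args) {
    // Node with 4096 MB capacity, 4608 MB allocated, utilization 700 MB over
    // the preemption threshold: only 512 MB is reclaimable, because killing
    // more than the over-allocated amount would buy nothing.
    System.out.println(memoryToReclaimMb(4608, 4096, 700)); // 512
    // No over-allocation: utilization may sit above the threshold, but
    // nothing is reclaimed since no container ran on borrowed room.
    System.out.println(memoryToReclaimMb(4096, 4096, 700)); // 0
  }
}
```

This is the property the javadoc states: a node that is not over-allocated can run at 100% utilization without any OPPORTUNISTIC container being killed.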
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 9ad4f91..f76727d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -30,5 +30,7 @@
   CONTAINER_PAUSED,
   RECOVERY_COMPLETED,
   // Producer: Containers Monitor when over-allocation is on
-  SCHEDULE_CONTAINERS
+  SCHEDULE_CONTAINERS,
+  // Producer: Containers Monitor when over-allocation is on
+  PREEMPT_CONTAINERS
 }
diff --git /dev/null hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java
new file mode 100644
index 0000000..f9911a9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+
+/**
+ * A Container Scheduler event generated by ContainersMonitor when
+ * overallocation is turned on and the utilization goes over the
+ * proactive preemption thresholds.
+ */
+public class ContainerSchedulerOverallocationPreemptionEvent
+    extends ContainerSchedulerEvent {
+  private final ResourceUtilization resourcesOverPreemptionThresholds;
+
+  /**
+   * Create an instance of the event.
+   *
+   * @param toFree resources to free up.
+   */
+  public ContainerSchedulerOverallocationPreemptionEvent(
+      ResourceUtilization toFree) {
+    super(null, ContainerSchedulerEventType.PREEMPT_CONTAINERS);
+    this.resourcesOverPreemptionThresholds = toFree;
+  }
+
+  public ResourceUtilization getResourcesOverPreemptionThresholds() {
+    return resourcesOverPreemptionThresholds;
+  }
+}
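Wiring-wise, the new event follows the existing ContainerSchedulerEvent pattern: ContainersMonitor constructs it with the over-limit amount, and ContainerScheduler switches on PREEMPT_CONTAINERS. Below is a toy sketch of that producer/consumer handshake; the types are hypothetical stand-ins, and the real code goes through the NodeManager's async dispatcher rather than a queue:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy dispatcher sketch; the real NM uses an AsyncDispatcher. */
public class PreemptionEventFlowSketch {
  enum EventType { SCHEDULE_CONTAINERS, PREEMPT_CONTAINERS }

  record Event(EventType type, int overLimitMemoryMb) {}

  public static void main(String[] args) {
    Queue<Event> dispatcher = new ArrayDeque<>();

    // Producer (roughly ContainersMonitor#checkHighUtilization): fire the
    // preemption event only when some dimension is over its limit.
    int memoryOverLimitMb = 512;
    if (memoryOverLimitMb > 0) {
      dispatcher.add(new Event(EventType.PREEMPT_CONTAINERS, memoryOverLimitMb));
    }

    // Consumer (roughly ContainerScheduler#handle): switch on the type and
    // route PREEMPT_CONTAINERS to the preemption path.
    Event event = dispatcher.poll();
    switch (event.type()) {
    case PREEMPT_CONTAINERS:
      System.out.println("reclaim up to " + event.overLimitMemoryMb() + " MB "
          + "by killing recently started OPPORTUNISTIC containers");
      break;
    default:
      System.out.println("unexpected event " + event.type());
    }
  }
}
```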
diff --git /dev/null hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..3d96650
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * Keeps track of container utilization over time and determines how many
+ * resources need to be reclaimed by preempting opportunistic containers.
+ */
+public abstract class NMAllocationPreemptionPolicy {
+  protected final ResourceThresholds overAllocationPreemptionThresholds;
+  protected final ContainersMonitor containersMonitor;
+
+  public NMAllocationPreemptionPolicy(
+      ResourceThresholds preemptionThresholds,
+      ContainersMonitor containersMonitor) {
+    this.containersMonitor = containersMonitor;
+    this.overAllocationPreemptionThresholds = preemptionThresholds;
+  }
+
+  /**
+   * Get the amount of resources to reclaim by preempting opportunistic
+   * containers when over-allocation is turned on.
+   * @return the amount of resources to be reclaimed
+   */
+  public abstract ResourceUtilization getResourcesToReclaim();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 7a7c78e..9be7574 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -39,6 +39,13 @@
   ResourceUtilization getCurrentUtilization();
 
   /**
+   * Get the total amount of resources allocated to running containers
+   * in terms of resource utilization.
+   * @return ResourceUtilization resource allocation
+   */
+  ResourceUtilization getTotalAllocation();
+
+  /**
    * Get the amount of resources currently available to launch containers.
    * @return Resource resources available to launch containers
    */
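Since NMAllocationPreemptionPolicy is an abstract class, an alternative policy only needs to implement getResourcesToReclaim(). As a thought experiment (not part of this patch), a policy could smooth utilization over several snapshots instead of acting on the latest one. A self-contained sketch of that idea, with a plain int standing in for `ResourceUtilization`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Standalone sketch of a hypothetical averaging preemption policy: only
 * sustained memory pressure over the last N snapshots triggers reclamation.
 * This is not YARN API; it only mirrors the shape of getResourcesToReclaim().
 */
public class AveragingPreemptionPolicySketch {
  private final int thresholdMb;
  private final int window;
  private final Deque<Integer> recentMemoryMb = new ArrayDeque<>();

  AveragingPreemptionPolicySketch(int thresholdMb, int window) {
    this.thresholdMb = thresholdMb;
    this.window = window;
  }

  /** Analogous to getResourcesToReclaim(): returns MB over the threshold. */
  int memoryToReclaimMb(int latestMemoryMb) {
    recentMemoryMb.addLast(latestMemoryMb);
    if (recentMemoryMb.size() > window) {
      recentMemoryMb.removeFirst();
    }
    int average = (int) recentMemoryMb.stream()
        .mapToInt(Integer::intValue).average().orElse(0);
    // a single spike is averaged away; only sustained pressure reclaims
    return Math.max(0, average - thresholdMb);
  }

  public static void main(String[] args) {
    AveragingPreemptionPolicySketch policy =
        new AveragingPreemptionPolicySketch(3276, 3);
    System.out.println(policy.memoryToReclaimMb(1024)); // 0
    System.out.println(policy.memoryToReclaimMb(4096)); // 0 (spike averaged out)
    System.out.println(policy.memoryToReclaimMb(4096)); // 0
    System.out.println(policy.memoryToReclaimMb(4096)); // 820 (sustained)
  }
}
```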
diff --git /dev/null hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..aa76204
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+/**
+ * An implementation of NMAllocationPreemptionPolicy based on a snapshot
+ * of the latest containers utilization, used to determine how many
+ * resources need to be reclaimed by preempting opportunistic containers
+ * when over-allocation is turned on.
+ */
+public class SnapshotBasedOverAllocationPreemptionPolicy
+    extends NMAllocationPreemptionPolicy {
+  private final int absoluteMemoryPreemptionThresholdMb;
+  private final float cpuPreemptionThreshold;
+  private final int maxTimesCpuOverPreemption;
+  private int timesCpuOverPreemption;
+
+  public SnapshotBasedOverAllocationPreemptionPolicy(
+      ResourceThresholds preemptionThresholds,
+      int timesCpuOverPreemptionThreshold,
+      ContainersMonitor containersMonitor) {
+    super(preemptionThresholds, containersMonitor);
+    int memoryCapacityMb = (int)
+        (containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024));
+    this.absoluteMemoryPreemptionThresholdMb = (int)
+        (preemptionThresholds.getMemoryThreshold() * memoryCapacityMb);
+    this.cpuPreemptionThreshold = preemptionThresholds.getCpuThreshold();
+    this.maxTimesCpuOverPreemption = timesCpuOverPreemptionThreshold;
+  }
+
+  @Override
+  public ResourceUtilization getResourcesToReclaim() {
+    ResourceUtilization utilization =
+        containersMonitor.getContainersUtilization(true).getUtilization();
+
+    int memoryOverLimit = utilization.getPhysicalMemory() -
+        absoluteMemoryPreemptionThresholdMb;
+    float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold;
+
+    if (vcoreOverLimit > 0) {
+      timesCpuOverPreemption++;
+      if (timesCpuOverPreemption > maxTimesCpuOverPreemption) {
+        timesCpuOverPreemption = 0;
+      } else {
+        // report no CPU over-limit if the number of times CPU has been over
+        // the preemption threshold does not exceed the maximum allowed
+        vcoreOverLimit = 0;
+      }
+    }
+
+    if (memoryOverLimit > 0 || vcoreOverLimit > 0) {
+      // clamp negative dimensions to zero so a resource that is still under
+      // its own limit is not reported as reclaimable
+      return ResourceUtilization.newInstance(
+          Math.max(0, memoryOverLimit), 0, Math.max(0, vcoreOverLimit));
+    }
+
+    return ResourceUtilization.newInstance(0, 0, 0.0f);
+  }
+}
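The counter logic above is easy to misread: CPU-driven preemption only fires on the (maxTimesCpuOverPreemption + 1)-th over-threshold snapshot, and the counter resets once it fires. A minimal trace of that behavior, mirroring the patch's branch structure with a bare float for CPU utilization:

```java
public class CpuOverLimitCounterSketch {
  private final int maxTimesCpuOverPreemption = 2; // NM_OVERALLOCATION_PREEMPTION_CPU_COUNT
  private final float cpuPreemptionThreshold = 0.8f;
  private int timesCpuOverPreemption;

  /** Mirrors the vcore branch of getResourcesToReclaim(). */
  float vcoreOverLimit(float cpuUtilization) {
    float overLimit = cpuUtilization - cpuPreemptionThreshold;
    if (overLimit > 0) {
      timesCpuOverPreemption++;
      if (timesCpuOverPreemption > maxTimesCpuOverPreemption) {
        timesCpuOverPreemption = 0; // reset once preemption is triggered
      } else {
        overLimit = 0; // tolerate transient CPU spikes
      }
    }
    return overLimit;
  }

  public static void main(String[] args) {
    CpuOverLimitCounterSketch policy = new CpuOverLimitCounterSketch();
    System.out.println(policy.vcoreOverLimit(1.0f)); // 0.0  (1st time over)
    System.out.println(policy.vcoreOverLimit(1.0f)); // 0.0  (2nd time over)
    System.out.println(policy.vcoreOverLimit(1.0f)); // ~0.2 (3rd time: preempt)
    System.out.println(policy.vcoreOverLimit(1.0f)); // 0.0  (counter was reset)
  }
}
```

One reading worth noting: the patch does not reset the counter on an under-threshold snapshot, so the over-limit observations need not be strictly consecutive, even though the tests exercise the consecutive case.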
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
index 183d868..9ac402c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java
@@ -115,6 +115,11 @@ public void setup() throws IOException {
     conf.setFloat(
         YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
         0.75f);
+    conf.setFloat(YarnConfiguration.NM_OVERALLOCATION_CPU_PREEMPTION_THRESHOLD,
+        0.8f);
+    conf.setFloat(
+        YarnConfiguration.NM_OVERALLOCATION_MEMORY_PREEMPTION_THRESHOLD, 0.8f);
+    conf.setInt(YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_CPU_COUNT, 2);
     // disable the monitor thread in ContainersMonitor to allow control over
     // when opportunistic containers are launched with over-allocation
     conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false);
@@ -213,7 +218,7 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization()
 
     // try to start opportunistic containers out of band.
     ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
 
     // this container is expected to be started immediately because there
     // are (memory: 1024, vcore: 0.625) available based on over-allocation
@@ -274,7 +279,7 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization()
     // not be launched at container scheduler event if the node would be
     // over-allocated.
     ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
     ((LongRunningContainerSimulatingContainersManager) containerManager)
         .drainAsyncEvents();
 
@@ -353,7 +358,7 @@ public void testStartOppContainersWithOverallocationLowUtilization()
 
     // try to start opportunistic containers out of band.
     ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
 
     // this container is expected to be started because there is resources
     // available because the actual utilization is very low
@@ -656,7 +661,7 @@ public void testKillNoOppContainersWithOverallocationLowUtilization()
     // not be launched at container scheduler event if the node would be
     // over-allocated.
     ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
 
     // now the queued opportunistic container should also start
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -690,83 +695,6 @@
   }
 
   /**
-   * Start four OPPORTUNISTIC containers which in aggregates exceeds the
-   * capacity of the node. The real resource utilization of the first two
-   * OPPORTUNISTIC containers are high whereas that of the latter two are
-   * almost zero. Then try to start a GUARANTEED container. The GUARANTEED
-   * container will eventually start running after preempting the third
-   * and fourth OPPORTUNISTIC container (which releases no resources) and
-   * then the second OPPORTUNISTIC container.
-   */
-  public void
-      testKillOppContainersConservativelyWithOverallocationHighUtilization()
-      throws Exception {
-    containerManager.start();
-
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        new ArrayList<StartContainerRequest>() {
-          {
-            add(createStartContainerRequest(0,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(1,
-                BuilderUtils.newResource(1024, 1), false));
-            add(createStartContainerRequest(2,
-                BuilderUtils.newResource(512, 1), false));
-            add(createStartContainerRequest(3,
-                BuilderUtils.newResource(1024, 1), false));
-          }
-        }
-    ));
-    // All four GUARANTEED containers are all expected to start
-    // because the containers utilization is low (0 at the point)
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(0), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(1), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(2), ContainerSubState.RUNNING);
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(3), ContainerSubState.RUNNING);
-
-    // the containers utilization is at the overallocation threshold
-    setContainerResourceUtilization(
-        ResourceUtilization.newInstance(1536, 0, 1.0f/2));
-
-    // try to start a GUARANTEED container when there's nothing left unallocated
-    containerManager.startContainers(StartContainersRequest.newInstance(
-        Collections.singletonList(
-            createStartContainerRequest(4,
-                BuilderUtils.newResource(1024, 1), true))
-    ));
-
-    BaseContainerManagerTest.waitForContainerSubState(containerManager,
-        createContainerId(4), ContainerSubState.RUNNING);
-    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest.
-        newInstance(new ArrayList<ContainerId>() {
-          {
-            add(createContainerId(0));
-            add(createContainerId(1));
-            add(createContainerId(2));
-            add(createContainerId(3));
-            add(createContainerId(4));
-          }
-        });
-    List<ContainerStatus> containerStatuses = containerManager
-        .getContainerStatuses(statRequest).getContainerStatuses();
-    for (ContainerStatus status : containerStatuses) {
-      if (status.getContainerId().equals(createContainerId(0)) ||
-          status.getContainerId().equals(createContainerId(4))) {
-        Assert.assertEquals(
-            ContainerSubState.RUNNING, status.getContainerSubState());
-      } else {
-        Assert.assertTrue(status.getDiagnostics().contains(
-            "Container Killed to make room for Guaranteed Container"));
-      }
-      System.out.println("\nStatus : [" + status + "]\n");
-    }
-  }
-
-  /**
    * Start two OPPORTUNISTIC containers followed by one GUARANTEED container,
    * which in aggregate exceeds the capacity of the node. The first two
    * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED
@@ -855,7 +783,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception {
     // we don't start OPPORTUNISTIC containers at container finish event if
     // the node would be over-allocated
     ((LongRunningContainerSimulatingContainersManager) containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
     ((LongRunningContainerSimulatingContainersManager) containerManager)
         .drainAsyncEvents();
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -914,7 +842,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception {
 
     // try to start opportunistic containers out of band.
     ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
 
     // no containers in queue are expected to be launched because the
     // containers utilization is not below the over-allocation threshold
@@ -935,7 +863,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception {
         ResourceUtilization.newInstance(512, 0, 1.0f/8));
 
     ((LongRunningContainerSimulatingContainersManager)containerManager)
-        .startContainersOutOfBandUponLowUtilization();
+        .checkNodeResourceUtilization();
 
     // the two OPPORTUNISTIC containers are expected to be launched
     BaseContainerManagerTest.waitForContainerSubState(containerManager,
@@ -952,6 +880,279 @@
         });
   }
 
+  /**
+   * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED
+   * container and another OPPORTUNISTIC container in order. When the node
+   * memory utilization is over its preemption threshold, the two
+   * OPPORTUNISTIC containers should be killed.
+   */
+  @Test
+  public void testPreemptOpportunisticContainersUponHighMemoryUtilization()
+      throws Exception {
+    containerManager.start();
+
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(1024, 1), true));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(512, 1), false));
+            add(createStartContainerRequest(2,
+                BuilderUtils.newResource(1024, 1), true));
+            add(createStartContainerRequest(3,
+                BuilderUtils.newResource(300, 1), false));
+          }
+        }
+    ));
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    // the first three containers are all expected to start
+    // because the containers utilization is low (0 at the point)
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(2), ContainerSubState.RUNNING);
+
+    // try to start the second opportunistic container out of band.
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.RUNNING);
+
+    // the containers utilization is over the overallocation threshold (fully)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 1.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.DONE);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.DONE);
+
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.DONE);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+  }
+
+  /**
+   * Start a GUARANTEED container followed by an OPPORTUNISTIC container, which
+   * in aggregate do not take more than the capacity of the node.
+   * When the node memory utilization is above the preemption threshold, the
+   * OPPORTUNISTIC container should not be killed because the node is not
+   * over-allocated.
+   */
+  @Test
+  public void testNoPreemptionUponHighMemoryUtilizationButNoOverallocation()
+      throws Exception {
+    containerManager.start();
+
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(1024, 1), true));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(1024, 1), false));
+          }
+        }
+    ));
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+
+    // containers utilization is above the over-allocation threshold
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 1.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    // no containers shall be preempted because there is no overallocation
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+      }
+    });
+  }
+
+  /**
+   * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED
+   * container and another OPPORTUNISTIC container in order. When the node
+   * CPU utilization is over its preemption threshold a few times in a row,
+   * the two OPPORTUNISTIC containers should be killed one by one.
+   */
+  @Test
+  public void testPreemptionUponHighCPUUtilization() throws Exception {
+    containerManager.start();
+
+    containerManager.startContainers(StartContainersRequest.newInstance(
+        new ArrayList<StartContainerRequest>() {
+          {
+            add(createStartContainerRequest(0,
+                BuilderUtils.newResource(512, 2), true));
+            add(createStartContainerRequest(1,
+                BuilderUtils.newResource(512, 1), false));
+            add(createStartContainerRequest(2,
+                BuilderUtils.newResource(512, 2), true));
+            add(createStartContainerRequest(3,
+                BuilderUtils.newResource(512, 1), false));
+          }
+        }
+    ));
+
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+    // the first three containers are all expected to start because the
+    // containers utilization is low (0 at the point)
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(0), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.RUNNING);
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(2), ContainerSubState.RUNNING);
+
+    // try to start opportunistic containers out of band.
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.RUNNING);
+
+    // the containers CPU utilization is over the overallocation threshold
+    // for the first time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    // all containers should continue to be running because we don't
+    // preempt OPPORTUNISTIC containers right away
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.RUNNING);
+      }
+    });
+
+    // the containers CPU utilization is over the overallocation threshold
+    // for the second time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    // all containers should continue to be running because we don't preempt
+    // OPPORTUNISTIC containers when the cpu is over limit the second time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.RUNNING);
+      }
+    });
+
+    // the containers CPU utilization is over the overallocation threshold
+    // for the third time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+
+    // one OPPORTUNISTIC container should be killed because CPU utilization
+    // is over its preemption threshold three times consecutively
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(3), ContainerSubState.DONE);
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // again, the containers CPU utilization is over the overallocation
+    // threshold for the first time (the cpu over-limit count is reset
+    // every time a preemption is triggered as a result)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers CPU utilization is over the overallocation threshold
+    // for the second time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+    ((LongRunningContainerSimulatingContainersManager) containerManager)
+        .drainAsyncEvents();
+
+    // all containers should continue to be running because we don't preempt
+    // OPPORTUNISTIC containers when the cpu is over limit the second time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers CPU utilization is over the overallocation threshold
+    // for the third time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, 2.0f/2));
+    ((LongRunningContainerSimulatingContainersManager)containerManager)
+        .checkNodeResourceUtilization();
+
+    // the other OPPORTUNISTIC container should be killed because CPU
+    // utilization is over its preemption threshold three times consecutively
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.DONE);
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.DONE);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+  }
+
   private void setContainerResourceUtilization(ResourceUtilization usage) {
     ((ContainerMonitorForOverallocationTest)
@@ -1073,9 +1274,9 @@ protected ContainersMonitor createContainersMonitor(
           dispatcher, context);
     }
 
-    public void startContainersOutOfBandUponLowUtilization() {
+    public void checkNodeResourceUtilization() {
       ((ContainerMonitorForOverallocationTest) getContainersMonitor())
-          .attemptToStartContainersUponLowUtilization();
+          .checkUtilization();
    }
 
     public void drainAsyncEvents() {
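For completeness, the test setup above exercises the three new preemption knobs together with the existing over-allocation threshold. Note the deliberate gap between the 0.75 launch threshold and the 0.8 preemption threshold: it provides hysteresis so the node does not flap between starting and killing opportunistic containers. A sketch of the same configuration outside the test harness, using only the YarnConfiguration constants that appear in this patch (their yarn-site.xml key strings should be checked against the feature branch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OverAllocationPreemptionConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Launch opportunistic containers while memory utilization is below 75%.
    conf.setFloat(
        YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD,
        0.75f);
    // Start reclaiming once utilization crosses 80% of capacity.
    conf.setFloat(
        YarnConfiguration.NM_OVERALLOCATION_MEMORY_PREEMPTION_THRESHOLD, 0.8f);
    conf.setFloat(
        YarnConfiguration.NM_OVERALLOCATION_CPU_PREEMPTION_THRESHOLD, 0.8f);
    // Tolerate two CPU-over-threshold snapshots before preempting; memory
    // over-threshold, by contrast, triggers preemption immediately.
    conf.setInt(YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_CPU_COUNT, 2);
  }
}
```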