diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 068056d..c8aac26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -26,8 +26,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerOverallocationPreemptionEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +122,7 @@ private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int overAlloctionPreemptionCpuCount = -1; + private NMAllocationPreemptionPolicy overAllocationPreemptionPolicy; private volatile boolean stopped = false; @@ -373,6 +377,9 @@ private void initializeOverAllocation(Configuration conf) { this.overAllocationPolicy = createOverAllocationPolicy(resourceThresholds); + this.overAllocationPreemptionPolicy = createOverAllocationPreemptionPolicy( + overAllocationPreemptionThresholds, overAlloctionPreemptionCpuCount); + LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -385,6 +392,12 @@ protected NMAllocationPolicy createOverAllocationPolicy( return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); } + private NMAllocationPreemptionPolicy createOverAllocationPreemptionPolicy( + ResourceThresholds resourceThresholds, int maxTimesCpuOverLimit) { + return new SnapshotBasedOverAllocationPreemptionPolicy( + resourceThresholds, maxTimesCpuOverLimit, this); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. 
" + this @@ -671,9 +684,7 @@ public void run() { setLatestContainersUtilization(trackedContainersUtilization); // check opportunity to start containers if over-allocation is on - if (context.isOverAllocationEnabled()) { - attemptToStartContainersUponLowUtilization(); - } + checkUtilization(); // Publish the container utilization metrics to node manager // metrics system. @@ -1054,13 +1065,25 @@ public NMAllocationPolicy getContainerOverAllocationPolicy() { return overAllocationPolicy; } + public NMAllocationPreemptionPolicy getOverAllocationPreemptionPolicy() { + return overAllocationPreemptionPolicy; + } + private void setLatestContainersUtilization(ResourceUtilization utilization) { this.latestContainersUtilization = new ContainersResourceUtilization( utilization, System.currentTimeMillis()); } @VisibleForTesting - public void attemptToStartContainersUponLowUtilization() { + public boolean checkUtilization() { + if (context.isOverAllocationEnabled()) { + return checkLowUtilization() || checkHighUtilization(); + } + return false; + } + + private boolean checkLowUtilization() { + boolean opportunisticContainersToStart = false; if (getContainerOverAllocationPolicy() != null) { Resource available = getContainerOverAllocationPolicy() .getAvailableResources(); @@ -1069,8 +1092,28 @@ public void attemptToStartContainersUponLowUtilization() { eventDispatcher.getEventHandler().handle( new ContainerSchedulerEvent(null, ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + opportunisticContainersToStart = true; + LOG.info("Node utilization is below its allocation threshold. " + + "Inform container scheduler to launch opportunistic containers."); } } + return opportunisticContainersToStart; + } + + public boolean checkHighUtilization() { + ResourceUtilization overLimit = getOverAllocationPreemptionPolicy() + .getResourcesToReclaim(); + + boolean opportunisticContainersToPreempt = false; + + if (overLimit.getPhysicalMemory() > 0 || overLimit.getCPU() > 0) { + opportunisticContainersToPreempt = true; + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerOverallocationPreemptionEvent(overLimit)); + LOG.info("Node utilization is over the preemption threshold. 
" + + "Inform container scheduler to reclaim " + overLimit); + } + return opportunisticContainersToPreempt; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java index a3e19eb..7840fef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -53,6 +53,11 @@ */ @Override public ResourceUtilization getCurrentUtilization() { + return getTotalAllocation(); + } + + @Override + public ResourceUtilization getTotalAllocation() { return this.containersAllocation; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 304c2b8..9de4447 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -219,12 +219,61 @@ public void handle(ContainerSchedulerEvent event) { // node utilization is low. startOpportunisticContainers(utilizationTracker.getAvailableResources()); break; + case PREEMPT_CONTAINERS: + if (event instanceof ContainerSchedulerOverallocationPreemptionEvent) { + preemptOpportunisticContainers( + (ContainerSchedulerOverallocationPreemptionEvent) event); + } else { + LOG.error( + "Unknown event type on Preempt containers: {}", event.getType()); + } + break; default: - LOG.error("Unknown event arrived at ContainerScheduler: " - + event.toString()); + LOG.error( + "Unknown event arrived at ContainerScheduler: {}", event.toString()); } } + private void preemptOpportunisticContainers( + ContainerSchedulerOverallocationPreemptionEvent event) { + ResourceUtilization resourcesToReclaim = + getResourcesToReclaim(event.getResourcesOverPreemptionThresholds()); + + List oppContainersToReclaim = + pickOpportunisticContainersToReclaimResources( + resourcesToReclaim); + + killOpportunisticContainers(oppContainersToReclaim); + } + + /** + * Get the amount of resources that need to be reclaimed by preempting + * OPPORTUNISTIC containers considering the amount of resources that + * are over the preemption thresholds and over the capacity of the node. + * When the node is not being over-allocated, its resource utilization + * can safely go to 100% without any OPPORTUNISTIC containers being killed. 
+ */ + private ResourceUtilization getResourcesToReclaim( + ResourceUtilization resourcesOverPreemptionThresholds) { + ResourceUtilization totalAllocation = ResourceUtilization.newInstance( + utilizationTracker.getTotalAllocation()); + getContainersMonitor().subtractNodeResourcesFromResourceUtilization( + totalAllocation); + ResourceUtilization overAllocatedResources = + ResourceUtilization.newInstance( + Math.max(0, totalAllocation.getPhysicalMemory()), + Math.max(0, totalAllocation.getVirtualMemory()), + Math.max(0, totalAllocation.getCPU())); + + return ResourceUtilization.newInstance( + Math.min(overAllocatedResources.getPhysicalMemory(), + resourcesOverPreemptionThresholds.getPhysicalMemory()), + Math.min(overAllocatedResources.getVirtualMemory(), + resourcesOverPreemptionThresholds.getVirtualMemory()), + Math.min(overAllocatedResources.getCPU(), + resourcesOverPreemptionThresholds.getCPU())); + } + /** * We assume that the ContainerManager has already figured out what kind * of update this is. @@ -593,8 +642,9 @@ protected void scheduleContainer(Container container) { @SuppressWarnings("unchecked") private void reclaimOpportunisticContainerResources() { + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); List<Container> extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources(); + pickOpportunisticContainersToReclaimResources(resourcesToFreeUp); killOpportunisticContainers(extraOppContainersToReclaim); } @@ -634,12 +684,12 @@ private void startContainer(Container container) { container.sendLaunchEvent(); } - private List<Container> pickOpportunisticContainersToReclaimResources() { + private List<Container> pickOpportunisticContainersToReclaimResources( + ResourceUtilization resourcesToFreeUp) { // The opportunistic containers that need to be killed for the // given container to start. List<Container> extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers.
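The clamping in getResourcesToReclaim above is the key safety property of preemption under over-allocation: the scheduler never reclaims more than the node is over-allocated by, so a busy but not over-allocated node loses no OPPORTUNISTIC containers. A minimal standalone sketch of that logic, with plain ints standing in for YARN's ResourceUtilization (class and variable names below are illustrative only, not part of the patch):

/**
 * Sketch of the reclaim-amount computation: reclaim the smaller of
 * (a) how far the node is over-allocated and (b) how far utilization
 * is over the preemption threshold, floored at zero.
 */
public class ReclaimAmountSketch {
  static int toReclaim(int overAllocated, int overThreshold) {
    // nothing is reclaimed unless the node is both over-allocated
    // and over the preemption threshold
    return Math.min(Math.max(0, overAllocated), Math.max(0, overThreshold));
  }

  public static void main(String[] args) {
    int capacityMb = 2048;
    int allocatedMb = 2560;                          // containers allocated
    int overAllocatedMb = allocatedMb - capacityMb;  // 512 MB over capacity
    int overThresholdMb = 300;                       // reported by the policy
    // utilization is the binding constraint: only 300 MB is reclaimed
    System.out.println(toReclaim(overAllocatedMb, overThresholdMb));
    // a node that is not over-allocated reclaims nothing
    System.out.println(toReclaim(-100, overThresholdMb));
  }
}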
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 9ad4f91..f76727d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -30,5 +30,7 @@ CONTAINER_PAUSED, RECOVERY_COMPLETED, // Producer: Containers Monitor when over-allocation is on - SCHEDULE_CONTAINERS + SCHEDULE_CONTAINERS, + // Producer: Containers Monitor when utilization exceeds preemption thresholds + PREEMPT_CONTAINERS } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java new file mode 100644 index 0000000..547036a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; + +/** + * A {@link ContainerSchedulerEvent} generated by {@link ContainersMonitorImpl} + * when overallocation is turned on and the utilization goes over the + * proactive preemption thresholds. + */ +public class ContainerSchedulerOverallocationPreemptionEvent + extends ContainerSchedulerEvent { + private final ResourceUtilization resourcesOverPreemptionThresholds; + /** + * Create an instance of the event. + * + * @param toFree the amount of resources to reclaim. + */ + public ContainerSchedulerOverallocationPreemptionEvent( + ResourceUtilization toFree) { + super(null, ContainerSchedulerEventType.PREEMPT_CONTAINERS); + this.resourcesOverPreemptionThresholds = toFree; + } + + public ResourceUtilization getResourcesOverPreemptionThresholds() { + return resourcesOverPreemptionThresholds; + } +}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java new file mode 100644 index 0000000..3d96650 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * Keeps track of containers utilization over time and determines how many + * resources need to be reclaimed by preempting opportunistic containers. + */ +public abstract class NMAllocationPreemptionPolicy { + protected final ResourceThresholds overAllocationPreemptionThresholds; + protected final ContainersMonitor containersMonitor; + + public NMAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + ContainersMonitor containersMonitor) { + this.containersMonitor = containersMonitor; + this.overAllocationPreemptionThresholds = preemptionThresholds; + } + + /** + * Get the amount of resources to reclaim by preempting opportunistic + * containers when over-allocation is turned on. + * @return the amount of resources to be reclaimed + */ + public abstract ResourceUtilization getResourcesToReclaim(); +}
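NMAllocationPreemptionPolicy is intentionally a small contract: a subclass only has to turn the latest utilization snapshot into an amount to reclaim. As an illustration of how an alternative policy could plug in, here is a hypothetical memory-only variant; the class and its behavior are invented for this sketch and are not part of the patch, which ships only the snapshot-based implementation below:

package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;

import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;

/**
 * Hypothetical policy: reclaim any memory over the preemption threshold
 * immediately, and never preempt for CPU. Illustrative only.
 */
public class MemoryOnlyPreemptionPolicy extends NMAllocationPreemptionPolicy {
  private final int memoryLimitMb;

  public MemoryOnlyPreemptionPolicy(ResourceThresholds thresholds,
      ContainersMonitor containersMonitor) {
    super(thresholds, containersMonitor);
    // absolute memory limit derived the same way as the snapshot policy
    this.memoryLimitMb = (int) (thresholds.getMemoryThreshold()
        * containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024));
  }

  @Override
  public ResourceUtilization getResourcesToReclaim() {
    ResourceUtilization used =
        containersMonitor.getContainersUtilization(true).getUtilization();
    int overMb = Math.max(0, used.getPhysicalMemory() - memoryLimitMb);
    // report only the memory overage; CPU is never reclaimed by this policy
    return ResourceUtilization.newInstance(overMb, 0, 0f);
  }
}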
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 7a7c78e..9be7574 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -39,6 +39,13 @@ ResourceUtilization getCurrentUtilization(); /** + * Get the total amount of resources allocated to running containers + * in terms of resource utilization. + * @return ResourceUtilization the total resource allocation + */ + ResourceUtilization getTotalAllocation(); + + /** * Get the amount of resources currently available to launch containers. * @return Resource resources available to launch containers */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java new file mode 100644 index 0000000..299f028 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * An implementation of NMAllocationPreemptionPolicy that uses a snapshot of + * the latest containers utilization to determine how many resources need to + * be reclaimed by preempting opportunistic containers when over-allocation is + * turned on. + */ +public class SnapshotBasedOverAllocationPreemptionPolicy + extends NMAllocationPreemptionPolicy { + private final int absoluteMemoryPreemptionThresholdMb; + private final float cpuPreemptionThreshold; + private final int maxTimesCpuOverPreemption; + private int timesCpuOverPreemption; + + public SnapshotBasedOverAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + int timesCpuOverPreemptionThreshold, + ContainersMonitor containersMonitor) { + super(preemptionThresholds, containersMonitor); + int memoryCapacityMb = (int) + (containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024)); + this.absoluteMemoryPreemptionThresholdMb = (int) + (preemptionThresholds.getMemoryThreshold() * memoryCapacityMb); + this.cpuPreemptionThreshold = preemptionThresholds.getCpuThreshold(); + this.maxTimesCpuOverPreemption = timesCpuOverPreemptionThreshold; + } + + @Override + public ResourceUtilization getResourcesToReclaim() { + ResourceUtilization utilization = + containersMonitor.getContainersUtilization(true).getUtilization(); + + int memoryOverLimit = utilization.getPhysicalMemory() - + absoluteMemoryPreemptionThresholdMb; + float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold; + + if (vcoreOverLimit > 0) { + timesCpuOverPreemption++; + if (timesCpuOverPreemption > maxTimesCpuOverPreemption) { + timesCpuOverPreemption = 0; + } else { + // report no CPU over-limit if the number of times CPU has been over + // the preemption threshold does not exceed the max number allowed + vcoreOverLimit = 0; + } + } else { + // reset the counter when cpu utilization goes under the preemption + // threshold before the max times allowed is reached + timesCpuOverPreemption = 0; + } + + // sanitize so that zero is returned if the utilization is below + // the preemption threshold + vcoreOverLimit = Math.max(0, vcoreOverLimit); + memoryOverLimit = Math.max(0, memoryOverLimit); + + return ResourceUtilization.newInstance(memoryOverLimit, 0, vcoreOverLimit); + } +}
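The CPU branch above acts as a debounce: a single snapshot over the CPU threshold is tolerated, and preemption fires only on the (maxTimesCpuOverPreemption + 1)-th consecutive over-limit snapshot, with the counter reset both on recovery and after a preemption fires. A self-contained sketch of just that counter; the main method mirrors the NM_OVERALLOCATION_PREEMPTION_CPU_COUNT = 2 setting used by the tests later in this patch:

/**
 * Standalone sketch of the CPU over-limit debounce in
 * SnapshotBasedOverAllocationPreemptionPolicy (names illustrative).
 */
public class CpuDebounceSketch {
  private final int maxTimesOver;
  private int timesOver;

  CpuDebounceSketch(int maxTimesOver) {
    this.maxTimesOver = maxTimesOver;
  }

  /** Returns true when an over-threshold CPU reading should preempt. */
  boolean shouldPreempt(boolean cpuOverThreshold) {
    if (!cpuOverThreshold) {
      timesOver = 0;      // utilization recovered: reset the counter
      return false;
    }
    timesOver++;
    if (timesOver > maxTimesOver) {
      timesOver = 0;      // preempt and start counting afresh
      return true;
    }
    return false;         // tolerate transient spikes
  }

  public static void main(String[] args) {
    CpuDebounceSketch policy = new CpuDebounceSketch(2);
    // the third consecutive over-limit snapshot triggers preemption
    System.out.println(policy.shouldPreempt(true));   // false (1st)
    System.out.println(policy.shouldPreempt(true));   // false (2nd)
    System.out.println(policy.shouldPreempt(true));   // true  (3rd)
    System.out.println(policy.shouldPreempt(true));   // false (reset)
  }
}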
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 05e9dd0..013d817 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -27,6 +27,10 @@ import static org.mockito.Matchers.anyString; import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -534,4 +538,111 @@ public static ContainerId createContainerId(int cId, int aId) { ContainerId.newContainerId(appAttemptId, cId); return containerId; } + + /** + * A test implementation of the containers monitor that allows control of + * the reported resource utilization.
+ */ + protected static class ContainerMonitorForTest + extends ContainersMonitorImpl { + private static final int NM_CONTAINERS_VCORES = 4; + private static final int NM_CONTAINERS_MEMORY_MB = 2048; + + private ResourceUtilization containerResourceUsage = + ResourceUtilization.newInstance(0, 0, 0.0f); + + ContainerMonitorForTest(ContainerExecutor exec, + AsyncDispatcher dispatcher, Context context) { + super(exec, dispatcher, context); + } + + @Override + public long getPmemAllocatedForContainers() { + return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L; + } + + @Override + public long getVmemAllocatedForContainers() { + float pmemRatio = getConfig().getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + return (long) (pmemRatio * getPmemAllocatedForContainers()); + } + + @Override + public long getVCoresAllocatedForContainers() { + return NM_CONTAINERS_VCORES; + } + + @Override + public ContainersResourceUtilization getContainersUtilization( + boolean latest) { + return new ContainersMonitor.ContainersResourceUtilization( + containerResourceUsage, System.currentTimeMillis()); + } + + @Override + protected void checkOverAllocationPrerequisites() { + // do not check + } + + public void setContainerResourceUsage( + ResourceUtilization containerResourceUsage) { + this.containerResourceUsage = containerResourceUsage; + } + } + + /** + * A container manager that allows all events on its internal event queue + * to be drained and processed. + */ + protected static class DrainableContainerManager + extends ContainerManagerImpl { + + private final String user; + + public DrainableContainerManager( + Context context, ContainerExecutor exec, + DeletionService deletionContext, + NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler, String user) { + super(context, exec, deletionContext, + nodeStatusUpdater, metrics, dirsHandler); + this.user = user; + } + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected AsyncDispatcher createDispatcher() { + return new DrainDispatcher(); + } + + @Override + protected ContainersMonitor createContainersMonitor( + ContainerExecutor exec) { + return new ContainerMonitorForTest(exec, dispatcher, context); + } + + public void checkNodeResourceUtilization() { + ((ContainerMonitorForTest) getContainersMonitor()).checkUtilization(); + drainAsyncEvents(); + } + + public void drainAsyncEvents() { + ((DrainDispatcher) dispatcher).await(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 70066c6..1bd0100 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -30,7 +30,6 @@ import com.google.common.base.Supplier; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; @@ -38,18 +37,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; @@ -65,8 +61,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -131,47 +125,8 @@ public void postTransition(ContainerImpl op, @Override protected ContainerManagerImpl createContainerManager( DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new 
ContainersMonitorImpl(exec, dispatcher, this.context) { - // Define resources available for containers to be executed. - @Override - public long getPmemAllocatedForContainers() { - return 2048 * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return 4; - } - }; - } - }; + return new DrainableContainerManager(context, exec, delSrvc, + nodeStatusUpdater, metrics, dirsHandler, user); } @Override @@ -393,6 +348,33 @@ public void testStartAndQueueMultipleContainers() throws Exception { containerScheduler.getNumQueuedGuaranteedContainers()); Assert.assertEquals(2, containerScheduler.getNumQueuedOpportunisticContainers()); + + // check if OPPORTUNISTIC containers stay queued when the node + // utilization is low + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(100, 0, 0.2f)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals(ContainerSubState.RUNNING, + status.getContainerSubState()); + } else { + Assert.assertEquals(ContainerSubState.SCHEDULED, + status.getContainerSubState()); + } + } + containerScheduler = containerManager.getContainerScheduler(); + // Ensure two containers are properly queued. + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(2, + containerScheduler.getNumQueuedOpportunisticContainers()); } /** @@ -475,6 +457,70 @@ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception { containerScheduler.getNumQueuedOpportunisticContainers()); } + @Test + public void testNoOpportunisticContainerPreemptionUponHighUtilization() + throws Exception { + containerManager.start(); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure all containers are running. 
+ List statList = new ArrayList(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + + // check if OPPORTUNISTIC containers stay running when the node + // utilization is high + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(2048, 0, 1.0f)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + } + /** * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources * requests by each container as such that only one can run in parallel. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java index 183d868..3bb1e6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java @@ -20,12 +20,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -36,11 +33,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; @@ -53,8 +47,6 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -78,8 +70,6 @@ public class TestContainerSchedulerWithOverAllocation extends BaseContainerManagerTest { private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3; - private static final int NM_CONTAINERS_VCORES = 4; - private static final int NM_CONTAINERS_MEMORY_MB = 2048; static { LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class); @@ -115,6 +105,11 @@ public void setup() throws IOException { conf.setFloat( YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD, 0.75f); + conf.setFloat(YarnConfiguration.NM_OVERALLOCATION_CPU_PREEMPTION_THRESHOLD, + 0.8f); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_MEMORY_PREEMPTION_THRESHOLD, 0.8f); + conf.setInt(YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_CPU_COUNT, 2); // disable the monitor thread in ContainersMonitor to allow control over // when opportunistic containers are launched with over-allocation conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false); @@ -193,9 +188,7 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() createStartContainerRequest(2, BuilderUtils.newResource(512, 1), false)) )); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); // this container is not expected to be started immediately because // opportunistic containers cannot be started if the node would be @@ -212,8 +205,8 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() }); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // this container is expected to be started immediately because there // are (memory: 1024, vcore: 0.625) available based on over-allocation @@ -273,11 +266,8 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization() // try to start opportunistic containers out of band because they can // not be launched at container scheduler event if the node would be // over-allocated. 
- ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // this container will not start immediately because there is not // enough resource available at the moment either in terms of @@ -333,9 +323,7 @@ public void testStartOppContainersWithOverallocationLowUtilization() createStartContainerRequest(2, BuilderUtils.newResource(512, 1), false)) )); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); // this container is not expected to be started immediately because // opportunistic containers cannot be started if the node would be @@ -352,8 +340,8 @@ public void testStartOppContainersWithOverallocationLowUtilization() }); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // this container is expected to be started because there is resources // available because the actual utilization is very low @@ -410,9 +398,7 @@ public void testQueueOppContainersWithFullUtilization() throws Exception { } containerManager.startContainers( StartContainersRequest.newInstance(moreContainerRequests)); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); // All OPPORTUNISTIC containers but the last one should be queued. // The last OPPORTUNISTIC container to launch should be killed. @@ -639,8 +625,7 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() } } )); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); // Two OPPORTUNISTIC containers are expected to start with the // unallocated resources, but one will be queued because no @@ -655,8 +640,8 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() // try to start the opportunistic container out of band because it can // not be launched at container scheduler event if the node would be // over-allocated. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // now the queued opportunistic container should also start BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -690,83 +675,6 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() } /** - * Start four OPPORTUNISTIC containers which in aggregates exceeds the - * capacity of the node. The real resource utilization of the first two - * OPPORTUNISTIC containers are high whereas that of the latter two are - * almost zero. Then try to start a GUARANTEED container. The GUARANTEED - * container will eventually start running after preempting the third - * and fourth OPPORTUNISTIC container (which releases no resources) and - * then the second OPPORTUNISTIC container. 
- */ - public void - testKillOppContainersConservativelyWithOverallocationHighUtilization() - throws Exception { - containerManager.start(); - - containerManager.startContainers(StartContainersRequest.newInstance( - new ArrayList() { - { - add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); - add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), false)); - add(createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)); - add(createStartContainerRequest(3, - BuilderUtils.newResource(1024, 1), false)); - } - } - )); - // All four GUARANTEED containers are all expected to start - // because the containers utilization is low (0 at the point) - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(0), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(1), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(2), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(3), ContainerSubState.RUNNING); - - // the containers utilization is at the overallocation threshold - setContainerResourceUtilization( - ResourceUtilization.newInstance(1536, 0, 1.0f/2)); - - // try to start a GUARANTEED container when there's nothing left unallocated - containerManager.startContainers(StartContainersRequest.newInstance( - Collections.singletonList( - createStartContainerRequest(4, - BuilderUtils.newResource(1024, 1), true)) - )); - - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(4), ContainerSubState.RUNNING); - GetContainerStatusesRequest statRequest = GetContainerStatusesRequest. - newInstance(new ArrayList() { - { - add(createContainerId(0)); - add(createContainerId(1)); - add(createContainerId(2)); - add(createContainerId(3)); - add(createContainerId(4)); - } - }); - List containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getContainerId().equals(createContainerId(0)) || - status.getContainerId().equals(createContainerId(4))) { - Assert.assertEquals( - ContainerSubState.RUNNING, status.getContainerSubState()); - } else { - Assert.assertTrue(status.getDiagnostics().contains( - "Container Killed to make room for Guaranteed Container")); - } - System.out.println("\nStatus : [" + status + "]\n"); - } - } - - /** * Start two OPPORTUNISTIC containers followed by one GUARANTEED container, * which in aggregate exceeds the capacity of the node. The first two * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED @@ -831,8 +739,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.DONE); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); // only one OPPORTUNISTIC container is start because no over-allocation // is allowed to start OPPORTUNISTIC containers at container finish event. 
@@ -854,10 +761,8 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { // now try to start the OPPORTUNISTIC container that was queued because // we don't start OPPORTUNISTIC containers at container finish event if // the node would be over-allocated - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(4), ContainerSubState.RUNNING); verifyContainerStatuses(new HashMap() { @@ -913,8 +818,8 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { ResourceUtilization.newInstance(1536, 0, 1.0f/2)); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager)containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // no containers in queue are expected to be launched because the // containers utilization is not below the over-allocation threshold @@ -934,8 +839,8 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { setContainerResourceUtilization( ResourceUtilization.newInstance(512, 0, 1.0f/8)); - ((LongRunningContainerSimulatingContainersManager)containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); // the two OPPORTUNISTIC containers are expected to be launched BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -952,11 +857,270 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { }); } + /** + * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED + * container and another OPPORTUNISTIC container in order. When the node + * memory utilization is over its preemption threshold, the two OPPORTUNISTIC + * containers should be killed. + */ + @Test + public void testPreemptOpportunisticContainersUponHighMemoryUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(300, 1), false)); + } + } + )); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); + + // the first three containers are all expected to start + // because the containers utilization is low (0 at the point) + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // try to start the second opportunistic containers out of band. 
+ ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + // the containers utilization is over the overallocation threshold (fully) + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 0.5f)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.DONE); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.DONE); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.DONE); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + + } + + /** + * Start a GUARANTEED container followed by an OPPORTUNISTIC container, which + * in aggregates does not take more than the capacity of the node. + * When the node memory utilization is above the preemption threshold, the + * OPPORTUNISTIC container should not be killed because the node is not over- + * allocated. + */ + @Test + public void testNoPreemptionUponHighMemoryUtilizationButNoOverallocation() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), false)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // containers utilization is above the over-allocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 0.5f)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + // no containers shall be preempted because there is no overallocation + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED + * container and another OPPORTUNISTIC container in order. When the node + * cpu utilization is over its preemption threshold a few times in a row, + * the two OPPORTUNISTIC containers should be killed one by one. 
+ */ + @Test + public void testPreemptionUponHighCPUUtilization() throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(512, 2), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 2), true)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(512, 1), false)); + } + } + )); + ((DrainableContainerManager) containerManager).drainAsyncEvents(); + // the first three containers are expected to start + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // try to start opportunistic containers out of band. + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + final float fullCpuUtilization = 1.0f; + + // the containers CPU utilization is over the overallocation threshold + // for the first time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + // all containers should continue to be running because we don't + // preempt OPPORTUNISTIC containers right away + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.RUNNING); + } + }); + + // the containers CPU utilization is over the overallocation threshold + // for the second time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + // all containers should continue to be running because we don't preempt + // OPPORTUNISTIC containers when the cpu is over limit the second time + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.RUNNING); + } + }); + + // the containers CPU utilization is over the overallocation threshold + // for the third time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((DrainableContainerManager) containerManager) + .checkNodeResourceUtilization(); + + // one OPPORTUNISTIC container should be killed because CPU utilization + // is over its preemption threshold three times consecutively + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.DONE); + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + 
+
+    // again, the containers' CPU utilization is over the overallocation
+    // threshold for the first time (the CPU over-limit count is reset
+    // every time a preemption is triggered as a result)
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((DrainableContainerManager) containerManager)
+        .checkNodeResourceUtilization();
+
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers' CPU utilization is over the overallocation threshold
+    // for the second time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((DrainableContainerManager) containerManager)
+        .checkNodeResourceUtilization();
+
+    // all containers should continue to be running because we don't preempt
+    // OPPORTUNISTIC containers when the CPU is over the limit for only the
+    // second consecutive time
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.RUNNING);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+
+    // the containers' CPU utilization is over the overallocation threshold
+    // for the third time
+    setContainerResourceUtilization(
+        ResourceUtilization.newInstance(2048, 0, fullCpuUtilization));
+    ((DrainableContainerManager) containerManager)
+        .checkNodeResourceUtilization();
+
+    // the other OPPORTUNISTIC container should be killed because CPU
+    // utilization is over its preemption threshold three times consecutively
+    BaseContainerManagerTest.waitForContainerSubState(containerManager,
+        createContainerId(1), ContainerSubState.DONE);
+    verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() {
+      {
+        put(createContainerId(0), ContainerSubState.RUNNING);
+        put(createContainerId(1), ContainerSubState.DONE);
+        put(createContainerId(2), ContainerSubState.RUNNING);
+        put(createContainerId(3), ContainerSubState.DONE);
+      }
+    });
+  }
+
   private void setContainerResourceUtilization(ResourceUtilization usage) {
-    ((ContainerMonitorForOverallocationTest)
-        containerManager.getContainersMonitor())
-            .setContainerResourceUsage(usage);
+    ((ContainerMonitorForTest) containerManager.getContainersMonitor())
+        .setContainerResourceUsage(usage);
   }
 
   private void allowContainerToSucceed(int containerId) {
@@ -1004,9 +1168,7 @@ protected void verifyContainerStatuses(
    * container processes for testing purposes.
    */
   private static class LongRunningContainerSimulatingContainersManager
-      extends ContainerManagerImpl {
-
-    private final String user;
+      extends DrainableContainerManager {
 
     LongRunningContainerSimulatingContainersManager(
         Context context, ContainerExecutor exec,
@@ -1014,27 +1176,8 @@ protected void verifyContainerStatuses(
         NodeStatusUpdater nodeStatusUpdater,
         NodeManagerMetrics metrics,
         LocalDirsHandlerService dirsHandler, String user) {
-      super(context, exec, deletionContext,
-          nodeStatusUpdater, metrics, dirsHandler);
-      this.user = user;
-    }
-
-    @Override
-    protected UserGroupInformation getRemoteUgi() throws YarnException {
-      ApplicationId appId = ApplicationId.newInstance(0, 0);
-      ApplicationAttemptId appAttemptId =
-          ApplicationAttemptId.newInstance(appId, 1);
-      UserGroupInformation ugi =
-          UserGroupInformation.createRemoteUser(appAttemptId.toString());
-      ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
-          .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
-          .getKeyId()));
-      return ugi;
-    }
-
-    @Override
-    protected AsyncDispatcher createDispatcher() {
-      return new DrainDispatcher();
+      super(context, exec, deletionContext,
+          nodeStatusUpdater, metrics, dirsHandler, user);
     }
 
     /**
@@ -1060,27 +1203,10 @@ protected String getContainerPid(Path pidFilePath) throws Exception {
             return "123";
           }
-        };
         }
       };
     }
-
-    @Override
-    protected ContainersMonitor createContainersMonitor(
-        ContainerExecutor exec) {
-      return new ContainerMonitorForOverallocationTest(exec,
-          dispatcher, context);
-    }
-
-    public void startContainersOutOfBandUponLowUtilization() {
-      ((ContainerMonitorForOverallocationTest) getContainersMonitor())
-          .attemptToStartContainersUponLowUtilization();
-    }
-
-    public void drainAsyncEvents() {
-      ((DrainDispatcher) dispatcher).await();
-    }
   }
 
   /**
@@ -1176,56 +1302,4 @@ int getContainerExitCode() {
       }
     }
   }
-
-  /**
-   * A test implementation of container monitor that allows control of
-   * current resource utilization.
-   */
-  private static class ContainerMonitorForOverallocationTest
-      extends ContainersMonitorImpl {
-
-    private ResourceUtilization containerResourceUsage =
-        ResourceUtilization.newInstance(0, 0, 0.0f);
-
-    ContainerMonitorForOverallocationTest(ContainerExecutor exec,
-        AsyncDispatcher dispatcher, Context context) {
-      super(exec, dispatcher, context);
-    }
-
-    @Override
-    public long getPmemAllocatedForContainers() {
-      return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
-    }
-
-    @Override
-    public long getVmemAllocatedForContainers() {
-      float pmemRatio = getConfig().getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      return (long) (pmemRatio * getPmemAllocatedForContainers());
-    }
-
-    @Override
-    public long getVCoresAllocatedForContainers() {
-      return NM_CONTAINERS_VCORES;
-    }
-
-    @Override
-    public ContainersResourceUtilization getContainersUtilization(
-        boolean latest) {
-      return new ContainersMonitor.ContainersResourceUtilization(
-          containerResourceUsage, System.currentTimeMillis());
-    }
-
-    @Override
-    protected void checkOverAllocationPrerequisites() {
-      // do not check
-    }
-
-
-    public void setContainerResourceUsage(
-        ResourceUtilization containerResourceUsage) {
-      this.containerResourceUsage = containerResourceUsage;
-    }
-  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java
new file mode 100644
index 0000000..2be4d2f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
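+// The arithmetic behind the expected values in these tests, derived from the
+// fixture below (the actual reclaim computation lives in
+// SnapshotBasedOverAllocationPreemptionPolicy):
+//   memory limit = 2048 MB allocated * 0.75 threshold = 1536 MB, so a
+//     utilization of 2000 MB is expected to reclaim 2000 - 1536 = 464 MB;
+//   CPU limit = 0.75 of the node's CPU, so a utilization of 1.0f is expected
+//     to reclaim 1.0 - 0.75 = 0.25f, but only once the CPU has stayed over
+//     its threshold for more than MAX_CPU_OVER_PREEMPTION_THRESHOLDS (2)
+//     consecutive snapshots.
+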
+/**
+ * Unit tests for {@link SnapshotBasedOverAllocationPreemptionPolicy}.
+ */
+public class TestSnapshotBasedOverAllocationPreemptionPolicy {
+  private final static ResourceThresholds PREEMPTION_THRESHOLDS =
+      ResourceThresholds.newInstance(0.75f, 0.75f);
+  private final static int MAX_CPU_OVER_PREEMPTION_THRESHOLDS = 2;
+
+  private final ContainersMonitor containersMonitor =
+      mock(ContainersMonitor.class);
+
+  @Before
+  public void setUp() {
+    when(containersMonitor.getPmemAllocatedForContainers())
+        .thenReturn(2048 * 1024 * 1024L);
+  }
+
+  /**
+   * The memory utilization goes above its preemption threshold.
+   */
+  @Test
+  public void testMemoryOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 0.5f),
+            System.currentTimeMillis()));
+
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
+  /**
+   * The CPU utilization goes above its preemption threshold.
+   */
+  @Test
+  public void testCpuOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    // CPU utilization is over the preemption threshold for the first time
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization goes below the preemption threshold
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 0.5f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the first
+    // time again
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the second time
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the third time
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.25f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
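+  // Note on the CPU counting in the tests above and below: a reading at or
+  // below the CPU threshold resets the consecutive over-limit count, which
+  // is why the dip to 0.5f forces two more over-limit snapshots before any
+  // CPU is reported for reclamation. Memory, by contrast, is reclaimed on
+  // the very first over-limit snapshot.
+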
+  /**
+   * Both the memory and the CPU utilization go over their respective
+   * preemption thresholds.
+   */
+  @Test
+  public void testMemoryCpuOverPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    // CPU utilization is over the preemption threshold for the first time
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization goes below the preemption threshold, but memory goes
+    // above the preemption threshold
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 0.5f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the first
+    // time again
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the second time
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1200, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0.0f),
+        preemptionPolicy.getResourcesToReclaim());
+
+    // CPU utilization is over the preemption threshold for the third time,
+    // and the memory utilization is over its preemption threshold as well
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(2000, 0, 1.0f),
+            System.currentTimeMillis()));
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(464, 0, 0.25f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+
+  /**
+   * Both the memory and the CPU utilization are under their preemption
+   * thresholds.
+   */
+  @Test
+  public void testBothMemoryAndCpuUnderPreemptionThreshold() {
+    SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy =
+        new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS,
+            MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor);
+
+    when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn(
+        new ContainersMonitor.ContainersResourceUtilization(
+            ResourceUtilization.newInstance(1500, 0, 0.5f),
+            System.currentTimeMillis()));
+
+    Assert.assertEquals(
+        ResourceUtilization.newInstance(0, 0, 0f),
+        preemptionPolicy.getResourcesToReclaim());
+  }
+}