diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 068056d..3504662 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,14 +20,18 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerOverallocationPreemptionEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPreemptionPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +123,7 @@ private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int overAlloctionPreemptionCpuCount = -1; + private NMAllocationPreemptionPolicy overAllocationPreemptionPolicy; private volatile boolean stopped = false; @@ -373,6 +378,9 @@ private void initializeOverAllocation(Configuration conf) { this.overAllocationPolicy = createOverAllocationPolicy(resourceThresholds); + this.overAllocationPreemptionPolicy = createOverAllocationPreemptionPolicy( + overAllocationPreemptionThresholds, overAlloctionPreemptionCpuCount); + LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -385,6 +393,12 @@ protected NMAllocationPolicy createOverAllocationPolicy( return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); } + private NMAllocationPreemptionPolicy createOverAllocationPreemptionPolicy( + ResourceThresholds resourceThresholds, int maxTimesCpuOverLimit) { + return new SnapshotBasedOverAllocationPreemptionPolicy( + resourceThresholds, maxTimesCpuOverLimit, this); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this @@ -671,9 +685,7 @@ public void run() { setLatestContainersUtilization(trackedContainersUtilization); // check opportunity to start containers if over-allocation is on - if (context.isOverAllocationEnabled()) { - attemptToStartContainersUponLowUtilization(); - } + checkUtilization(); // Publish the container utilization metrics to node manager // metrics system. @@ -1054,13 +1066,25 @@ public NMAllocationPolicy getContainerOverAllocationPolicy() { return overAllocationPolicy; } + public NMAllocationPreemptionPolicy getOverAllocationPreemptionPolicy() { + return overAllocationPreemptionPolicy; + } + private void setLatestContainersUtilization(ResourceUtilization utilization) { this.latestContainersUtilization = new ContainersResourceUtilization( - utilization, System.currentTimeMillis()); + utilization, Time.now()); } @VisibleForTesting - public void attemptToStartContainersUponLowUtilization() { + public boolean checkUtilization() { + if (context.isOverAllocationEnabled()) { + return checkLowUtilization() || checkHighUtilization(); + } + return false; + } + + private boolean checkLowUtilization() { + boolean opportunisticContainersToStart = false; if (getContainerOverAllocationPolicy() != null) { Resource available = getContainerOverAllocationPolicy() .getAvailableResources(); @@ -1069,8 +1093,28 @@ public void attemptToStartContainersUponLowUtilization() { eventDispatcher.getEventHandler().handle( new ContainerSchedulerEvent(null, ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + opportunisticContainersToStart = true; + LOG.info("Node utilization is below its allocation threshold. " + + "Inform container scheduler to launch opportunistic containers."); } } + return opportunisticContainersToStart; + } + + public boolean checkHighUtilization() { + ResourceUtilization overLimit = getOverAllocationPreemptionPolicy() + .getResourcesToReclaim(); + + boolean opportunisticContainersToPreempt = false; + + if (overLimit.getPhysicalMemory() > 0 || overLimit.getCPU() > 0) { + opportunisticContainersToPreempt = true; + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerOverallocationPreemptionEvent(overLimit)); + LOG.info("Node utilization is over the preemption threshold. " + + "Inform container scheduler to reclaim " + overLimit); + } + return opportunisticContainersToPreempt; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java index a3e19eb..7840fef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -53,6 +53,11 @@ */ @Override public ResourceUtilization getCurrentUtilization() { + return getTotalAllocation(); + } + + @Override + public ResourceUtilization getTotalAllocation() { return this.containersAllocation; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 304c2b8..947c096 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -219,12 +219,60 @@ public void handle(ContainerSchedulerEvent event) { // node utilization is low. startOpportunisticContainers(utilizationTracker.getAvailableResources()); break; + case PREEMPT_CONTAINERS: + if (event instanceof ContainerSchedulerOverallocationPreemptionEvent) { + preemptOpportunisticContainers( + (ContainerSchedulerOverallocationPreemptionEvent) event); + } else { + LOG.error( + "Unknown event type on Preempt containers: {}", event.getType()); + } + break; default: - LOG.error("Unknown event arrived at ContainerScheduler: " - + event.toString()); + LOG.error("Unknown event arrived at ContainerScheduler: {}", event); } } + private void preemptOpportunisticContainers( + ContainerSchedulerOverallocationPreemptionEvent event) { + ResourceUtilization resourcesToReclaim = + getResourcesToReclaim(event.getResourcesOverPreemptionThresholds()); + + List oppContainersToReclaim = + pickOpportunisticContainersToReclaimResources( + resourcesToReclaim); + + killOpportunisticContainers(oppContainersToReclaim); + } + + /** + * Get the amount of resources that need to be reclaimed by preempting + * OPPORTUNISTIC containers considering the amount of resources that + * are over the preemption thresholds and over the capacity of the node. + * When the node is not being over-allocated, its resource utilization + * can safely go to 100% without any OPPORTUNISTIC containers being killed. + */ + private ResourceUtilization getResourcesToReclaim( + ResourceUtilization resourcesOverPreemptionThresholds) { + ResourceUtilization totalAllocation = ResourceUtilization.newInstance( + utilizationTracker.getTotalAllocation()); + getContainersMonitor().subtractNodeResourcesFromResourceUtilization( + totalAllocation); + ResourceUtilization overAllocatedResources = + ResourceUtilization.newInstance( + Math.max(0, totalAllocation.getPhysicalMemory()), + Math.max(0, totalAllocation.getVirtualMemory()), + Math.max(0, totalAllocation.getCPU())); + + return ResourceUtilization.newInstance( + Math.min(overAllocatedResources.getPhysicalMemory(), + resourcesOverPreemptionThresholds.getPhysicalMemory()), + Math.min(overAllocatedResources.getVirtualMemory(), + resourcesOverPreemptionThresholds.getVirtualMemory()), + Math.min(overAllocatedResources.getCPU(), + resourcesOverPreemptionThresholds.getCPU())); + } + /** * We assume that the ContainerManager has already figured out what kind * of update this is. @@ -593,8 +641,9 @@ protected void scheduleContainer(Container container) { @SuppressWarnings("unchecked") private void reclaimOpportunisticContainerResources() { + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); List extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources(); + pickOpportunisticContainersToReclaimResources(resourcesToFreeUp); killOpportunisticContainers(extraOppContainersToReclaim); } @@ -634,12 +683,12 @@ private void startContainer(Container container) { container.sendLaunchEvent(); } - private List pickOpportunisticContainersToReclaimResources() { + private List pickOpportunisticContainersToReclaimResources( + ResourceUtilization resourcesToFreeUp) { // The opportunistic containers that need to be killed for the // given container to start. List extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 9ad4f91..f76727d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -30,5 +30,7 @@ CONTAINER_PAUSED, RECOVERY_COMPLETED, // Producer: Containers Monitor when over-allocation is on - SCHEDULE_CONTAINERS + SCHEDULE_CONTAINERS, + // Producer: Containers Monitor when over-allocation is on + PREEMPT_CONTAINERS } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java new file mode 100644 index 0000000..547036a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerOverallocationPreemptionEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; + +/** + * A {@link ContainerSchedulerEvent} generated by {@link ContainersMonitorImpl} + * when overallocation is turned on and the utilization goes over the + * proactive preemption thresholds. + */ +public class ContainerSchedulerOverallocationPreemptionEvent + extends ContainerSchedulerEvent { + private final ResourceUtilization resourcesOverPreemptionThresholds; + /** + * Create instance of Event. + * + * @param toFree resource to free up. + */ + public ContainerSchedulerOverallocationPreemptionEvent( + ResourceUtilization toFree) { + super(null, ContainerSchedulerEventType.PREEMPT_CONTAINERS); + this.resourcesOverPreemptionThresholds = toFree; + } + + public ResourceUtilization getResourcesOverPreemptionThresholds() { + return resourcesOverPreemptionThresholds; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java new file mode 100644 index 0000000..8d31acf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPreemptionPolicy.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * Keeps track of containers utilization over time and determines how many + * resources need to be reclaimed by preempting opportunistic containers + * when over-allocation is turned on. + */ +public abstract class NMAllocationPreemptionPolicy { + protected final ResourceThresholds overAllocationPreemptionThresholds; + protected final ContainersMonitor containersMonitor; + + public NMAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + ContainersMonitor containersMonitor) { + this.containersMonitor = containersMonitor; + this.overAllocationPreemptionThresholds = preemptionThresholds; + } + + /** + * Get the amount of resources to reclaim by preempting opportunistic + * containers when over-allocation is turned on. + * @return the amount of resources to be reclaimed + */ + public abstract ResourceUtilization getResourcesToReclaim(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 7a7c78e..9be7574 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -39,6 +39,13 @@ ResourceUtilization getCurrentUtilization(); /** + * Get the total amount of resources allocated to running containers + * in terms of resource utilization. + * @return ResourceUtilization resource allocation + */ + ResourceUtilization getTotalAllocation(); + + /** * Get the amount of resources currently available to launch containers. * @return Resource resources available to launch containers */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java new file mode 100644 index 0000000..b2f85db --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPreemptionPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * An implementation of {@link NMAllocationPreemptionPolicy} based on the + * snapshot of the latest containers utilization to determine how many + * resources need to be reclaimed by preempting opportunistic containers + * when over-allocation is turned on. + */ +public class SnapshotBasedOverAllocationPreemptionPolicy + extends NMAllocationPreemptionPolicy { + private final int absoluteMemoryPreemptionThresholdMb; + private final float cpuPreemptionThreshold; + private final int maxTimesCpuOverPreemption; + private int timesCpuOverPreemption; + + public SnapshotBasedOverAllocationPreemptionPolicy( + ResourceThresholds preemptionThresholds, + int timesCpuOverPreemptionThreshold, + ContainersMonitor containersMonitor) { + super(preemptionThresholds, containersMonitor); + int memoryCapacityMb = (int) + (containersMonitor.getPmemAllocatedForContainers() / (1024 * 1024)); + this.absoluteMemoryPreemptionThresholdMb = (int) + (preemptionThresholds.getMemoryThreshold() * memoryCapacityMb); + this.cpuPreemptionThreshold = preemptionThresholds.getCpuThreshold(); + this.maxTimesCpuOverPreemption = timesCpuOverPreemptionThreshold; + } + + @Override + public ResourceUtilization getResourcesToReclaim() { + ResourceUtilization utilization = + containersMonitor.getContainersUtilization(true).getUtilization(); + + int memoryOverLimit = utilization.getPhysicalMemory() - + absoluteMemoryPreemptionThresholdMb; + float vcoreOverLimit = utilization.getCPU() - cpuPreemptionThreshold; + + if (vcoreOverLimit > 0) { + timesCpuOverPreemption++; + if (timesCpuOverPreemption > maxTimesCpuOverPreemption) { + timesCpuOverPreemption = 0; + } else { + // report no over limit for cpu if # of times CPU is over the preemption + // threshold is not greater the max number of times allowed + vcoreOverLimit = 0; + } + } else { + // reset the counter when cpu utilization goes under the preemption + // threshold before the max times allowed is reached + timesCpuOverPreemption = 0; + } + + // sanitize so that zero is returned if the utilization is below + // the preemption threshold + vcoreOverLimit = Math.max(0, vcoreOverLimit); + memoryOverLimit = Math.max(0, memoryOverLimit); + + return ResourceUtilization.newInstance(memoryOverLimit, 0, vcoreOverLimit); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 05e9dd0..b514f97 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,7 +26,12 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -534,4 +539,107 @@ public static ContainerId createContainerId(int cId, int aId) { ContainerId.newContainerId(appAttemptId, cId); return containerId; } + + /** + * A test implementation of {@link ContainersMonitor} that allows control of + * the current resource utilization. + */ + protected static class ContainerMonitorForTest + extends ContainersMonitorImpl { + private static final int NM_CONTAINERS_VCORES = 4; + private static final int NM_CONTAINERS_MEMORY_MB = 2048; + + private ResourceUtilization containerResourceUsage = + ResourceUtilization.newInstance(0, 0, 0.0f); + + ContainerMonitorForTest(ContainerExecutor exec, + AsyncDispatcher dispatcher, Context context) { + super(exec, dispatcher, context); + } + + @Override + public long getPmemAllocatedForContainers() { + return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L; + } + + @Override + public long getVmemAllocatedForContainers() { + float pmemRatio = getConfig().getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + return (long) (pmemRatio * getPmemAllocatedForContainers()); + } + + @Override + public long getVCoresAllocatedForContainers() { + return NM_CONTAINERS_VCORES; + } + + @Override + public ContainersResourceUtilization getContainersUtilization( + boolean latest) { + return new ContainersMonitor.ContainersResourceUtilization( + containerResourceUsage, Time.now()); + } + + @Override + protected void checkOverAllocationPrerequisites() { + // do not check + } + + public void setContainerResourceUsage( + ResourceUtilization containerResourceUsage) { + this.containerResourceUsage = containerResourceUsage; + } + } + + protected static class ContainerManagerForTest + extends ContainerManagerImpl { + + private final String user; + + public ContainerManagerForTest( + Context context, ContainerExecutor exec, + DeletionService deletionContext, + NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler, String user) { + super(context, exec, deletionContext, + nodeStatusUpdater, metrics, dirsHandler); + this.user = user; + } + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected AsyncDispatcher createDispatcher() { + return new DrainDispatcher(); + } + + @Override + protected ContainersMonitor createContainersMonitor( + ContainerExecutor exec) { + return new ContainerMonitorForTest(exec, dispatcher, context); + } + + public void checkNodeResourceUtilization() { + ((ContainerMonitorForTest) getContainersMonitor()).checkUtilization(); + drainAsyncEvents(); + } + + public void drainAsyncEvents() { + ((DrainDispatcher) dispatcher).await(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 70066c6..d5c09b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -30,7 +30,6 @@ import com.google.common.base.Supplier; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; @@ -38,18 +37,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; @@ -65,8 +61,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -131,47 +125,8 @@ public void postTransition(ContainerImpl op, @Override protected ContainerManagerImpl createContainerManager( DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, - nodeStatusUpdater, metrics, dirsHandler) { - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new ContainersMonitorImpl(exec, dispatcher, this.context) { - // Define resources available for containers to be executed. - @Override - public long getPmemAllocatedForContainers() { - return 2048 * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return 4; - } - }; - } - }; + return new ContainerManagerForTest(context, exec, delSrvc, + nodeStatusUpdater, metrics, dirsHandler, user); } @Override @@ -393,6 +348,37 @@ public void testStartAndQueueMultipleContainers() throws Exception { containerScheduler.getNumQueuedGuaranteedContainers()); Assert.assertEquals(2, containerScheduler.getNumQueuedOpportunisticContainers()); + + // we have just one container that requested 2048 MB of memory and 1 vcore + // running, its resource utilization is zero. + // check the node resource utilization and the two OPPORTUNISTIC containers + // that are being queued should stay in the queue because over-allocation + // is turn off. + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(0, 0, 0.0f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + ContainerId containerId0 = createContainerId(0); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(containerId0)) { + Assert.assertEquals(ContainerSubState.RUNNING, + status.getContainerSubState()); + } else { + Assert.assertEquals(ContainerSubState.SCHEDULED, + status.getContainerSubState()); + } + } + containerScheduler = containerManager.getContainerScheduler(); + // Ensure two containers are properly queued. + Assert.assertEquals(2, containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(2, + containerScheduler.getNumQueuedOpportunisticContainers()); } /** @@ -476,6 +462,91 @@ public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception { } /** + * Start two OPPORTUNISTIC containers which together ask for all + * the allocations available on the node. When the node resource + * utilization goes over the preemption thresholds, neither of + * the containers should be preempted because there is no + * over-allocation at the moment and they can safely use up all + * the resources availabe on the node. + */ + @Test + public void testNoOpportunisticContainerPreemptionUponHighUtilization() + throws Exception { + containerManager.start(); + + // start two OPPORTUNISTIC containers that together takes up + // all allocations of the node. They two can be launched immediately + // because there is enough free allocation. When they uses up + // all their resource allocations, that is, the node is fully + // utilized, none of the OPPORTUNISTIC containers shall be killed + // because the node is not being over-allocated + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 2), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(1024, 2), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + // the two OPPORTUNISTIC containers shall be launched immediately + // because there is just enough allocation to launch them both. + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(0), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + BaseContainerManagerTest.waitForContainerState(containerManager, + createContainerId(1), + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + + // Ensure all containers are running. + List statList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + + // we have two containers running, both of which are using all of + // their allocations. The node is being fully utilized in terms + // of both memory and CPU + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage( + ResourceUtilization.newInstance(2048, 0, 1.0f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // the two running OPPORTUNISTIC containers shall continue to run + // because when the node is not be over-allocated, it is safe to + // let the containers use up all the resources, no OPPORTUNISTIC + // containers shall be preempted + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + } + + /** * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources * requests by each container as such that only one can run in parallel. * Thus, the OPPORTUNISTIC container that started running, will be diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java index 183d868..55f0258 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java @@ -20,12 +20,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -36,11 +33,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; @@ -53,8 +47,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -78,8 +70,6 @@ public class TestContainerSchedulerWithOverAllocation extends BaseContainerManagerTest { private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3; - private static final int NM_CONTAINERS_VCORES = 4; - private static final int NM_CONTAINERS_MEMORY_MB = 2048; static { LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class); @@ -115,6 +105,11 @@ public void setup() throws IOException { conf.setFloat( YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD, 0.75f); + conf.setFloat(YarnConfiguration.NM_OVERALLOCATION_CPU_PREEMPTION_THRESHOLD, + 0.8f); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_MEMORY_PREEMPTION_THRESHOLD, 0.8f); + conf.setInt(YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_CPU_COUNT, 2); // disable the monitor thread in ContainersMonitor to allow control over // when opportunistic containers are launched with over-allocation conf.setBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, false); @@ -134,9 +129,9 @@ public void testStartMultipleContainersWithoutOverallocation() StartContainersRequest allRequests = StartContainersRequest.newInstance( new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); } } ); containerManager.startContainers(allRequests); @@ -171,9 +166,9 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); add(createStartContainerRequest(1, - BuilderUtils.newResource(824, 1), true)); + BuilderUtils.newResource(824, 1), ExecutionType.GUARANTEED)); } } )); @@ -191,11 +186,9 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)) + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)) )); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); // this container is not expected to be started immediately because // opportunistic containers cannot be started if the node would be @@ -212,8 +205,8 @@ public void testStartOppContainersWithPartialOverallocationLowUtilization() }); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // this container is expected to be started immediately because there // are (memory: 1024, vcore: 0.625) available based on over-allocation @@ -247,9 +240,9 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); add(createStartContainerRequest(1, - BuilderUtils.newResource(824, 1), true)); + BuilderUtils.newResource(824, 1), ExecutionType.GUARANTEED)); } } )); @@ -267,17 +260,14 @@ public void testQueueOppContainerWithPartialOverallocationHighUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)) + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)) )); // try to start opportunistic containers out of band because they can // not be launched at container scheduler event if the node would be // over-allocated. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // this container will not start immediately because there is not // enough resource available at the moment either in terms of @@ -312,9 +302,9 @@ public void testStartOppContainersWithOverallocationLowUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); } } )); @@ -331,11 +321,9 @@ public void testStartOppContainersWithOverallocationLowUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)) + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)) )); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); // this container is not expected to be started immediately because // opportunistic containers cannot be started if the node would be @@ -352,8 +340,8 @@ public void testStartOppContainersWithOverallocationLowUtilization() }); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // this container is expected to be started because there is resources // available because the actual utilization is very low @@ -384,9 +372,9 @@ public void testQueueOppContainersWithFullUtilization() throws Exception { new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); } } )); @@ -406,13 +394,11 @@ public void testQueueOppContainersWithFullUtilization() throws Exception { for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) { moreContainerRequests.add( createStartContainerRequest(2 + a, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); } containerManager.startContainers( StartContainersRequest.newInstance(moreContainerRequests)); - - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); // All OPPORTUNISTIC containers but the last one should be queued. // The last OPPORTUNISTIC container to launch should be killed. @@ -454,9 +440,9 @@ public void testStartOppContainerWithHighUtilizationNoOverallocation() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1200, 1), true)); + BuilderUtils.newResource(1200, 1), ExecutionType.GUARANTEED)); add(createStartContainerRequest(1, - BuilderUtils.newResource(400, 1), true)); + BuilderUtils.newResource(400, 1), ExecutionType.GUARANTEED)); } } )); @@ -473,7 +459,7 @@ public void testStartOppContainerWithHighUtilizationNoOverallocation() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(400, 1), false)) + BuilderUtils.newResource(400, 1), ExecutionType.OPPORTUNISTIC)) )); // the OPPORTUNISTIC container can be safely launched even though @@ -507,9 +493,11 @@ public void testKillNoOppContainersWithPartialOverallocationLowUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), + ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(1, - BuilderUtils.newResource(824, 1), false)); + BuilderUtils.newResource(824, 1), + ExecutionType.OPPORTUNISTIC)); } } )); @@ -527,7 +515,7 @@ public void testKillNoOppContainersWithPartialOverallocationLowUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), true)) + BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED)) )); // the GUARANTEED container is expected be launched immediately without @@ -561,9 +549,11 @@ public void testKillOppContainersWithPartialOverallocationHighUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), + ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(1, - BuilderUtils.newResource(824, 1), false)); + BuilderUtils.newResource(824, 1), + ExecutionType.OPPORTUNISTIC)); } } )); @@ -581,7 +571,7 @@ public void testKillOppContainersWithPartialOverallocationHighUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), true)) + BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED)) )); BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -631,16 +621,18 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), + ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), + ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(2, - BuilderUtils.newResource(1024, 1), false)); + BuilderUtils.newResource(1024, 1), + ExecutionType.OPPORTUNISTIC)); } } )); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); // Two OPPORTUNISTIC containers are expected to start with the // unallocated resources, but one will be queued because no @@ -655,8 +647,8 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() // try to start the opportunistic container out of band because it can // not be launched at container scheduler event if the node would be // over-allocated. - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // now the queued opportunistic container should also start BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -671,7 +663,7 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(3, - BuilderUtils.newResource(512, 1), true)) + BuilderUtils.newResource(512, 1), ExecutionType.GUARANTEED)) )); // the GUARANTEED container is expected be launched immediately without @@ -690,83 +682,6 @@ public void testKillNoOppContainersWithOverallocationLowUtilization() } /** - * Start four OPPORTUNISTIC containers which in aggregates exceeds the - * capacity of the node. The real resource utilization of the first two - * OPPORTUNISTIC containers are high whereas that of the latter two are - * almost zero. Then try to start a GUARANTEED container. The GUARANTEED - * container will eventually start running after preempting the third - * and fourth OPPORTUNISTIC container (which releases no resources) and - * then the second OPPORTUNISTIC container. - */ - public void - testKillOppContainersConservativelyWithOverallocationHighUtilization() - throws Exception { - containerManager.start(); - - containerManager.startContainers(StartContainersRequest.newInstance( - new ArrayList() { - { - add(createStartContainerRequest(0, - BuilderUtils.newResource(1024, 1), false)); - add(createStartContainerRequest(1, - BuilderUtils.newResource(1024, 1), false)); - add(createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)); - add(createStartContainerRequest(3, - BuilderUtils.newResource(1024, 1), false)); - } - } - )); - // All four GUARANTEED containers are all expected to start - // because the containers utilization is low (0 at the point) - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(0), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(1), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(2), ContainerSubState.RUNNING); - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(3), ContainerSubState.RUNNING); - - // the containers utilization is at the overallocation threshold - setContainerResourceUtilization( - ResourceUtilization.newInstance(1536, 0, 1.0f/2)); - - // try to start a GUARANTEED container when there's nothing left unallocated - containerManager.startContainers(StartContainersRequest.newInstance( - Collections.singletonList( - createStartContainerRequest(4, - BuilderUtils.newResource(1024, 1), true)) - )); - - BaseContainerManagerTest.waitForContainerSubState(containerManager, - createContainerId(4), ContainerSubState.RUNNING); - GetContainerStatusesRequest statRequest = GetContainerStatusesRequest. - newInstance(new ArrayList() { - { - add(createContainerId(0)); - add(createContainerId(1)); - add(createContainerId(2)); - add(createContainerId(3)); - add(createContainerId(4)); - } - }); - List containerStatuses = containerManager - .getContainerStatuses(statRequest).getContainerStatuses(); - for (ContainerStatus status : containerStatuses) { - if (status.getContainerId().equals(createContainerId(0)) || - status.getContainerId().equals(createContainerId(4))) { - Assert.assertEquals( - ContainerSubState.RUNNING, status.getContainerSubState()); - } else { - Assert.assertTrue(status.getDiagnostics().contains( - "Container Killed to make room for Guaranteed Container")); - } - System.out.println("\nStatus : [" + status + "]\n"); - } - } - - /** * Start two OPPORTUNISTIC containers followed by one GUARANTEED container, * which in aggregate exceeds the capacity of the node. The first two * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED @@ -784,11 +699,11 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { new ArrayList() { { add(createStartContainerRequest(0, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(1, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(2, - BuilderUtils.newResource(1024, 1), true)); + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); } } )); @@ -810,9 +725,9 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { new ArrayList() { { add(createStartContainerRequest(3, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(4, - BuilderUtils.newResource(800, 1), false)); + BuilderUtils.newResource(800, 1), ExecutionType.OPPORTUNISTIC)); } } )); @@ -831,8 +746,7 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(2), ContainerSubState.DONE); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); // only one OPPORTUNISTIC container is start because no over-allocation // is allowed to start OPPORTUNISTIC containers at container finish event. @@ -854,10 +768,8 @@ public void testStartOppContainersUponContainerCompletion() throws Exception { // now try to start the OPPORTUNISTIC container that was queued because // we don't start OPPORTUNISTIC containers at container finish event if // the node would be over-allocated - ((LongRunningContainerSimulatingContainersManager) containerManager) - .startContainersOutOfBandUponLowUtilization(); - ((LongRunningContainerSimulatingContainersManager) containerManager) - .drainAsyncEvents(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(4), ContainerSubState.RUNNING); verifyContainerStatuses(new HashMap() { @@ -884,7 +796,7 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { containerManager.startContainers(StartContainersRequest.newInstance( Collections.singletonList( createStartContainerRequest(0, - BuilderUtils.newResource(2048, 4), true)))); + BuilderUtils.newResource(2048, 4), ExecutionType.GUARANTEED)))); BaseContainerManagerTest.waitForContainerSubState(containerManager, createContainerId(0), ContainerSubState.RUNNING); @@ -897,9 +809,9 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { new ArrayList() { { add(createStartContainerRequest(1, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); add(createStartContainerRequest(2, - BuilderUtils.newResource(512, 1), false)); + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); } } )); @@ -913,8 +825,8 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { ResourceUtilization.newInstance(1536, 0, 1.0f/2)); // try to start opportunistic containers out of band. - ((LongRunningContainerSimulatingContainersManager)containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // no containers in queue are expected to be launched because the // containers utilization is not below the over-allocation threshold @@ -934,8 +846,8 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { setContainerResourceUtilization( ResourceUtilization.newInstance(512, 0, 1.0f/8)); - ((LongRunningContainerSimulatingContainersManager)containerManager) - .startContainersOutOfBandUponLowUtilization(); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); // the two OPPORTUNISTIC containers are expected to be launched BaseContainerManagerTest.waitForContainerSubState(containerManager, @@ -952,11 +864,330 @@ public void testStartOpportunisticContainersOutOfBand() throws Exception { }); } + /** + * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED + * container and another OPPORTUNISTIC container in order. When the node + * memory utilization is over its preemption threshold, the two OPPORTUNISTIC + * containers should be killed. + */ + @Test + public void testPreemptOpportunisticContainersUponHighMemoryUtilization() + throws Exception { + containerManager.start(); + + // try to start four containers at once. the first GUARANTEED container + // that requests (1024 MB, 1 vcore) can be launched because there is + // (2048 MB, 4 vcores) unallocated. The second container, which is + // OPPORTUNISTIC, can also be launched because it asks for 512 MB, 1 vcore + // which is less than what is left unallocated after launching the first + // one GUARANTEED container, (1024 MB, 3 vcores). + // The 3rd container, which is GUARANTEED, can also be launched because + // the node resource utilization utilization is zero such that + // over-allocation kicks in. The 4th , an OPPORTUNISTIC container, + // will be queued, because OPPORTUNISTIC containers can only be + // launched when node resource utilization is checked, if launching them + // would cause node over-allocation. + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(1024, 1), ExecutionType.GUARANTEED)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(300, 1), ExecutionType.OPPORTUNISTIC)); + } + } + )); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); + + // the first three containers are all expected to start + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // try to check node resource utilization and start the second + // opportunistic containers out of band. Because the node resource + // utilization is zero at the moment, over-allocation will kick in + // and the container will be launched. + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + // the containers memory utilization is over the preemption threshold + // (2048 > 2048 * 0.8 = 1638.4) + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 0.5f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // (2048 - 2048 * 0.8) = 409.6 MB of memory needs to be reclaimed, + // which shall result in both OPPORTUNISTIC containers to be preempted. + // (Preempting the most recently launched OPPORTUNISTIC container, that + // is the 4th container, would only release at most 300 MB of memory) + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.DONE); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.DONE); + + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.DONE); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + + } + + /** + * Start a GUARANTEED container followed by an OPPORTUNISTIC container, which + * in aggregates does not take more than the capacity of the node. + * When the node memory utilization is above the preemption threshold, the + * OPPORTUNISTIC container should not be killed because the node is not being + * over-allocated. + */ + @Test + public void testNoPreemptionUponHighMemoryUtilizationButNoOverallocation() + throws Exception { + containerManager.start(); + + // start two containers, one GUARANTEED and one OPPORTUNISTIC, that together + // take up all the allocations (2048 MB of memory and 4 vcores available on + // the node). They can be both launched immediately because there are enough + // allocations to do so. When the two containers fully utilize their + // resource requests, that is, the node is being 100% utilized, the + // OPPORTUNISTIC container shall continue to run because the node is + // not be over-allocated. + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 2), + ExecutionType.GUARANTEED)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 2), + ExecutionType.OPPORTUNISTIC)); + } + } + )); + // both containers shall be launched immediately because there are + // enough allocations to do so + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the node is being fully utilized, which is above the preemption + // threshold (2048 * 0.75 = 1536 MB, 1.0f) + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 1.0f)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // no containers shall be preempted because the node is not being + // over-allocated so it is safe to allow the node to be fully utilized + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start a GUARANTEED container, an OPPORTUNISTIC container, a GUARANTEED + * container and another OPPORTUNISTIC container in order. When the node + * cpu utilization is over its preemption threshold a few times in a row, + * the two OPPORTUNISTIC containers should be killed one by one. + */ + @Test + public void testPreemptionUponHighCPUUtilization() throws Exception { + containerManager.start(); + + // try to start 4 containers at once. The first container, can be + // safely launched immediately (2048 MB, 4 vcores left unallocated). + // The second container, can also be launched immediately, because + // there is enough resources unallocated after launching the first + // container (2048 - 512 = 1536 MB, 4 - 2 = 2 vcores). After launching + // the first two containers, there are 1024 MBs of memory and 1 vcore + // left unallocated, so there is not enough allocation to launch the + // third container. But because the third container is GUARANTEED and + // the node resource utilization is zero, we can launch it based on + // over-allocation (the projected resource utilization will be 512 MB + // of memory and 2 vcores, below the over-allocation threshold) + // The fourth container, which is OPPORTUNISTIC, will be queued because + // OPPORTUNISTIC containers can not be launched based on over-allocation + // upon container start requests (they can only be launched when node + // resource utilization is checked in ContainersMonitor) + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(512, 2), ExecutionType.GUARANTEED)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 2), ExecutionType.GUARANTEED)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(512, 1), ExecutionType.OPPORTUNISTIC)); + } + } + )); + ((ContainerManagerForTest) containerManager).drainAsyncEvents(); + // the first three containers are expected to start. The first two + // can be launched based on free allocation, the third can be + // launched based on over-allocation + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // try to start second opportunistic containers out of band. + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // the second opportunistic container is expected to start because + // the node resource utilization is at zero, the projected utilization + // is 512 MBs of memory and 1 vcore + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + final float fullCpuUtilization = 1.0f; + + // the containers CPU utilization is over its preemption threshold (0.8f) + // for the first time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // all containers should continue to be running because we don't + // preempt OPPORTUNISTIC containers right away + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.RUNNING); + } + }); + + // the containers CPU utilization is over its preemption threshold (0.8f) + // for the second time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // all containers should continue to be running because we don't preempt + // OPPORTUNISTIC containers when the cpu is over the preemption threshold + // (0.8f) the second time + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.RUNNING); + } + }); + + // the containers CPU utilization is over the preemption threshold (0.8f) + // for the third time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // because CPU utilization is over its preemption threshold three times + // consecutively, the amount of cpu utilization over the preemption + // threshold, that is, 1.0 - 0.8 = 0.2f CPU needs to be reclaimed and + // as a result, the most recently launched OPPORTUNISTIC container should + // be killed + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.DONE); + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + + // again, the containers CPU utilization is over the preemption threshold + // (0.8f) for the first time (the cpu over-limit count is reset every time + // a preemption is triggered) + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // no CPU resource is expected to be reclaimed when the CPU utilization + // goes over the preemption threshold the first time + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + + // the containers CPU utilization is over the preemption threshold (0.9f) + // for the second time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // still no CPU resource is expected to be reclaimed when the CPU utilization + // goes over the preemption threshold the second time + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + + // the containers CPU utilization is over the preemption threshold + // for the third time + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, fullCpuUtilization)); + ((ContainerManagerForTest) containerManager) + .checkNodeResourceUtilization(); + + // because CPU utilization is over its preemption threshold three times + // consecutively, the amount of cpu utilization over the preemption + // threshold, that is, 1.0 - 0.8 = 0.2f CPU needs to be reclaimed and + // as a result, the other OPPORTUNISTIC container should be killed + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.DONE); + verifyContainerStatuses(new HashMap() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.DONE); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.DONE); + } + }); + } + private void setContainerResourceUtilization(ResourceUtilization usage) { - ((ContainerMonitorForOverallocationTest) - containerManager.getContainersMonitor()) - .setContainerResourceUsage(usage); + ((ContainerMonitorForTest) containerManager.getContainersMonitor()) + .setContainerResourceUsage(usage); } private void allowContainerToSucceed(int containerId) { @@ -965,12 +1196,11 @@ private void allowContainerToSucceed(int containerId) { } - protected StartContainerRequest createStartContainerRequest(int containerId, - Resource resource, boolean isGuaranteed) throws IOException { + protected StartContainerRequest createStartContainerRequest( + int containerId, Resource resource, ExecutionType executionType) + throws IOException { ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); - ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED : - ExecutionType.OPPORTUNISTIC; Token containerToken = createContainerToken( createContainerId(containerId), DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource, @@ -1004,9 +1234,7 @@ protected void verifyContainerStatuses( * container processes for testing purposes. */ private static class LongRunningContainerSimulatingContainersManager - extends ContainerManagerImpl { - - private final String user; + extends ContainerManagerForTest { LongRunningContainerSimulatingContainersManager( Context context, ContainerExecutor exec, @@ -1014,27 +1242,8 @@ protected void verifyContainerStatuses( NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler, String user) { - super(context, exec, deletionContext, - nodeStatusUpdater, metrics, dirsHandler); - this.user = user; - } - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected AsyncDispatcher createDispatcher() { - return new DrainDispatcher(); + super(context, exec, deletionContext, + nodeStatusUpdater, metrics, dirsHandler, user); } /** @@ -1060,27 +1269,10 @@ protected String getContainerPid(Path pidFilePath) throws Exception { return "123"; } - }; } }; } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new ContainerMonitorForOverallocationTest(exec, - dispatcher, context); - } - - public void startContainersOutOfBandUponLowUtilization() { - ((ContainerMonitorForOverallocationTest) getContainersMonitor()) - .attemptToStartContainersUponLowUtilization(); - } - - public void drainAsyncEvents() { - ((DrainDispatcher) dispatcher).await(); - } } /** @@ -1176,56 +1368,4 @@ int getContainerExitCode() { } } } - - /** - * A test implementation of container monitor that allows control of - * current resource utilization. - */ - private static class ContainerMonitorForOverallocationTest - extends ContainersMonitorImpl { - - private ResourceUtilization containerResourceUsage = - ResourceUtilization.newInstance(0, 0, 0.0f); - - ContainerMonitorForOverallocationTest(ContainerExecutor exec, - AsyncDispatcher dispatcher, Context context) { - super(exec, dispatcher, context); - } - - @Override - public long getPmemAllocatedForContainers() { - return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return NM_CONTAINERS_VCORES; - } - - @Override - public ContainersResourceUtilization getContainersUtilization( - boolean latest) { - return new ContainersMonitor.ContainersResourceUtilization( - containerResourceUsage, System.currentTimeMillis()); - } - - @Override - protected void checkOverAllocationPrerequisites() { - // do not check - } - - - public void setContainerResourceUsage( - ResourceUtilization containerResourceUsage) { - this.containerResourceUsage = containerResourceUsage; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java new file mode 100644 index 0000000..bbc7c49 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestSnapshotBasedOverAllocationPreemptionPolicy.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link SnapshotBasedOverAllocationPreemptionPolicy}. + */ +public class TestSnapshotBasedOverAllocationPreemptionPolicy { + // Both the CPU preemption threshold and the memory preemption threshold + // are 75% + private final static ResourceThresholds PREEMPTION_THRESHOLDS = + ResourceThresholds.newInstance(0.75f, 0.75f); + + // The CPU utilization is allowed to go over the cpu preemption threshold + // 2 times in a row before any container is preempted to reclaim cpu resources + private final static int MAX_CPU_OVER_PREEMPTION_THRESHOLDS = 2; + + private final ContainersMonitor containersMonitor = + mock(ContainersMonitor.class); + + @Before + public void setUp() { + // the node has an allocation of 2048 MB of memory + when(containersMonitor.getPmemAllocatedForContainers()). + thenReturn(2048 * 1024 * 1024L); + } + + /** + * The memory utilization goes above its preemption threshold, + * 2048 * 0.75f = 1536 MB (the node has an allocation of 2048 MB memory). + */ + @Test + public void testMemoryOverPreemptionThreshold() { + SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy = + new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, + MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); + + // the current memory utilization, 2000 MB is over the preemption + // threshold, 2048 * 0.75, which is 1536 MB. The CPU utilization, + // 0.5f is below the preemption threshold, 0.75f. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(2000, 0, 0.5f), + Time.now())); + + // the amount of memory utilization over the preemption threshold, that is, + // 2000 - (2048 * 0.75) = 464 MB of memory, shall be reclaimed. + Assert.assertEquals( + ResourceUtilization.newInstance(464, 0, 0f), + preemptionPolicy.getResourcesToReclaim()); + } + + /** + * The CPU utilization goes above its preemption threshold, 0.75f. + */ + @Test + public void testCpuOverPreemptionThreshold() { + SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy = + new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, + MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); + + // the current CPU utilization, 1.0f, is over the preemption threshold, + // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 0.5f, is below the preemption threshold, + // 0.75f. In the meantime the memory utilization, 1000 MB is also below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 0.5f), + Time.now())); + // no resources shall be reclaimed + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is over the preemption threshold, + // 0.75f. In the meantime the memory utilization, 1000 MB is below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed because the cpu utilization is allowed + // to go over the preemption threshold at most two times in a row. It is + // just over the preemption threshold for the first time + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is again over the preemption + // threshold, 0.75f. In the meantime the memory utilization, 1000 MB + // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed because the cpu utilization is allowed + // to go over the preemption threshold at most two times in a row. It is + // just over the preemption threshold for the second time in a row + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is over the preemption threshold, + // the third time in a row. In the meantime the memory utilization, 1000 MB + // is below the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // the amount of cpu utilization over the preemption threshold, that is, + // 1.0 - 0.75f = 0.25, shall be reclaimed. + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.25f), + preemptionPolicy.getResourcesToReclaim()); + } + + /** + * Both memory and CPU utilization go over their preemption thresholds + * respectively. + */ + @Test + public void testMemoryCpuOverPreemptionThreshold() { + SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy = + new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, + MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); + + // the current CPU utilization, 1.0f, is over the preemption threshold, + // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed because the cpu utilization is allowed + // to go over the preemption threshold at most two times in a row. It is + // just over the preemption threshold for the first time. + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 0.5f, is below the preemption threshold, + // 0.75f. The memory utilization, 2000 MB, however, is above the memory + // preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(2000, 0, 0.5f), + Time.now())); + // the amount of memory utilization over the preemption threshold, that is, + // 2000 - (2048 * 0.75) = 464 MB of memory, shall be reclaimed. + Assert.assertEquals( + ResourceUtilization.newInstance(464, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is over the preemption threshold, + // 0.75f, for the first time. The memory utilization, 1000 MB is below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed because the cpu utilization is allowed + // to go over the preemption threshold at most two times in a row. It is + // just over the preemption threshold for the first time. + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is again over the preemption + // threshold, 0.75f. In the meantime the memory utilization, 1000 MB + // is still below the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 1.0f), + Time.now())); + // no resources shall be reclaimed because the cpu utilization is allowed + // to go over the preemption threshold at most two times in a row. It is + // just over the preemption threshold for the second time in a row. + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0.0f), + preemptionPolicy.getResourcesToReclaim()); + + // the current CPU utilization, 1.0f, is over the CPU preemption threshold, + // 0.75f, the third time in a row. In the meantime, the memory utilization, + // 2000 MB, is also over the memory preemption threshold, + // 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(2000, 0, 1.0f), + Time.now())); + // the amount of memory utilization over the preemption threshold, that is, + // 2000 - (2048 * 0.75) = 464 MB of memory, and the amount of cpu + // utilization over the preemption threshold, that is, 1.0f - 0.75f = 0.25f, + // shall be reclaimed. + Assert.assertEquals( + ResourceUtilization.newInstance(464, 0, 0.25f), + preemptionPolicy.getResourcesToReclaim()); + } + + /** + * Both memory and CPU utilization are under their preemption thresholds. + */ + @Test + public void testBothMemoryAndCpuUnderPreemptionThreshold() { + SnapshotBasedOverAllocationPreemptionPolicy preemptionPolicy = + new SnapshotBasedOverAllocationPreemptionPolicy(PREEMPTION_THRESHOLDS, + MAX_CPU_OVER_PREEMPTION_THRESHOLDS, containersMonitor); + + // the current CPU utilization, 0.5f, is below the preemption threshold, + // 0.75f. In the meantime the memory utilization, 1000 MB is also below + // the memory preemption threshold, 2048 * 0.75 = 1536 MB. + when(containersMonitor.getContainersUtilization(anyBoolean())).thenReturn( + new ContainersMonitor.ContainersResourceUtilization( + ResourceUtilization.newInstance(1000, 0, 0.5f), + Time.now())); + // no resources shall be reclaimed because both CPU and memory utilization + // are under the preemption threshold + Assert.assertEquals( + ResourceUtilization.newInstance(0, 0, 0f), + preemptionPolicy.getResourcesToReclaim()); + } +}