diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 44f9740..a2f70d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -532,7 +532,7 @@ private OpportunisticContainersStatus getOpportunisticContainersStatus() { private ResourceUtilization getContainersUtilization() { ContainersMonitor containersMonitor = this.context.getContainerManager().getContainersMonitor(); - return containersMonitor.getContainersUtilization(); + return containersMonitor.getContainersUtilization(false).getUtilization(); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3470910..77ea4fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -243,6 +243,12 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, metrics); addService(rsrcLocalizationSrvc); + this.containersMonitor = createContainersMonitor(exec); + addService(this.containersMonitor); + + // ContainersLauncher must be added after ContainersMonitor + // because the former depends on the latter to initialize + // over-allocation first. 
containersLauncher = createContainersLauncher(context, exec); addService(containersLauncher); @@ -267,8 +273,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, nmMetricsPublisher = createNMTimelinePublisher(context); context.setNMTimelinePublisher(nmMetricsPublisher); } - this.containersMonitor = createContainersMonitor(exec); - addService(this.containersMonitor); dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 3875cbc..6bdaeca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1070,17 +1070,17 @@ public void resumeContainer() throws IOException { /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. - * @param pidFilePath File from which to read the process id + * @param pidFile File from which to read the process id * @return Process ID * @throws Exception */ - private String getContainerPid(Path pidFilePath) throws Exception { + protected String getContainerPid(Path pidFile) throws Exception { String containerIdStr = container.getContainerId().toString(); String processId = null; if (LOG.isDebugEnabled()) { LOG.debug("Accessing pid for container " + containerIdStr - + " from pid file " + pidFilePath); + + " from pid file " + pidFile); } int sleepCounter = 0; final int sleepInterval = 100; @@ -1088,7 +1088,7 @@ private String getContainerPid(Path pidFilePath) throws Exception { // loop waiting for pid file to show up // until our timer expires in which case we admit defeat while (true) { - processId = ProcessIdFileReader.getProcessId(pidFilePath); + processId = ProcessIdFileReader.getProcessId(pidFile); if (processId != null) { if (LOG.isDebugEnabled()) { LOG.debug( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index cfd5d6a..c3d0a4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -116,8 +116,7 @@ public void handle(ContainersLauncherEvent event) { containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - new ContainerLaunch(context, getConfig(), dispatcher, exec, app, - event.getContainer(), dirsHandler, containerManager); + createContainerLaunch(app, 
event.getContainer()); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -213,4 +212,10 @@ public void handle(ContainersLauncherEvent event) { break; } } + + protected ContainerLaunch createContainerLaunch( + Application app, Container container) { + return new ContainerLaunch(context, getConfig(), dispatcher, + exec, app, container, dirsHandler, containerManager); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 64831e9..8da4ec4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -23,10 +23,24 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; public interface ContainersMonitor extends Service, EventHandler<ContainersMonitorEvent>, ResourceView { - ResourceUtilization getContainersUtilization(); + + /** + * Get the aggregate resource utilization of containers running on the node, + * with a timestamp of the measurement. + * @param latest true if the latest result should be returned + * @return ContainersResourceUtilization resource utilization of all containers + */ + ContainersResourceUtilization getContainersUtilization(boolean latest); + + /** + * Get the policy to over-allocate containers when over-allocation is on. + * @return null if over-allocation is turned off + */ + NMAllocationPolicy getContainerOverAllocationPolicy(); float getVmemRatio(); @@ -66,4 +80,26 @@ static void decreaseResourceUtilization( * containersMonitor.getVmemRatio()); resourceUtil.subtractFrom((int)resource.getMemorySize(), vmem, vCores); } + + /** + * A snapshot of resource utilization of all containers with the timestamp.
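+ * A timestamp of -1 indicates that no utilization has been sampled yet; + * ContainersMonitorImpl seeds its snapshot that way before the monitoring + * thread publishes the first measurement. A hypothetical consumer could + * therefore guard on the timestamp before trusting the values: + * <pre> + * ContainersResourceUtilization snapshot = + * containersMonitor.getContainersUtilization(false); + * if (snapshot.getTimestamp() >= 0) { + * ResourceUtilization util = snapshot.getUtilization(); + * } + * </pre>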
+ */ + final class ContainersResourceUtilization { + private final ResourceUtilization utilization; + private final long timestamp; + + public ContainersResourceUtilization( + ResourceUtilization utilization, long timestamp) { + this.utilization = utilization; + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public ResourceUtilization getUtilization() { + return utilization; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index acc256f..b44d324 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -23,6 +23,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.NMAllocationPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.SnapshotBasedOverAllocationPolicy; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,8 +110,9 @@ CPU, MEMORY } - private ResourceUtilization containersUtilization; + private ContainersResourceUtilization latestContainersUtilization; + private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; private int overAlloctionPreemptionCpuCount = -1; @@ -123,7 +128,8 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.monitoringThread = new MonitoringThread(); - this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); + this.latestContainersUtilization = new ContainersResourceUtilization( + ResourceUtilization.newInstance(-1, -1, -1.0f), -1L); } @Override @@ -230,7 +236,7 @@ protected void serviceInit(Configuration myConf) throws Exception { /** * Check all prerequisites for NM over-allocation. 
*/ - private void checkOverAllocationPrerequisites() throws YarnException { + protected void checkOverAllocationPrerequisites() throws YarnException { // LinuxContainerExecutor is required to enable overallocation if (!(containerExecutor instanceof LinuxContainerExecutor)) { throw new YarnException(LinuxContainerExecutor.class.getName() + @@ -330,6 +336,10 @@ private void initializeOverAllocation(Configuration conf) { this.overAllocationPreemptionThresholds = ResourceThresholds.newInstance( cpuPreemptionThreshold, memoryPreemptionThreshold); + // TODO make this configurable + this.overAllocationPolicy = + createOverAllocationPolicy(resourceThresholds); + LOG.info("NodeManager oversubscription enabled with overallocation " + "thresholds (memory:" + overAllocationMemoryUtilizationThreshold + ", CPU:" + overAllocationCpuUtilizationThreshold + ") and preemption" + @@ -337,6 +347,11 @@ private void initializeOverAllocation(Configuration conf) { cpuPreemptionThreshold + ")"); } + protected NMAllocationPolicy createOverAllocationPolicy( + ResourceThresholds resourceThresholds) { + return new SnapshotBasedOverAllocationPolicy(resourceThresholds, this); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this @@ -609,7 +624,12 @@ public void run() { } // Save the aggregated utilization of the containers - setContainersUtilization(trackedContainersUtilization); + setLatestContainersUtilization(trackedContainersUtilization); + + // check opportunity to start containers if over-allocation is on + if (context.isOverAllocationEnabled()) { + attemptToStartContainersUponLowUtilization(); + } // Publish the container utilization metrics to node manager // metrics system. 
@@ -975,12 +995,34 @@ public boolean isVmemCheckEnabled() { } @Override - public ResourceUtilization getContainersUtilization() { - return this.containersUtilization; + public ContainersResourceUtilization getContainersUtilization( + boolean latest) { + // TODO update containerUtilization if latest is true + return this.latestContainersUtilization; + } + + @Override + public NMAllocationPolicy getContainerOverAllocationPolicy() { + return overAllocationPolicy; + } + + private void setLatestContainersUtilization(ResourceUtilization utilization) { + this.latestContainersUtilization = new ContainersResourceUtilization( + utilization, System.currentTimeMillis()); } - private void setContainersUtilization(ResourceUtilization utilization) { - this.containersUtilization = utilization; + @VisibleForTesting + public void attemptToStartContainersUponLowUtilization() { + if (getContainerOverAllocationPolicy() != null) { + Resource available = getContainerOverAllocationPolicy() + .getAvailableResources(); + if (available.getMemorySize() > 0 && + available.getVirtualCores() > 0) { + eventDispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(null, + ContainerSchedulerEventType.SCHEDULE_CONTAINERS)); + } + } } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java new file mode 100644 index 0000000..86b3698 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceTracker.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the resource utilization tracker that equates + * resource utilization with the total resource allocated to the container. 
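 + * Resources available to launch containers are therefore the node capacity + * minus the total allocation of all running containers, as computed in + * getUnallocatedResources() below.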
+ */ +public class AllocationBasedResourceTracker + implements ResourceUtilizationTracker { + + private static final Logger LOG = + LoggerFactory.getLogger(AllocationBasedResourceTracker.class); + + private static final Resource UNAVAILABLE = + Resource.newInstance(0, 0); + + private ResourceUtilization containersAllocation; + private ContainerScheduler scheduler; + + + AllocationBasedResourceTracker(ContainerScheduler scheduler) { + this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); + this.scheduler = scheduler; + } + + /** + * Get the accumulation of totally allocated resources to containers. + * @return ResourceUtilization Resource Utilization. + */ + @Override + public ResourceUtilization getCurrentUtilization() { + return this.containersAllocation; + } + + /** + * Get the amount of resources that have not been allocated to containers. + * @return Resource resources that have not been allocated to containers. + */ + protected Resource getUnallocatedResources() { + // unallocated resources = node capacity - containers allocation + // = -(container allocation - node capacity) + ResourceUtilization allocationClone = + ResourceUtilization.newInstance(containersAllocation); + getContainersMonitor() + .subtractNodeResourcesFromResourceUtilization(allocationClone); + + Resource unallocated = UNAVAILABLE; + if (allocationClone.getCPU() <= 0 && + allocationClone.getPhysicalMemory() <= 0 && + allocationClone.getVirtualMemory() <= 0) { + int cpu = Math.round(allocationClone.getCPU() * + getContainersMonitor().getVCoresAllocatedForContainers()); + long memory = allocationClone.getPhysicalMemory(); + unallocated = Resource.newInstance(-memory, -cpu); + } + return unallocated; + } + + + @Override + public Resource getAvailableResources() { + return getUnallocatedResources(); + } + + /** + * Add Container's resources to the accumulated allocation. + * @param container Container. + */ + @Override + public void containerLaunched(Container container) { + ContainersMonitor.increaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + /** + * Subtract Container's resources from the accumulated allocation. + * @param container Container. + */ + @Override + public void containerReleased(Container container) { + ContainersMonitor.decreaseResourceUtilization( + getContainersMonitor(), this.containersAllocation, + container.getResource()); + } + + public ContainersMonitor getContainersMonitor() { + return this.scheduler.getContainersMonitor(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java deleted file mode 100644 index 6e2b617..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the {@link ResourceUtilizationTracker} that equates - * resource utilization with the total resource allocated to the container. - */ -public class AllocationBasedResourceUtilizationTracker implements - ResourceUtilizationTracker { - - private static final Logger LOG = - LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class); - - private ResourceUtilization containersAllocation; - private ContainerScheduler scheduler; - - AllocationBasedResourceUtilizationTracker(ContainerScheduler scheduler) { - this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f); - this.scheduler = scheduler; - } - - /** - * Get the accumulation of totally allocated resources to a container. - * @return ResourceUtilization Resource Utilization. - */ - @Override - public ResourceUtilization getCurrentUtilization() { - return this.containersAllocation; - } - - /** - * Add Container's resources to the accumulated Utilization. - * @param container Container. - */ - @Override - public void addContainerResources(Container container) { - ContainersMonitor.increaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - /** - * Subtract Container's resources to the accumulated Utilization. - * @param container Container. - */ - @Override - public void subtractContainerResource(Container container) { - ContainersMonitor.decreaseResourceUtilization( - getContainersMonitor(), this.containersAllocation, - container.getResource()); - } - - /** - * Check if NM has resources available currently to run the container. - * @param container Container. - * @return True, if NM has resources available currently to run the container. - */ - @Override - public boolean hasResourcesAvailable(Container container) { - long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L; - return hasResourcesAvailable(pMemBytes, - (long) (getContainersMonitor().getVmemRatio()* pMemBytes), - container.getResource().getVirtualCores()); - } - - private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes, - int cpuVcores) { - // Check physical memory. 
- if (LOG.isDebugEnabled()) { - LOG.debug("pMemCheck [current={} + asked={} > allowed={}]", - this.containersAllocation.getPhysicalMemory(), - (pMemBytes >> 20), - (getContainersMonitor().getPmemAllocatedForContainers() >> 20)); - } - if (this.containersAllocation.getPhysicalMemory() + - (int) (pMemBytes >> 20) > - (int) (getContainersMonitor() - .getPmemAllocatedForContainers() >> 20)) { - return false; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("before vMemCheck" + - "[isEnabled={}, current={} + asked={} > allowed={}]", - getContainersMonitor().isVmemCheckEnabled(), - this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20), - (getContainersMonitor().getVmemAllocatedForContainers() >> 20)); - } - // Check virtual memory. - if (getContainersMonitor().isVmemCheckEnabled() && - this.containersAllocation.getVirtualMemory() + - (int) (vMemBytes >> 20) > - (int) (getContainersMonitor() - .getVmemAllocatedForContainers() >> 20)) { - return false; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("before cpuCheck [asked={} > allowed={}]", - this.containersAllocation.getCPU(), - getContainersMonitor().getVCoresAllocatedForContainers()); - } - // Check CPU. Compare using integral values of cores to avoid decimal - // inaccuracies. - if (!hasEnoughCpu(this.containersAllocation.getCPU(), - getContainersMonitor().getVCoresAllocatedForContainers(), cpuVcores)) { - return false; - } - return true; - } - - /** - * Returns whether there is enough space for coresRequested in totalCores. - * Converts currentAllocation usage to nearest integer count before comparing, - * as floats are inherently imprecise. NOTE: this calculation assumes that - * requested core counts must be integers, and currentAllocation core count - * must also be an integer. - * - * @param currentAllocation The current allocation, a float value from 0 to 1. - * @param totalCores The total cores in the system. - * @param coresRequested The number of cores requested. - * @return True if currentAllocation * totalCores + coresRequested <= - * totalCores. - */ - public boolean hasEnoughCpu(float currentAllocation, long totalCores, - int coresRequested) { - // Must not cast here, as it would truncate the decimal digits.
- return Math.round(currentAllocation * totalCores) - + coresRequested <= totalCores; - } - - public ContainersMonitor getContainersMonitor() { - return this.scheduler.getContainersMonitor(); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index d9b713f..e8341c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +76,10 @@ // Queue of Guaranteed Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedGuaranteedContainers = new LinkedHashMap<>(); + // sum of the resources requested by guaranteed containers in queue + private final Resource guaranteedResourcesDemanded = + Resource.newInstance(0, 0); + // Queue of Opportunistic Containers waiting for resources to run private final LinkedHashMap<ContainerId, Container> queuedOpportunisticContainers = new LinkedHashMap<>(); @@ -82,6 +88,10 @@ // or paused to make room for a guaranteed container. private final Map<ContainerId, Container> oppContainersToKill = new HashMap<>(); + // sum of the resources to be released by opportunistic containers that + // have been marked to be killed or paused. + private final Resource opportunisticResourcesToBeReleased = + Resource.newInstance(0, 0); // Containers launched by the Scheduler will take a while to actually // move to the RUNNING state, but should still be fair game for killing @@ -119,6 +129,17 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); } + @VisibleForTesting + public ContainerScheduler(Context context, AsyncDispatcher dispatcher, + NodeManagerMetrics metrics, int qLength) { + super(ContainerScheduler.class.getName()); + this.context = context; + this.dispatcher = dispatcher; + this.metrics = metrics; + this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; + this.opportunisticContainersStatus = + OpportunisticContainersStatus.newInstance(); + } @Override public void serviceInit(Configuration conf) throws Exception { @@ -128,20 +149,16 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION, YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION); + // We assume over allocation configurations have been initialized + this.utilizationTracker = getResourceTracker(); } - @VisibleForTesting - public ContainerScheduler(Context context, AsyncDispatcher dispatcher, - NodeManagerMetrics metrics, int qLength) { - super(ContainerScheduler.class.getName()); - this.context = context; - this.dispatcher = dispatcher; - this.metrics = metrics; - this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; - this.utilizationTracker = - new AllocationBasedResourceUtilizationTracker(this); - this.opportunisticContainersStatus = - OpportunisticContainersStatus.newInstance(); + private AllocationBasedResourceTracker getResourceTracker() { + if (context.isOverAllocationEnabled()) { + return new UtilizationBasedResourceTracker(this); + } else { + return new AllocationBasedResourceTracker(this); + } } /** @@ -164,14 +181,18 @@ public void handle(ContainerSchedulerEvent event) { if (event instanceof UpdateContainerSchedulerEvent) { onUpdateContainer((UpdateContainerSchedulerEvent) event); } else { - LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); + LOG.error("Unknown event type on UpdateContainer: " + event.getType()); } break; case SHED_QUEUED_CONTAINERS: shedQueuedOpportunisticContainers(); break; case RECOVERY_COMPLETED: - startPendingContainers(maxOppQueueLength <= 0); + startPendingContainers(false); + break; + case SCHEDULE_CONTAINERS: + startPendingContainers(true); + break; default: LOG.error("Unknown event arrived at ContainerScheduler: " + event.toString()); @@ -186,10 +207,10 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { ContainerId containerId = updateEvent.getContainer().getContainerId(); if (updateEvent.isResourceChange()) { if (runningContainers.containsKey(containerId)) { - this.utilizationTracker.subtractContainerResource( + this.utilizationTracker.containerReleased( new ContainerImpl(getConfig(), null, null, null, null, updateEvent.getOriginalToken(), context)); - this.utilizationTracker.addContainerResources( + this.utilizationTracker.containerLaunched( updateEvent.getContainer()); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent(containerId, @@ -205,20 +226,22 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { if (queuedOpportunisticContainers.remove(containerId) != null) { queuedGuaranteedContainers.put(containerId, updateEvent.getContainer()); - //Kill/pause opportunistic containers if any to make room for - // promotion request - reclaimOpportunisticContainerResources(updateEvent.getContainer()); + Resources.addTo(guaranteedResourcesDemanded, + updateEvent.getContainer().getResource()); + startPendingContainers(true); } } else { // Demotion of queued container.. 
Should not happen too often // since you should not find too many queued guaranteed // containers if (queuedGuaranteedContainers.remove(containerId) != null) { + Resources.subtractFrom(guaranteedResourcesDemanded, + updateEvent.getContainer().getResource()); queuedOpportunisticContainers.put(containerId, updateEvent.getContainer()); + startPendingContainers(false); } } - startPendingContainers(maxOppQueueLength <= 0); } } @@ -236,6 +259,7 @@ public void recoverActiveContainer(Container container, || rcs == RecoveredContainerStatus.PAUSED) { if (execType == ExecutionType.GUARANTEED) { queuedGuaranteedContainers.put(container.getContainerId(), container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); } else if (execType == ExecutionType.OPPORTUNISTIC) { queuedOpportunisticContainers .put(container.getContainerId(), container); @@ -246,7 +270,7 @@ public void recoverActiveContainer(Container container, } } else if (rcs == RecoveredContainerStatus.LAUNCHED) { runningContainers.put(container.getContainerId(), container); - utilizationTracker.addContainerResources(container); + utilizationTracker.containerLaunched(container); } } @@ -306,65 +330,107 @@ public OpportunisticContainersStatus getOpportunisticContainersStatus() { } private void onResourcesReclaimed(Container container) { - oppContainersToKill.remove(container.getContainerId()); + ContainerId containerId = container.getContainerId(); // This could be killed externally for eg. by the ContainerManager, // in which case, the container might still be queued. - Container queued = - queuedOpportunisticContainers.remove(container.getContainerId()); - if (queued == null) { - queuedGuaranteedContainers.remove(container.getContainerId()); + if (queuedOpportunisticContainers.remove(containerId) != null) { + return; + } + + // This could be killed externally for eg. by the ContainerManager, + // in which case, the container might still be queued. 
+ if (queuedGuaranteedContainers.remove(containerId) != null) { + Resources.subtractFrom(guaranteedResourcesDemanded, container.getResource()); + return; + } + + if (oppContainersToKill.remove(containerId) != null) { + Resources.subtractFrom( + opportunisticResourcesToBeReleased, container.getResource()); } // Requeue PAUSED containers if (container.getContainerState() == ContainerState.PAUSED) { if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.put(container.getContainerId(), container); + queuedGuaranteedContainers.put(containerId, container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); } else { - queuedOpportunisticContainers.put( - container.getContainerId(), container); + queuedOpportunisticContainers.put(containerId, container); } } // decrement only if it was a running container - Container completedContainer = runningContainers.remove(container - .getContainerId()); + Container completedContainer = runningContainers.remove(containerId); // only a running container releases resources upon completion boolean resourceReleased = completedContainer != null; if (resourceReleased) { - this.utilizationTracker.subtractContainerResource(container); + this.utilizationTracker.containerReleased(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); - startPendingContainers(forceStartGuaranteedContainers); + + // When over-allocation is turned on, we may need to reclaim + // more resources since the opportunistic containers that have been + // killed or paused may not have released as much resource as we need. + boolean reclaimOpportunisticResources = context.isOverAllocationEnabled(); + startPendingContainers(reclaimOpportunisticResources); } } /** * Start pending containers in the queue. - * @param forceStartGuaranteedContaieners When this is true, start guaranteed - * container without looking at available resource + * @param reclaimOpportunisticResources if set to true, resources allocated + * to running OPPORTUNISTIC containers will be reclaimed in + * cases where there are GUARANTEED containers being queued */ - private void startPendingContainers(boolean forceStartGuaranteedContaieners) { - // Start guaranteed containers that are paused, if resources available. - boolean resourcesAvailable = startContainers( - queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); - // Start opportunistic containers, if resources available. - if (resourcesAvailable) { - startContainers(queuedOpportunisticContainers.values(), false); + private void startPendingContainers(boolean reclaimOpportunisticResources) { + // When opportunistic containers are not allowed (which is determined by + // max-queue length of pending opportunistic containers <= 0), start + // guaranteed containers without looking at available resources and + // skip scanning the queue of opportunistic containers + if (maxOppQueueLength <= 0) { + forcefullyStartGuaranteedContainers(); + return; + } + + Resource available = utilizationTracker.getAvailableResources(); + + // Start guaranteed containers that are queued, if resources available.
+ boolean allGuaranteedContainersLaunched = + startGuaranteedContainers(available); + // Start opportunistic containers, if resources available, which is true + // if all guaranteed containers in queue have been launched. + if (allGuaranteedContainersLaunched) { + startOpportunisticContainers(available); + } else { + // If not all guaranteed containers in queue are launched, we may need + // to reclaim resources from opportunistic containers that are running. + if (reclaimOpportunisticResources) { + reclaimOpportunisticContainerResources(); + } + } } - private boolean startContainers( - Collection<Container> containersToBeStarted, boolean force) { - Iterator<Container> cIter = containersToBeStarted.iterator(); + /** + * Try to launch as many GUARANTEED containers as possible. + * @param available the amount of resources available to launch containers + * @return true if all queued GUARANTEED containers are launched + * or there are no GUARANTEED containers to launch + */ + private boolean startGuaranteedContainers(Resource available) { + Iterator<Container> cIter = + queuedGuaranteedContainers.values().iterator(); boolean resourcesAvailable = true; while (cIter.hasNext() && resourcesAvailable) { Container container = cIter.next(); - if (tryStartContainer(container, force)) { + if (isResourceAvailable(available, container)) { + startContainer(container); + Resources.subtractFrom(available, container.getResource()); cIter.remove(); + Resources.subtractFrom( + guaranteedResourcesDemanded, container.getResource()); } else { resourcesAvailable = false; } @@ -372,25 +438,49 @@ private boolean startContainers( return resourcesAvailable; } - private boolean tryStartContainer(Container container, boolean force) { - boolean containerStarted = false; - // call startContainer without checking available resource when force==true - if (force || resourceAvailableToStartContainer( - container)) { + /** + * Launch all queued GUARANTEED containers without checking resource + * availability. This is an optimization in cases where OPPORTUNISTIC + * containers are not allowed on the node. + */ + private void forcefullyStartGuaranteedContainers() { + Iterator<Container> cIter = + queuedGuaranteedContainers.values().iterator(); + while (cIter.hasNext()) { + Container container = cIter.next(); startContainer(container); - containerStarted = true; + cIter.remove(); + Resources.subtractFrom( + guaranteedResourcesDemanded, container.getResource()); } - return containerStarted; } - /** - * Check if there is resource available to start a given container - * immediately. (This can be extended to include overallocated resources) - * @param container the container to start - * @return true if container can be launched directly + * Try to launch as many OPPORTUNISTIC containers as possible.
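 + * This is only invoked after startGuaranteedContainers has drained the + * queue of GUARANTEED containers, so OPPORTUNISTIC containers never jump + * ahead of queued GUARANTEED ones.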
+ * @param available the amount of resources available to launch containers + * @return true if all OPPORTUNISTIC containers are launched + * or there are no OPPORTUNISTIC containers to launch */ - private boolean resourceAvailableToStartContainer(Container container) { - return this.utilizationTracker.hasResourcesAvailable(container); + private boolean startOpportunisticContainers(Resource available) { + Iterator<Container> cIter = + queuedOpportunisticContainers.values().iterator(); + boolean resourcesAvailable = true; + while (cIter.hasNext() && resourcesAvailable) { + Container container = cIter.next(); + if (isResourceAvailable(available, container)) { + startContainer(container); + Resources.subtractFrom(available, container.getResource()); + cIter.remove(); + } else { + resourcesAvailable = false; + } + } + return resourcesAvailable; + } + + private static boolean isResourceAvailable( + Resource resource, Container container) { + Resource left = Resources.subtract(resource, container.getResource()); + return left.getMemorySize() >= 0 && left.getVirtualCores() >= 0; } private boolean enqueueContainer(Container container) { @@ -400,6 +490,7 @@ private boolean enqueueContainer(Container container) { boolean isQueued; if (isGuaranteedContainer) { queuedGuaranteedContainers.put(container.getContainerId(), container); + Resources.addTo(guaranteedResourcesDemanded, container.getResource()); isQueued = true; } else { if (queuedOpportunisticContainers.size() < maxOppQueueLength) { @@ -444,18 +535,7 @@ protected void scheduleContainer(Container container) { // enough number of opportunistic containers. if (isGuaranteedContainer) { enqueueContainer(container); - - // When opportunistic container not allowed (which is determined by - // max-queue length of pending opportunistic containers <= 0), start - // guaranteed containers without looking at available resources. - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); - startPendingContainers(forceStartGuaranteedContainers); - - // if the guaranteed container is queued, we need to preempt opportunistic - // containers for make room for it - if (queuedGuaranteedContainers.containsKey(container.getContainerId())) { - reclaimOpportunisticContainerResources(container); - } + startPendingContainers(true); } else { // Given an opportunistic container, we first try to start as many queuing // guaranteed containers as possible followed by queuing opportunistic @@ -473,19 +553,19 @@ protected void scheduleContainer(Container container) { } @SuppressWarnings("unchecked") - private void reclaimOpportunisticContainerResources(Container container) { + private void reclaimOpportunisticContainerResources() { List<Container> extraOppContainersToReclaim = - pickOpportunisticContainersToReclaimResources( - container.getContainerId()); - // Kill the opportunistic containers that were chosen. - for (Container contToReclaim : extraOppContainersToReclaim) { + pickOpportunisticContainersToReclaimResources(); + killOpportunisticContainers(extraOppContainersToReclaim); + } + + private void killOpportunisticContainers( + Collection<Container> containersToReclaim) { + for (Container contToReclaim : containersToReclaim) { + String preemptionAction = usePauseEventForPreemption == true ?
"paused" : - "resumed"; - LOG.info( - "Container {} will be {} to start the " - + "execution of guaranteed container {}.", - contToReclaim.getContainerId(), preemptionAction, - container.getContainerId()); + "preempted"; + LOG.info("Container {} will be {} to start the execution of guaranteed" + + " containers.", contToReclaim.getContainerId(), preemptionAction); if (usePauseEventForPreemption) { contToReclaim.sendPauseEvent( @@ -496,13 +576,15 @@ private void reclaimOpportunisticContainerResources(Container container) { "Container Killed to make room for Guaranteed Container."); } oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim); + Resources.addTo( + opportunisticResourcesToBeReleased, contToReclaim.getResource()); } } private void startContainer(Container container) { LOG.info("Starting container [" + container.getContainerId()+ "]"); runningContainers.put(container.getContainerId(), container); - this.utilizationTracker.addContainerResources(container); + this.utilizationTracker.containerLaunched(container); if (container.getContainerTokenIdentifier().getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.metrics.startOpportunisticContainer(container.getResource()); @@ -510,14 +592,12 @@ private void startContainer(Container container) { container.sendLaunchEvent(); } - private List<Container> pickOpportunisticContainersToReclaimResources( - ContainerId containerToStartId) { + private List<Container> pickOpportunisticContainersToReclaimResources() { // The opportunistic containers that need to be killed for the // given container to start. List<Container> extraOpportContainersToKill = new ArrayList<>(); // Track resources that need to be freed. - ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( - containerToStartId); + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(); // Go over the running opportunistic containers. // Use a descending iterator to kill more recently started containers. @@ -536,15 +616,19 @@ private void startContainer(Container container) { continue; } extraOpportContainersToKill.add(runningCont); + // In the case of over-allocation, the running container may not + // release as many resources as it has requested, but we'll check + // again if more containers need to be killed/paused when this + // container is released. ContainersMonitor.decreaseResourceUtilization( getContainersMonitor(), resourcesToFreeUp, runningCont.getResource()); } } if (!hasSufficientResources(resourcesToFreeUp)) { - LOG.warn("There are no sufficient resources to start guaranteed [{}]" + - "at the moment. Opportunistic containers are in the process of" + - "being killed to make room.", containerToStartId); + LOG.warn("There are insufficient resources to start guaranteed" + + " containers at the moment. Opportunistic containers are in" + + " the process of being killed to make room."); } return extraOpportContainersToKill; } @@ -559,34 +643,42 @@ private boolean hasSufficientResources( * getContainersMonitor().getVCoresAllocatedForContainers()) <= 0; } - private ResourceUtilization resourcesToFreeUp( - ContainerId containerToStartId) { + /** + * Determine how many resources need to be freed up to launch the queued + * GUARANTEED containers. Used to determine how many running OPPORTUNISTIC + * containers need to be killed/paused, assuming OPPORTUNISTIC containers to + * be killed/paused will release the amount of resources they have requested.
+ * + * If the node is over-allocating itself, this may result in too few + * OPPORTUNISTIC containers being killed/paused in cases where the running + * OPPORTUNISTIC containers are not fully consuming their resource requests. + * We check again upon container completion events to see if more running + * OPPORTUNISTIC containers need to be killed/paused. + * + * @return the amount of resources that need to be reclaimed + */ + private ResourceUtilization resourcesToFreeUp() { // Get allocation of currently allocated containers. ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization - .newInstance(this.utilizationTracker.getCurrentUtilization()); - - // Add to the allocation the allocation of the pending guaranteed - // containers that will start before the current container will be started. - for (Container container : queuedGuaranteedContainers.values()) { - ContainersMonitor.increaseResourceUtilization( - getContainersMonitor(), resourceAllocationToFreeUp, - container.getResource()); - if (container.getContainerId().equals(containerToStartId)) { - break; - } - } + .newInstance(0, 0, 0.0f); + + // Add to the allocation the allocation of pending guaranteed containers. + ContainersMonitor.increaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, guaranteedResourcesDemanded); // These resources are being freed, likely at the behest of another // guaranteed container.. - for (Container container : oppContainersToKill.values()) { - ContainersMonitor.decreaseResourceUtilization( - getContainersMonitor(), resourceAllocationToFreeUp, - container.getResource()); + ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, opportunisticResourcesToBeReleased); + + // Deduct any remaining resources available + Resource availableResources = utilizationTracker.getAvailableResources(); + if (availableResources.getVirtualCores() > 0 && + availableResources.getMemorySize() > 0) { + ContainersMonitor.decreaseResourceUtilization(getContainersMonitor(), + resourceAllocationToFreeUp, availableResources); } - // Subtract the overall node resources.
- getContainersMonitor().subtractNodeResourcesFromResourceUtilization( - resourceAllocationToFreeUp); return resourceAllocationToFreeUp; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 294eddf..9ad4f91 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -28,5 +28,7 @@ // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, CONTAINER_PAUSED, - RECOVERY_COMPLETED + RECOVERY_COMPLETED, + // Producer: Containers Monitor when over-allocation is on + SCHEDULE_CONTAINERS } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java new file mode 100644 index 0000000..58b73d2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/NMAllocationPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * Keeps track of containers utilization over time and determines how many + * resources are available to launch containers when over-allocation is on.
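 + * + * <p>The containerLaunched and containerReleased callbacks below are + * deliberately no-ops by default; a policy such as + * {@link SnapshotBasedOverAllocationPolicy} derives everything it needs + * from the containers monitor and need not override them.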
+ */ +public abstract class NMAllocationPolicy { + protected final ResourceThresholds overAllocationThresholds; + protected final ContainersMonitor containersMonitor; + + public NMAllocationPolicy( + ResourceThresholds overAllocationThresholds, + ContainersMonitor containersMonitor) { + this.containersMonitor = containersMonitor; + this.overAllocationThresholds = overAllocationThresholds; + } + + /** + * Handle container launch events. + * @param container the container that has been launched + */ + public void containerLaunched(Container container) { + + } + + /** + * Handle container release events. + * @param container the container that has been released + */ + public void containerReleased(Container container) { + + } + + /** + * Get the amount of resources available to launch containers when + * over-allocation is turned on. + * @return the amount of resources available to launch containers + */ + public abstract Resource getAvailableResources(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 3c17eca..98d99c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -38,22 +39,20 @@ ResourceUtilization getCurrentUtilization(); /** - * Add Container's resources to Node Utilization. - * @param container Container. + * Get the amount of resources currently available to launch containers. + * @return Resource resources available to launch containers */ - void addContainerResources(Container container); + Resource getAvailableResources(); /** - * Subtract Container's resources to Node Utilization. + * Add Container's resources to Node Utilization upon container launch. * @param container Container. - */ - void subtractContainerResource(Container container); + void containerLaunched(Container container); /** - * Check if NM has resources available currently to run the container. + * Subtract Container's resources from Node Utilization upon container release. * @param container Container. - * @return True, if NM has resources available currently to run the container.
*/ - boolean hasResourcesAvailable(Container container); - + void containerReleased(Container container); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java new file mode 100644 index 0000000..f486506 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/SnapshotBasedOverAllocationPolicy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; + +/** + * An implementation of NMAllocationPolicy based on the + * snapshot of the latest containers utilization to determine how many + * resources are available to launch containers when over-allocation + * is turned on.
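 + * + * <p>Conceptually, with the thresholds taken as fractions of the resources + * allocated for containers (a sketch of the arithmetic in + * getAvailableResources below): + * <pre> + * memory available = memory threshold * pmem allocated for containers + * - physical memory used by all containers + * vcores available = round((cpu threshold - containers cpu usage) + * * vcores allocated for containers) + * </pre>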
+ */ +public class SnapshotBasedOverAllocationPolicy + extends NMAllocationPolicy { + + public SnapshotBasedOverAllocationPolicy( + ResourceThresholds overAllocationThresholds, + ContainersMonitor containersMonitor) { + super(overAllocationThresholds, containersMonitor); + } + + @Override + public Resource getAvailableResources() { + ResourceUtilization utilization = + containersMonitor.getContainersUtilization(true).getUtilization(); + long memoryAvailable = Math.round( + overAllocationThresholds.getMemoryThreshold() * + containersMonitor.getPmemAllocatedForContainers()) - + (utilization.getPhysicalMemory() << 20); + int vcoreAvailable = Math.round( + (overAllocationThresholds.getCpuThreshold() - utilization.getCPU()) * + containersMonitor.getVCoresAllocatedForContainers()); + return Resource.newInstance(memoryAvailable >> 20, vcoreAvailable); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java new file mode 100644 index 0000000..6f9bc82 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UtilizationBasedResourceTracker.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** +* A resource availability tracker that determines if there are resources +* available based on whether there are unallocated resources or there are +* un-utilized resources.
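+* The two estimates are reconciled in getAvailableResources() by taking +* their component-wise maximum, so over-allocation can only add to what +* plain allocation-based tracking would report.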
+*/ +public class UtilizationBasedResourceTracker + extends AllocationBasedResourceTracker { + private static final Logger LOG = + LoggerFactory.getLogger(UtilizationBasedResourceTracker.class); + + private final NMAllocationPolicy overAllocationPolicy; + + UtilizationBasedResourceTracker(ContainerScheduler scheduler) { + super(scheduler); + this.overAllocationPolicy = + getContainersMonitor().getContainerOverAllocationPolicy(); + } + + @Override + public void containerLaunched(Container container) { + super.containerLaunched(container); + if (overAllocationPolicy != null) { + overAllocationPolicy.containerLaunched(container); + } + } + + @Override + public void containerReleased(Container container) { + super.containerReleased(container); + if (overAllocationPolicy != null) { + overAllocationPolicy.containerReleased(container); + } + } + + @Override + public Resource getAvailableResources() { + Resource resourceBasedOnAllocation = getUnallocatedResources(); + Resource resourceBasedOnUtilization = + getResourcesAvailableBasedOnUtilization(); + if (LOG.isDebugEnabled()) { + LOG.debug("The amount of resources available based on allocation is " + + resourceBasedOnAllocation + ", based on utilization is " + + resourceBasedOnUtilization); + } + + return Resources.componentwiseMax(resourceBasedOnAllocation, + resourceBasedOnUtilization); + } + + /** + * Get the amount of resources based on the slack between + * the actual utilization and the desired utilization. + * @return Resource resource available + */ + private Resource getResourcesAvailableBasedOnUtilization() { + if (overAllocationPolicy == null) { + return Resources.none(); + } + + return overAllocationPolicy.getAvailableResources(); + } + + @Override + public ResourceUtilization getCurrentUtilization() { + return getContainersMonitor().getContainersUtilization(false) + .getUtilization(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 93d0afb..05e9dd0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; +import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -345,6 +346,40 @@ public static void waitForContainerState( fStates.contains(containerStatus.getState())); } + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + ContainerSubState finalState) + throws InterruptedException, YarnException, IOException { + waitForContainerSubState(containerManager, containerID, + Arrays.asList(finalState), 20); + } + public static void waitForContainerSubState( + ContainerManagementProtocol containerManager, ContainerId containerID, + List<ContainerSubState> finalStates, int timeOutMax) + throws InterruptedException, YarnException, IOException { + List<ContainerId> list = new
ArrayList<>(); + list.add(containerID); + GetContainerStatusesRequest request = + GetContainerStatusesRequest.newInstance(list); + ContainerStatus containerStatus; + HashSet<ContainerSubState> fStates = new HashSet<>(finalStates); + int timeoutSecs = 0; + do { + Thread.sleep(1000); + containerStatus = + containerManager.getContainerStatuses(request) + .getContainerStatuses().get(0); + LOG.info("Waiting for container to get into one of states " + fStates + + ". Current state is " + containerStatus.getContainerSubState()); + timeoutSecs += 1; + } while (!fStates.contains(containerStatus.getContainerSubState()) + && timeoutSecs < timeOutMax); + LOG.info("Container state is " + containerStatus.getContainerSubState()); + Assert.assertTrue("ContainerSubState is not correct (timed out)", + fStates.contains(containerStatus.getContainerSubState())); + } + + + public static void waitForApplicationState( ContainerManagerImpl containerManager, ApplicationId appID, ApplicationState finalState) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index d7d826c..21d1889 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -282,7 +282,7 @@ public void testContainersCPUResourceForDefaultValue() throws Exception { // will be 0. assertEquals( "Resource utilization must be default with MonitorThread's first run", - 0, containersMonitor.getContainersUtilization() + 0, containersMonitor.getContainersUtilization(false).getUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); // Verify the container utilization value.
Since at least one round is done, @@ -297,8 +297,9 @@ public static void waitForContainerResourceUtilizationChange( ContainersMonitorImpl containersMonitor, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; - while (0 == containersMonitor.getContainersUtilization() - .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))) { + while (0 == containersMonitor.getContainersUtilization(false) + .getUtilization().compareTo( + ResourceUtilization.newInstance(0, 0, 0.0f))) { if (timeWaiting >= timeoutMsecs) { break; } @@ -310,7 +311,7 @@ public static void waitForContainerResourceUtilizationChange( } assertTrue("Resource utilization is not changed from second run onwards", - 0 != containersMonitor.getContainersUtilization() + 0 != containersMonitor.getContainersUtilization(false).getUtilization() .compareTo(ResourceUtilization.newInstance(0, 0, 0.0f))); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java new file mode 100644 index 0000000..1e8bfdf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceTracker.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the {@link AllocationBasedResourceTracker} class.
+ */ +public class TestAllocationBasedResourceTracker { + + private ContainerScheduler mockContainerScheduler; + + @Before + public void setup() { + mockContainerScheduler = mock(ContainerScheduler.class); + ContainersMonitor containersMonitor = + new ContainersMonitorImpl(mock(ContainerExecutor.class), + mock(AsyncDispatcher.class), mock(Context.class)); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); + conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); + conf.setInt(YarnConfiguration.NM_VCORES, 8); + containersMonitor.init(conf); + when(mockContainerScheduler.getContainersMonitor()) + .thenReturn(containersMonitor); + } + + /** + * Node has capacity for 1024 MB and 8 cores. Saturate the node. Once the + * node is fully allocated, no more resources should be reported available. + */ + @Test + public void testHasResourcesAvailable() { + AllocationBasedResourceTracker tracker = + new AllocationBasedResourceTracker(mockContainerScheduler); + Container testContainer = mock(Container.class); + when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + isResourcesAvailable(tracker.getAvailableResources(), testContainer)); + tracker.containerLaunched(testContainer); + } + Assert.assertFalse( + isResourcesAvailable(tracker.getAvailableResources(), testContainer)); + } + + private static boolean isResourcesAvailable( + Resource available, Container container) { + return available.compareTo(container.getResource()) >= 0; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java deleted file mode 100644 index 82c2147..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestAllocationBasedResourceUtilizationTracker.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Tests for the {@link AllocationBasedResourceUtilizationTracker} class. - */ -public class TestAllocationBasedResourceUtilizationTracker { - - private ContainerScheduler mockContainerScheduler; - - @Before - public void setup() { - mockContainerScheduler = mock(ContainerScheduler.class); - ContainersMonitor containersMonitor = - new ContainersMonitorImpl(mock(ContainerExecutor.class), - mock(AsyncDispatcher.class), mock(Context.class)); - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.NM_PMEM_MB, 1024); - conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true); - conf.setFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, 2.0f); - conf.setInt(YarnConfiguration.NM_VCORES, 8); - containersMonitor.init(conf); - when(mockContainerScheduler.getContainersMonitor()) - .thenReturn(containersMonitor); - } - - /** - * Node has capacity for 1024 MB and 8 cores. Saturate the node. When full the - * hasResourceAvailable should return false. - */ - @Test - public void testHasResourcesAvailable() { - AllocationBasedResourceUtilizationTracker tracker = - new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); - Container testContainer = mock(Container.class); - when(testContainer.getResource()).thenReturn(Resource.newInstance(512, 4)); - for (int i = 0; i < 2; i++) { - Assert.assertTrue(tracker.hasResourcesAvailable(testContainer)); - tracker.addContainerResources(testContainer); - } - Assert.assertFalse(tracker.hasResourcesAvailable(testContainer)); - } - - /** - * Test the case where the current allocation has been truncated to 0.8888891 - * (8/9 cores used). Request 1 additional core - hasEnoughCpu should return - * true. 
- */ - @Test - public void testHasEnoughCpu() { - AllocationBasedResourceUtilizationTracker tracker = - new AllocationBasedResourceUtilizationTracker(mockContainerScheduler); - float currentAllocation = 0.8888891f; - long totalCores = 9; - int alreadyUsedCores = 8; - Assert.assertTrue(tracker.hasEnoughCpu(currentAllocation, totalCores, - (int) totalCores - alreadyUsedCores)); - Assert.assertFalse(tracker.hasEnoughCpu(currentAllocation, totalCores, - (int) totalCores - alreadyUsedCores + 1)); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java index 2ae8b97..2cef53a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -63,8 +64,8 @@ @Mock private ContainerId containerId; - @Mock private AllocationBasedResourceUtilizationTracker - allocationBasedResourceUtilizationTracker; + @Mock private AllocationBasedResourceTracker + allocationBasedResourceTracker; @InjectMocks private ContainerScheduler tempContainerScheduler = new ContainerScheduler(context, dispatcher, metrics, 0); @@ -75,12 +76,13 @@ MockitoAnnotations.initMocks(this); spy = spy(tempContainerScheduler); when(container.getContainerId()).thenReturn(containerId); + when(container.getResource()).thenReturn(Resource.newInstance(1024, 1)); when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId); when(containerId.getApplicationAttemptId().getApplicationId()) .thenReturn(appId); when(containerId.getContainerId()).thenReturn(123L); - doNothing().when(allocationBasedResourceUtilizationTracker) - .addContainerResources(container); + doNothing().when(allocationBasedResourceTracker) + .containerLaunched(container); } @After public void tearDown() { @@ -101,8 +103,8 @@ assertEquals(1, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as QUEUED, OPPORTUNISTIC, @@ -120,8 +122,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - 
.addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as PAUSED, GUARANTEED, @@ -139,8 +141,8 @@ assertEquals(1, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as PAUSED, OPPORTUNISTIC, @@ -158,8 +160,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(1, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as LAUNCHED, GUARANTEED, @@ -177,8 +179,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(1)) + .containerLaunched(container); } /*Test if a container is recovered as LAUNCHED, OPPORTUNISTIC, @@ -196,8 +198,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(1, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(1)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(1)) + .containerLaunched(container); } /*Test if a container is recovered as REQUESTED, GUARANTEED, @@ -215,8 +217,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as REQUESTED, OPPORTUNISTIC, @@ -234,8 +236,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as COMPLETED, GUARANTEED, @@ -253,8 +255,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as COMPLETED, OPPORTUNISTIC, @@ -272,8 +274,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, 
spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as GUARANTEED but no executionType set, @@ -290,8 +292,8 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } /*Test if a container is recovered as PAUSED but no executionType set, @@ -308,7 +310,7 @@ assertEquals(0, spy.getNumQueuedGuaranteedContainers()); assertEquals(0, spy.getNumQueuedOpportunisticContainers()); assertEquals(0, spy.getNumRunningContainers()); - Mockito.verify(allocationBasedResourceUtilizationTracker, Mockito.times(0)) - .addContainerResources(container); + Mockito.verify(allocationBasedResourceTracker, Mockito.times(0)) + .containerLaunched(container); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java new file mode 100644 index 0000000..4499a0c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerWithOverAllocation.java @@ -0,0 +1,1126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.ConfigurationException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Test ContainerScheduler behaviors when NM overallocation is turned on. 
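+ *
+ * The tests below assume a test node sized by NM_CONTAINERS_MEMORY_MB
+ * (2048 MB) with both over-allocation thresholds set to 0.75 in setup(),
+ * so containers may be launched on utilization headroom until the projected
+ * utilization reaches roughly 0.75 * 2048 = 1536 MB.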
+ */ +public class TestContainerSchedulerWithOverAllocation + extends BaseContainerManagerTest { + private static final int NM_OPPORTUNISTIC_QUEUE_LIMIT = 3; + private static final int NM_CONTAINERS_VCORES = 4; + private static final int NM_CONTAINERS_MEMORY_MB = 2048; + + static { + LOG = LoggerFactory.getLogger( + TestContainerSchedulerWithOverAllocation.class); + } + + public TestContainerSchedulerWithOverAllocation() + throws UnsupportedFileSystemException { + } + + @Override + protected ContainerExecutor createContainerExecutor() { + DefaultContainerExecutor exec = + new LongRunningContainerSimulatingContainerExecutor(); + exec.setConf(conf); + return exec; + } + + @Override + protected ContainerManagerImpl createContainerManager( + DeletionService delSrvc) { + return new LongRunningContainerSimulatingContainersManager( + context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler, user); + } + + @Override + public void setup() throws IOException { + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + NM_OPPORTUNISTIC_QUEUE_LIMIT); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_CPU_UTILIZATION_THRESHOLD, + 0.75f); + conf.setFloat( + YarnConfiguration.NM_OVERALLOCATION_MEMORY_UTILIZATION_THRESHOLD, + 0.75f); + super.setup(); + } + + /** + * Start one GUARANTEED and one OPPORTUNISTIC container, which in aggregate do + * not exceed the capacity of the node. Both containers are expected to start + * running immediately. + */ + @Test + public void testStartMultipleContainersWithoutOverallocation() + throws Exception { + containerManager.start(); + + StartContainersRequest allRequests = StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } } + ); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start two GUARANTEED containers whose utilization is very low relative + * to their resource requests, resulting in a low node utilization. Then + * start an OPPORTUNISTIC container which requests more than what's left + * unallocated on the node. Due to overallocation being turned on and node + * utilization being low, the OPPORTUNISTIC container is also expected to + * be launched immediately.
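+ *
+ * Illustrative arithmetic: 1024 + 824 MB are allocated, leaving only
+ * 2048 - 1848 = 200 MB unallocated, less than the 512 MB request; but with
+ * utilization at 512 MB the snapshot-based policy reports roughly
+ * 0.75 * 2048 - 512 = 1024 MB of headroom, so the launch goes ahead.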
+ */ + @Test + public void testStartOppContainersWithPartialOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(824, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + // the current containers utilization is low + setContainerResourceUtilization( + ResourceUtilization.newInstance(512, 0, 1.0f/8)); + + // start a container that requests more than what's left unallocated + // 512 + 1024 + 824 > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + + // this container is expected to be started immediately because there + // are (memory: 1024, vcore: 0.625) available based on over-allocation + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start two GUARANTEED containers which utilize most of the resources + * they requested, resulting in a high node utilization. Then start an + * OPPORTUNISTIC container which requests more than what's left unallocated + * on the node. Because of the high resource utilization on the node, the + * projected utilization, if the OPPORTUNISTIC container were started + * immediately, would go over the NM overallocation threshold, so the + * OPPORTUNISTIC container is expected to be queued.
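+ *
+ * Illustrative arithmetic: with utilization at 1500 MB, the utilization
+ * headroom is roughly 0.75 * 2048 - 1500 = 36 MB and only 200 MB remain
+ * unallocated, so neither route can fit the 512 MB request.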
+ */ + @Test + public void testQueueOppContainerWithPartialOverallocationHighUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(824, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState( + containerManager, createContainerId(1), ContainerSubState.RUNNING); + + // the containers utilization is high + setContainerResourceUtilization( + ResourceUtilization.newInstance(1500, 0, 1.0f/8)); + + // start a container that requests more than what's left unallocated + // 512 + 1024 + 824 > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + // this container will not start immediately because there are not + // enough resources available at the moment, either in terms of + // resources unallocated or in terms of the actual utilization + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.SCHEDULED); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + } + + /** + * Start two GUARANTEED containers which in aggregate take up the whole node + * capacity, yet whose utilization is low relative to their resource requests, + * resulting in a low node resource utilization. Then try to start an + * OPPORTUNISTIC container. Because the resource utilization across the node + * is low and overallocation is turned on, the OPPORTUNISTIC container is + * expected to be launched immediately even though there are no resources left + * unallocated. + */ + @Test + public void testStartOppContainersWithOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the current containers utilization is low + setContainerResourceUtilization( + ResourceUtilization.newInstance(800, 0, 1.0f/8)); + + // start a container when there are no resources left unallocated.
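+ // (illustrative arithmetic: with utilization at 800 MB the policy reports
+ // roughly 0.75 * 2048 - 800 = 736 MB of headroom, enough for the 512 MB
+ // request below)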
+ containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)) + )); + + // this container is expected to be started because there are resources + // available, given that the actual utilization is very low + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + + /** + * Start two GUARANTEED containers which in aggregate take up the whole node + * capacity and fully utilize the resources they requested. Then try to start + * four OPPORTUNISTIC containers of which three will be queued and one will be + * killed because the max queue length is 3. + */ + @Test + public void testQueueOppContainersWithFullUtilization() throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the containers are fully utilizing their resources + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 1.0f/8)); + + // start more OPPORTUNISTIC containers than what the OPPORTUNISTIC container + // queue can hold when there is no unallocated resource left. + List<StartContainerRequest> moreContainerRequests = + new ArrayList<>(NM_OPPORTUNISTIC_QUEUE_LIMIT + 1); + for (int a = 0; a < NM_OPPORTUNISTIC_QUEUE_LIMIT + 1; a++) { + moreContainerRequests.add( + createStartContainerRequest(2 + a, + BuilderUtils.newResource(512, 1), false)); + } + containerManager.startContainers( + StartContainersRequest.newInstance(moreContainerRequests)); + + // All OPPORTUNISTIC containers but the last one should be queued. + // The last OPPORTUNISTIC container to launch should be killed. + BaseContainerManagerTest.waitForContainerState( + containerManager, createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2), + ContainerState.COMPLETE); + + HashMap<ContainerId, ContainerSubState> expectedContainerStatus = + new HashMap<>(); + expectedContainerStatus.put( + createContainerId(0), ContainerSubState.RUNNING); + expectedContainerStatus.put( + createContainerId(1), ContainerSubState.RUNNING); + expectedContainerStatus.put( + createContainerId(NM_OPPORTUNISTIC_QUEUE_LIMIT + 2), + ContainerSubState.DONE); + for (int i = 0; i < NM_OPPORTUNISTIC_QUEUE_LIMIT; i++) { + expectedContainerStatus.put( + createContainerId(i + 2), ContainerSubState.SCHEDULED); + } + verifyContainerStatuses(expectedContainerStatus); + } + + /** + * Start two GUARANTEED containers that together do not take up the + * whole node. Then try to start one OPPORTUNISTIC container that will + * fit into the remaining unallocated space on the node. + * The OPPORTUNISTIC container is expected to start even though the + * current node utilization is above the NM overallocation threshold, + * because it's always safe to launch containers as long as the node + * has not been fully allocated.
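+ *
+ * Illustrative arithmetic: 2048 - 1200 - 400 = 448 MB remain unallocated,
+ * which covers the 400 MB request, so the launch is allocation-safe no
+ * matter how high the reported utilization (1600 MB here) is.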
+ */ + @Test + public void testStartOppContainerWithHighUtilizationNoOverallocation() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1200, 1), true)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(400, 1), true)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // containers utilization is above the over-allocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(1600, 0, 1.0f/2)); + + // start a container that can just fit in the remaining unallocated space + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(400, 1), false)) + )); + + // the OPPORTUNISTIC container can be safely launched even though + // the container utilization is above the NM overallocation threshold + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + /** + * First start two OPPORTUNISTIC containers whose utilization is low relative + * to the resources they requested, resulting in a low node utilization. Then + * try to start a GUARANTEED container which requests more than what's left + * unallocated on the node. Because the node utilization is low and NM + * overallocation is turned on, the GUARANTEED container is expected to be + * started immediately without killing any running OPPORTUNISTIC containers. + */ + @Test + public void testKillNoOppContainersWithPartialOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(824, 1), false)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // containers utilization is low + setContainerResourceUtilization( + ResourceUtilization.newInstance(512, 0, 1.0f/8)); + + // start a GUARANTEED container that requests more than what's left + // unallocated on the node: (512 + 1024 + 824) > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), true)) + )); + + // the GUARANTEED container is expected to be launched immediately without + // killing any OPPORTUNISTIC containers.
+ BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start two OPPORTUNISTIC containers whose utilization will be high relative + * to the resources they requested, resulting in a high node utilization. + * Then try to start a GUARANTEED container which requests more than what's + * left unallocated on the node. Because the node is under high utilization, + * the second OPPORTUNISTIC container is expected to be killed in order to + * make room for the GUARANTEED container. + */ + @Test + public void testKillOppContainersWithPartialOverallocationHighUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(824, 1), false)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + + // the containers utilization is very high + setContainerResourceUtilization( + ResourceUtilization.newInstance(1800, 0, 1.0f/8)); + + // start a GUARANTEED container that requests more than what's left + // unallocated on the node 512 + 1024 + 824 > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), true)) + )); + + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + // the last launched OPPORTUNISTIC container is expected to be killed + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.DONE); + + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest. + newInstance(new ArrayList<ContainerId>() { + { + add(createContainerId(0)); + add(createContainerId(1)); + add(createContainerId(2)); + } + }); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Container Killed to make room for Guaranteed Container")); + } else { + Assert.assertEquals(status.getContainerId() + " is not RUNNING", + ContainerSubState.RUNNING, status.getContainerSubState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + } + + + /** + * Start three OPPORTUNISTIC containers which in aggregate exceed the + * capacity of the node, yet whose utilization is low relative + * to the resources they requested, resulting in a low node utilization. + * Then try to start a GUARANTEED container. Even though the node has + * nothing left unallocated, it is expected to start immediately + * without killing any running OPPORTUNISTIC containers because the node + * utilization is very low and overallocation is turned on.
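+ *
+ * Illustrative arithmetic: with utilization at 1024 MB, the headroom is
+ * roughly 0.75 * 2048 - 1024 = 512 MB, exactly enough for the 512 MB
+ * GUARANTEED request, so no preemption is needed.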
+ */ + @Test + public void testKillNoOppContainersWithOverallocationLowUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(1024, 1), false)); + } + } + )); + // All three OPPORTUNISTIC containers are expected to start + // because the containers utilization is low (0 at this point) + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // the containers utilization is low + setContainerResourceUtilization( + ResourceUtilization.newInstance(1024, 0, 1.0f/8)); + + // start a GUARANTEED container that requests more than what's left + // unallocated on the node: (512 + 1024 + 1024 + 1024) > 2048 + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(3, + BuilderUtils.newResource(512, 1), true)) + )); + + // the GUARANTEED container is expected to be launched immediately without + // killing any OPPORTUNISTIC containers + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + put(createContainerId(3), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start four OPPORTUNISTIC containers which in aggregate exceed the + * capacity of the node. The real resource utilization of the first two + * OPPORTUNISTIC containers is high whereas that of the latter two is + * almost zero. Then try to start a GUARANTEED container. The GUARANTEED + * container will eventually start running after preempting the third + * and fourth OPPORTUNISTIC containers (which release no resources) and + * then the second OPPORTUNISTIC container.
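+ *
+ * (Sketch of the expected reclamation, assuming most-recently-started
+ * OPPORTUNISTIC containers are preempted first: the third and fourth
+ * containers are barely using their allocation, so killing them frees
+ * little utilization, and the second container must be killed as well
+ * before the 1024 MB GUARANTEED request fits.)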
+ */ + @Test + public void + testKillOppContainersConservativelyWithOverallocationHighUtilization() + throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(1024, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(3, + BuilderUtils.newResource(1024, 1), false)); + } + } + )); + // All four OPPORTUNISTIC containers are expected to start + // because the containers utilization is low (0 at this point) + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + + // the containers utilization is at the overallocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(1536, 0, 1.0f/2)); + + // try to start a GUARANTEED container when there's nothing left unallocated + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(4, + BuilderUtils.newResource(1024, 1), true)) + )); + + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(4), ContainerSubState.RUNNING); + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest. + newInstance(new ArrayList<ContainerId>() { + { + add(createContainerId(0)); + add(createContainerId(1)); + add(createContainerId(2)); + add(createContainerId(3)); + add(createContainerId(4)); + } + }); + List<ContainerStatus> containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0)) || + status.getContainerId().equals(createContainerId(4))) { + Assert.assertEquals( + ContainerSubState.RUNNING, status.getContainerSubState()); + } else { + Assert.assertTrue(status.getDiagnostics().contains( + "Container Killed to make room for Guaranteed Container")); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + } + + /** + * Start two OPPORTUNISTIC containers followed by one GUARANTEED container, + * which in aggregate exceed the capacity of the node. The first two + * OPPORTUNISTIC containers use almost no resources whereas the GUARANTEED + * one utilizes nearly all of the resources it requested. Then try to start + * two more OPPORTUNISTIC containers. The two OPPORTUNISTIC containers are + * expected to be queued immediately. Upon the completion of the + * resource-usage-heavy GUARANTEED container, both OPPORTUNISTIC containers + * are expected to start.
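+ *
+ * Illustrative arithmetic: once the GUARANTEED container finishes and
+ * utilization falls to 100 MB, the headroom is roughly
+ * 0.75 * 2048 - 100 = 1436 MB, enough for both queued 512 MB containers.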
+ */ + @Test + public void testStartOppContainersUponContainerCompletion() throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(0, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(1024, 1), true)); + } + } + )); + + // All three containers are expected to start immediately + // because the node utilization is low (0 at this point) + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + // the containers utilization is at the overallocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(1536, 0, 1.0f/2)); + + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(3, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(4, + BuilderUtils.newResource(512, 1), false)); + } + } + )); + // the two new OPPORTUNISTIC containers are expected to be queued + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(3), ContainerSubState.SCHEDULED); + put(createContainerId(4), ContainerSubState.SCHEDULED); + } + }); + + // the GUARANTEED container completes, releasing resources + setContainerResourceUtilization( + ResourceUtilization.newInstance(100, 0, 1.0f/5)); + allowContainerToSucceed(2); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.DONE); + + // the two OPPORTUNISTIC containers are expected to start together + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(3), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(4), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.DONE); + put(createContainerId(3), ContainerSubState.RUNNING); + put(createContainerId(4), ContainerSubState.RUNNING); + } + }); + } + + /** + * Start one GUARANTEED container that consumes all the resources on the + * node and keeps running, followed by two OPPORTUNISTIC containers that + * will be queued forever because no containers are starting or + * finishing. Then try to start OPPORTUNISTIC containers out of band.
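+ *
+ * Illustrative arithmetic: the first out-of-band attempt happens with
+ * utilization at 1536 MB, exactly the 0.75 * 2048 ceiling, so nothing
+ * starts; after utilization drops to 512 MB the headroom is about 1024 MB,
+ * which fits both queued 512 MB requests.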
+ */ + @Test + public void testStartOpportunisticContainersOutOfBand() throws Exception { + containerManager.start(); + + containerManager.startContainers(StartContainersRequest.newInstance( + Collections.singletonList( + createStartContainerRequest(0, + BuilderUtils.newResource(2048, 4), true)))); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(0), ContainerSubState.RUNNING); + + // the container is fully utilizing its resources + setContainerResourceUtilization( + ResourceUtilization.newInstance(2048, 0, 1.0f)); + + // send two OPPORTUNISTIC container requests that are expected to be queued + containerManager.startContainers(StartContainersRequest.newInstance( + new ArrayList<StartContainerRequest>() { + { + add(createStartContainerRequest(1, + BuilderUtils.newResource(512, 1), false)); + add(createStartContainerRequest(2, + BuilderUtils.newResource(512, 1), false)); + } + } + )); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.SCHEDULED); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.SCHEDULED); + + // the containers utilization dropped to the overallocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(1536, 0, 1.0f/2)); + + // try to start containers out of band. + ((LongRunningContainerSimulatingContainersManager)containerManager) + .startContainersOutOfBandUponLowUtilization(); + + // no containers in queue are expected to be launched because the + // containers utilization is not below the over-allocation threshold + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.SCHEDULED); + put(createContainerId(2), ContainerSubState.SCHEDULED); + } + }); + + // the containers utilization dropped way below the overallocation threshold + setContainerResourceUtilization( + ResourceUtilization.newInstance(512, 0, 1.0f/8)); + + ((LongRunningContainerSimulatingContainersManager)containerManager) + .startContainersOutOfBandUponLowUtilization(); + + // the two OPPORTUNISTIC containers are expected to be launched + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(1), ContainerSubState.RUNNING); + BaseContainerManagerTest.waitForContainerSubState(containerManager, + createContainerId(2), ContainerSubState.RUNNING); + + verifyContainerStatuses(new HashMap<ContainerId, ContainerSubState>() { + { + put(createContainerId(0), ContainerSubState.RUNNING); + put(createContainerId(1), ContainerSubState.RUNNING); + put(createContainerId(2), ContainerSubState.RUNNING); + } + }); + } + + + private void setContainerResourceUtilization(ResourceUtilization usage) { + ((ContainerMonitorForOverallocationTest) + containerManager.getContainersMonitor()) + .setContainerResourceUsage(usage); + } + + private void allowContainerToSucceed(int containerId) { + ((LongRunningContainerSimulatingContainerExecutor) this.exec) + .containerSucceeded(createContainerId(containerId)); + } + + + protected StartContainerRequest createStartContainerRequest(int containerId, + Resource resource, boolean isGuaranteed) throws IOException { + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + ExecutionType executionType = isGuaranteed ?
+  protected StartContainerRequest createStartContainerRequest(int containerId,
+      Resource resource, boolean isGuaranteed) throws IOException {
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    ExecutionType executionType = isGuaranteed ? ExecutionType.GUARANTEED :
+        ExecutionType.OPPORTUNISTIC;
+    Token containerToken = createContainerToken(
+        createContainerId(containerId),
+        DUMMY_RM_IDENTIFIER, context.getNodeId(), user, resource,
+        context.getContainerTokenSecretManager(),
+        null, executionType);
+
+    return StartContainerRequest.newInstance(
+        containerLaunchContext, containerToken);
+  }
+
+  protected void verifyContainerStatuses(
+      Map<ContainerId, ContainerSubState> expected)
+      throws IOException, YarnException {
+    List<ContainerId> statList = new ArrayList<>(expected.keySet());
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+
+    for (ContainerStatus status : containerStatuses) {
+      ContainerId containerId = status.getContainerId();
+      Assert.assertEquals(containerId + " is in unexpected state",
+          expected.get(containerId), status.getContainerSubState());
+    }
+  }
+
+  /**
+   * A container manager that returns a dummy container pid when it cleans
+   * up running containers. Used along with
+   * LongRunningContainerSimulatingContainerExecutor to simulate long running
+   * container processes for testing purposes.
+   */
+  private static class LongRunningContainerSimulatingContainersManager
+      extends ContainerManagerImpl {
+
+    private final String user;
+
+    LongRunningContainerSimulatingContainersManager(
+        Context context, ContainerExecutor exec,
+        DeletionService deletionContext,
+        NodeStatusUpdater nodeStatusUpdater,
+        NodeManagerMetrics metrics,
+        LocalDirsHandlerService dirsHandler, String user) {
+      super(context, exec, deletionContext,
+          nodeStatusUpdater, metrics, dirsHandler);
+      this.user = user;
+    }
+
+    @Override
+    protected UserGroupInformation getRemoteUgi() throws YarnException {
+      ApplicationId appId = ApplicationId.newInstance(0, 0);
+      ApplicationAttemptId appAttemptId =
+          ApplicationAttemptId.newInstance(appId, 1);
+      UserGroupInformation ugi =
+          UserGroupInformation.createRemoteUser(appAttemptId.toString());
+      ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+          .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+          .getKeyId()));
+      return ugi;
+    }
+
+    /**
+     * Create a container launcher that signals container processes
+     * with a dummy pid. The container processes are simulated in
+     * LongRunningContainerSimulatingContainerExecutor, which does
+     * not write a pid file on behalf of the containers it launches,
+     * so the pid does not matter.
+     */
+    @Override
+    protected ContainersLauncher createContainersLauncher(
+        Context context, ContainerExecutor exec) {
+      ContainerManagerImpl containerManager = this;
+      return new ContainersLauncher(context, dispatcher, exec, dirsHandler,
+          this) {
+        @Override
+        protected ContainerLaunch createContainerLaunch(
+            Application app, Container container) {
+          return new ContainerLaunch(context, getConfig(), dispatcher,
+              exec, app, container, dirsHandler, containerManager) {
+            @Override
+            protected String getContainerPid(Path pidFile)
+                throws Exception {
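+              // the simulating executor never spawns a real process or
+              // writes a pid file, so any fixed dummy pid will do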
+              return "123";
+            }
+          };
+        }
+      };
+    }
+
+    @Override
+    protected ContainersMonitor createContainersMonitor(
+        ContainerExecutor exec) {
+      return new ContainerMonitorForOverallocationTest(exec,
+          dispatcher, context);
+    }
+
+    public void startContainersOutOfBandUponLowUtilization() {
+      ((ContainerMonitorForOverallocationTest) getContainersMonitor())
+          .attemptToStartContainersUponLowUtilization();
+    }
+  }
+
+  /**
+   * A container executor that simulates long running container processes
+   * by having container launch threads sleep indefinitely until given
+   * a signal to finish with either a success or failure exit code.
+   */
+  private static class LongRunningContainerSimulatingContainerExecutor
+      extends DefaultContainerExecutor {
+    private final ConcurrentHashMap<ContainerId, ContainerFinishLatch>
+        containers = new ConcurrentHashMap<>();
+
+    public void containerSucceeded(ContainerId containerId) {
+      ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+      if (containerFinishLatch != null) {
+        containerFinishLatch.toSucceed();
+      }
+    }
+
+    public void containerFailed(ContainerId containerId) {
+      ContainerFinishLatch containerFinishLatch = containers.get(containerId);
+      if (containerFinishLatch != null) {
+        containerFinishLatch.toFail();
+      }
+    }
+
+    /**
+     * Simulate long running container processes by having container launcher
+     * threads wait indefinitely for a signal to finish.
+     */
+    @Override
+    public int launchContainer(ContainerStartContext ctx)
+        throws IOException, ConfigurationException {
+      ContainerId container = ctx.getContainer().getContainerId();
+      containers.putIfAbsent(container, new ContainerFinishLatch(container));
+
+      // simulate a long running container process by having the
+      // container launch thread sleep until it's given a signal
+      // to finish with an exit code
+      while (!containers.get(container).toProceed) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          return -1;
+        }
+      }
+
+      return containers.get(container).getContainerExitCode();
+    }
+
+    /**
+     * Override signalContainer() so that simulated container processes
+     * are properly cleaned up.
+     */
+    @Override
+    public boolean signalContainer(ContainerSignalContext ctx)
+        throws IOException {
+      containerSucceeded(ctx.getContainer().getContainerId());
+      return true;
+    }
+
+    /**
+     * A signal that container launch threads wait for before exiting
+     * in order to simulate long running container processes.
+     */
+    private static final class ContainerFinishLatch {
+      volatile boolean toProceed;
+      int exitCode;
+      ContainerId container;
+
+      ContainerFinishLatch(ContainerId containerId) {
+        exitCode = 0;
+        toProceed = false;
+        container = containerId;
+      }
+
+      void toSucceed() {
+        exitCode = 0;
+        toProceed = true;
+      }
+
+      void toFail() {
+        exitCode = -101;
+        toProceed = true;
+      }
+
+      int getContainerExitCode() {
+        // read barrier on toProceed to make sure the exit code is not stale
+        if (toProceed) {
+          LOG.debug(container + " finished with exit code: " + exitCode);
+        }
+        return exitCode;
+      }
+    }
+  }
+
+  /**
+   * A test implementation of the containers monitor that allows the test
+   * to control the resource utilization reported for containers.
+   */
+  private static class ContainerMonitorForOverallocationTest
+      extends ContainersMonitorImpl {
+
+    // volatile: set by the test thread, read by monitor threads
+    private volatile ResourceUtilization containerResourceUsage =
+        ResourceUtilization.newInstance(0, 0, 0.0f);
+
+    ContainerMonitorForOverallocationTest(ContainerExecutor exec,
+        AsyncDispatcher dispatcher, Context context) {
+      super(exec, dispatcher, context);
+    }
+
+    @Override
+    public long getPmemAllocatedForContainers() {
+      return NM_CONTAINERS_MEMORY_MB * 1024 * 1024L;
+    }
+
+    @Override
+    public long getVmemAllocatedForContainers() {
+      float pmemRatio = getConfig().getFloat(
+          YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      return (long) (pmemRatio * getPmemAllocatedForContainers());
+    }
+
+    @Override
+    public long getVCoresAllocatedForContainers() {
+      return NM_CONTAINERS_VCORES;
+    }
+
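+    /**
+     * Return the utilization injected by the test, regardless of
+     * whether the latest snapshot is requested.
+     */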
+    @Override
+    public ContainersResourceUtilization getContainersUtilization(
+        boolean latest) {
+      return new ContainersMonitor.ContainersResourceUtilization(
+          containerResourceUsage, System.currentTimeMillis());
+    }
+
+    public void setContainerResourceUsage(
+        ResourceUtilization containerResourceUsage) {
+      this.containerResourceUsage = containerResourceUsage;
+    }
+
+    @Override
+    protected void checkOverAllocationPrerequisites() throws YarnException {
+      // disable cgroup checking for unit tests
+    }
+  }
+}