diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 57d1bad..bdbfdf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Map.Entry; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,12 +33,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -56,8 +59,10 @@ private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; - final List containersToBeRemoved; - final Map containersToBeAdded; + private final List containersToBeRemoved; + private final Map containersToBeAdded; + private final List containersToBeChanged; + Map trackingContainers = new HashMap(); @@ -66,6 +71,7 @@ private final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; private Configuration conf; + private static float vmemRatio; private Class processTreeClass; private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT; @@ -81,6 +87,8 @@ private ResourceUtilization containersUtilization; + private volatile boolean stopped = false; + public ContainersMonitorImpl(ContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { super("containers-monitor"); @@ -91,6 +99,7 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.containersToBeAdded = new HashMap(); this.containersToBeRemoved = new ArrayList(); + this.containersToBeChanged = new ArrayList<>(); this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); @@ -135,7 +144,7 @@ protected void serviceInit(Configuration conf) throws Exception { this.maxVCoresAllottedForContainers = configuredVCoresForContainers; // ///////// Virtual memory configuration ////// - float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); Preconditions.checkArgument(vmemRatio > 0.99f, YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0"); @@ -210,6 +219,7 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { if (this.isEnabled()) { + stopped = true; this.monitoringThread.interrupt(); try { this.monitoringThread.join(); @@ -220,7 +230,8 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - private static class ProcessTreeInfo { + @VisibleForTesting + static class ProcessTreeInfo { private ContainerId containerId; private String pid; private ResourceCalculatorProcessTree pTree; @@ -279,6 +290,32 @@ public int getCpuVcores() { } } + private static class ContainerResourceToChange { + private final ContainerId containerId; + private final long vmemLimit; + private final long pmemLimit; + private final int cpuVcores; + + public ContainerResourceToChange(ContainerId containerId, + Resource resource) { + this.containerId = containerId; + this.pmemLimit = resource.getMemory() * 1024L * 1024L; + this.vmemLimit = (long) (pmemLimit * vmemRatio); + this.cpuVcores = resource.getVirtualCores(); + } + public ContainerId getContainerId() { + return this.containerId; + } + public long getVmemLimit() { + return this.vmemLimit; + } + public long getPmemLimit() { + return this.pmemLimit; + } + public int getCpuVcores() { + return this.cpuVcores; + } + } /** * Check whether a container's process tree's current memory usage is over @@ -351,250 +388,288 @@ public MonitoringThread() { @Override public void run() { - while (true) { + while (!stopped && !Thread.currentThread().isInterrupted()) { - // Print the processTrees for debugging. - if (LOG.isDebugEnabled()) { - StringBuilder tmp = new StringBuilder("[ "); - for (ProcessTreeInfo p : trackingContainers.values()) { - tmp.append(p.getPID()); - tmp.append(" "); - } - LOG.debug("Current ProcessTree list : " - + tmp.substring(0, tmp.length()) + "]"); - } + enforceResourceLimits(); - // Add new containers - synchronized (containersToBeAdded) { - for (Entry entry : containersToBeAdded - .entrySet()) { - ContainerId containerId = entry.getKey(); - ProcessTreeInfo processTreeInfo = entry.getValue(); - LOG.info("Starting resource-monitoring for " + containerId); - trackingContainers.put(containerId, processTreeInfo); - } - containersToBeAdded.clear(); + try { + Thread.sleep(monitoringInterval); + } catch (InterruptedException e) { + LOG.warn(ContainersMonitorImpl.class.getName() + + " is interrupted. Exiting."); + break; } + } + } + } - // Remove finished containers - synchronized (containersToBeRemoved) { - for (ContainerId containerId : containersToBeRemoved) { - if (containerMetricsEnabled) { - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).finished(); - } - trackingContainers.remove(containerId); - LOG.info("Stopping resource-monitoring for " + containerId); - } - containersToBeRemoved.clear(); - } + // Synchronize this function to guard access to trackingContainers + // as it may be called from different threads + @SuppressWarnings("unchecked") + private synchronized void enforceResourceLimits() { + // Print the processTrees for debugging. + if (LOG.isDebugEnabled()) { + StringBuilder tmp = new StringBuilder("[ "); + for (ProcessTreeInfo p : trackingContainers.values()) { + tmp.append(p.getPID()); + tmp.append(" "); + } + LOG.debug("Current ProcessTree list : " + + tmp.substring(0, tmp.length()) + "]"); + } - // Temporary structure to calculate the total resource utilization of - // the containers - ResourceUtilization trackedContainersUtilization = - ResourceUtilization.newInstance(0, 0, 0.0f); - - // Now do the monitoring for the trackingContainers - // Check memory usage and kill any overflowing containers - long vmemUsageByAllContainers = 0; - long pmemByAllContainers = 0; - long cpuUsagePercentPerCoreByAllContainers = 0; - long cpuUsageTotalCoresByAllContainers = 0; - for (Iterator> it = - trackingContainers.entrySet().iterator(); it.hasNext();) { - - Map.Entry entry = it.next(); - ContainerId containerId = entry.getKey(); - ProcessTreeInfo ptInfo = entry.getValue(); - try { - String pId = ptInfo.getPID(); - - // Initialize any uninitialized processTrees - if (pId == null) { - // get pid from ContainerId - pId = containerExecutor.getProcessId(ptInfo.getContainerId()); - if (pId != null) { - // pId will be null, either if the container is not spawned yet - // or if the container's pid is removed from ContainerExecutor - LOG.debug("Tracking ProcessTree " + pId - + " for the first time"); - - ResourceCalculatorProcessTree pt = - ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf); - ptInfo.setPid(pId); - ptInfo.setProcessTree(pt); - - if (containerMetricsEnabled) { - ContainerMetrics usageMetrics = ContainerMetrics - .forContainer(containerId, containerMetricsPeriodMs); - int cpuVcores = ptInfo.getCpuVcores(); - final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); - final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); - usageMetrics.recordResourceLimit( - vmemLimit, pmemLimit, cpuVcores); - usageMetrics.recordProcessId(pId); - } - } - } - // End of initializing any uninitialized processTrees + // Add new containers + synchronized (containersToBeAdded) { + for (Entry entry : containersToBeAdded + .entrySet()) { + ContainerId containerId = entry.getKey(); + ProcessTreeInfo processTreeInfo = entry.getValue(); + LOG.info("Starting resource-monitoring for " + containerId); + trackingContainers.put(containerId, processTreeInfo); + } + containersToBeAdded.clear(); + } - if (pId == null) { - continue; // processTree cannot be tracked - } + // Remove finished containers + synchronized (containersToBeRemoved) { + for (ContainerId containerId : containersToBeRemoved) { + if (containerMetricsEnabled) { + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).finished(); + } + trackingContainers.remove(containerId); + LOG.info("Stopping resource-monitoring for " + containerId); + } + containersToBeRemoved.clear(); + } - LOG.debug("Constructing ProcessTree for : PID = " + pId - + " ContainerId = " + containerId); - ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree(); - pTree.updateProcessTree(); // update process-tree - long currentVmemUsage = pTree.getVirtualMemorySize(); - long currentPmemUsage = pTree.getRssMemorySize(); - // if machine has 6 cores and 3 are used, - // cpuUsagePercentPerCore should be 300% and - // cpuUsageTotalCoresPercentage should be 50% - float cpuUsagePercentPerCore = pTree.getCpuUsagePercent(); - float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / - resourceCalculatorPlugin.getNumProcessors(); - - // Multiply by 1000 to avoid losing data when converting to int - int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000 - * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); - // as processes begin with an age 1, we want to see if there - // are processes more than 1 iteration old. - long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1); - long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1); - long vmemLimit = ptInfo.getVmemLimit(); - long pmemLimit = ptInfo.getPmemLimit(); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Memory usage of ProcessTree %s for container-id %s: ", - pId, containerId.toString()) + - formatUsageString( - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit)); - } + // Handle resized containers + synchronized (containersToBeChanged) { + for (ContainerResourceToChange c : containersToBeChanged) { + ContainerId containerId = c.getContainerId(); + ProcessTreeInfo info = trackingContainers.get(containerId); + if (info == null) { + LOG.warn("Failed to track container " + + containerId.toString() + + ". It may have already completed."); + continue; + } + LOG.info("Changing resource-monitoring for " + containerId); + info.pmemLimit = c.getPmemLimit(); + info.vmemLimit = c.getVmemLimit(); + info.cpuVcores = c.getCpuVcores(); + changeContainerResource(containerId, Resource.newInstance( + (int) (info.pmemLimit >> 20), info.cpuVcores)); + } + containersToBeChanged.clear(); + } - // Add resource utilization for this container - trackedContainersUtilization.addTo( - (int) (currentPmemUsage >> 20), - (int) (currentVmemUsage >> 20), - milliVcoresUsed / 1000.0f); + // Temporary structure to calculate the total resource utilization of + // the containers + ResourceUtilization trackedContainersUtilization = + ResourceUtilization.newInstance(0, 0, 0.0f); + + // Now do the monitoring for the trackingContainers + // Check memory usage and kill any overflowing containers + long vmemUsageByAllContainers = 0; + long pmemByAllContainers = 0; + long cpuUsagePercentPerCoreByAllContainers = 0; + long cpuUsageTotalCoresByAllContainers = 0; + for (Iterator> it = + trackingContainers.entrySet().iterator(); it.hasNext();) { + + Map.Entry entry = it.next(); + ContainerId containerId = entry.getKey(); + ProcessTreeInfo ptInfo = entry.getValue(); + try { + String pId = ptInfo.getPID(); + + // Initialize any uninitialized processTrees + if (pId == null) { + // get pid from ContainerId + pId = containerExecutor.getProcessId(ptInfo.getContainerId()); + if (pId != null) { + // pId will be null, either if the container is not spawned yet + // or if the container's pid is removed from ContainerExecutor + LOG.debug("Tracking ProcessTree " + pId + + " for the first time"); + + ResourceCalculatorProcessTree pt = + ResourceCalculatorProcessTree.getResourceCalculatorProcessTree( + pId, processTreeClass, conf); + ptInfo.setPid(pId); + ptInfo.setProcessTree(pt); - // Add usage to container metrics if (containerMetricsEnabled) { - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordMemoryUsage( - (int) (currentPmemUsage >> 20)); - ContainerMetrics.forContainer( - containerId, containerMetricsPeriodMs).recordCpuUsage - ((int)cpuUsagePercentPerCore, milliVcoresUsed); - } - - boolean isMemoryOverLimit = false; - String msg = ""; - int containerExitStatus = ContainerExitStatus.INVALID; - if (isVmemCheckEnabled() - && isProcessTreeOverLimit(containerId.toString(), - currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) { - // Container (the root process) is still alive and overflowing - // memory. - // Dump the process-tree and then clean it up. - msg = formatErrorMessage("virtual", - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit, - pId, containerId, pTree); - isMemoryOverLimit = true; - containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM; - } else if (isPmemCheckEnabled() - && isProcessTreeOverLimit(containerId.toString(), - currentPmemUsage, curRssMemUsageOfAgedProcesses, - pmemLimit)) { - // Container (the root process) is still alive and overflowing - // memory. - // Dump the process-tree and then clean it up. - msg = formatErrorMessage("physical", - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit, - pId, containerId, pTree); - isMemoryOverLimit = true; - containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM; + ContainerMetrics usageMetrics = ContainerMetrics + .forContainer(containerId, containerMetricsPeriodMs); + int cpuVcores = ptInfo.getCpuVcores(); + final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); + final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); + usageMetrics.recordResourceLimit( + vmemLimit, pmemLimit, cpuVcores); + usageMetrics.recordProcessId(pId); } - - // Accounting the total memory in usage for all containers - vmemUsageByAllContainers += currentVmemUsage; - pmemByAllContainers += currentPmemUsage; - // Accounting the total cpu usage for all containers - cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore; - cpuUsageTotalCoresByAllContainers += cpuUsagePercentPerCore; - - if (isMemoryOverLimit) { - // Virtual or physical memory over limit. Fail the container and - // remove - // the corresponding process tree - LOG.warn(msg); - // warn if not a leader - if (!pTree.checkPidPgrpidForMatch()) { - LOG.error("Killed container process with PID " + pId - + " but it is not a process group leader."); - } - // kill the container - eventDispatcher.getEventHandler().handle( - new ContainerKillEvent(containerId, - containerExitStatus, msg)); - it.remove(); - LOG.info("Removed ProcessTree with root " + pId); - } - } catch (Exception e) { - // Log the exception and proceed to the next container. - LOG.warn("Uncaught exception in ContainerMemoryManager " - + "while managing memory of " + containerId, e); } } + // End of initializing any uninitialized processTrees + + if (pId == null) { + continue; // processTree cannot be tracked + } + + LOG.debug("Constructing ProcessTree for : PID = " + pId + + " ContainerId = " + containerId); + ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree(); + pTree.updateProcessTree(); // update process-tree + long currentVmemUsage = pTree.getVirtualMemorySize(); + long currentPmemUsage = pTree.getRssMemorySize(); + // if machine has 6 cores and 3 are used, + // cpuUsagePercentPerCore should be 300% and + // cpuUsageTotalCoresPercentage should be 50% + float cpuUsagePercentPerCore = pTree.getCpuUsagePercent(); + float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / + resourceCalculatorPlugin.getNumProcessors(); + + // Multiply by 1000 to avoid losing data when converting to int + int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000 + * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); + // as processes begin with an age 1, we want to see if there + // are processes more than 1 iteration old. + long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1); + long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1); + long vmemLimit = ptInfo.getVmemLimit(); + long pmemLimit = ptInfo.getPmemLimit(); if (LOG.isDebugEnabled()) { - LOG.debug("Total Resource Usage stats in NM by all containers : " - + "Virtual Memory= " + vmemUsageByAllContainers - + ", Physical Memory= " + pmemByAllContainers - + ", Total CPU usage= " + cpuUsageTotalCoresByAllContainers - + ", Total CPU(% per core) usage" - + cpuUsagePercentPerCoreByAllContainers); + LOG.debug(String.format( + "Memory usage of ProcessTree %s for container-id %s: ", + pId, containerId.toString()) + + formatUsageString( + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit)); } - // Save the aggregated utilization of the containers - setContainersUtilization(trackedContainersUtilization); + // Add resource utilization for this container + trackedContainersUtilization.addTo( + (int) (currentPmemUsage >> 20), + (int) (currentVmemUsage >> 20), + milliVcoresUsed / 1000.0f); + + // Add usage to container metrics + if (containerMetricsEnabled) { + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).recordMemoryUsage( + (int) (currentPmemUsage >> 20)); + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).recordCpuUsage( + (int) cpuUsagePercentPerCore, milliVcoresUsed); + } - try { - Thread.sleep(monitoringInterval); - } catch (InterruptedException e) { - LOG.warn(ContainersMonitorImpl.class.getName() - + " is interrupted. Exiting."); - break; + boolean isMemoryOverLimit = false; + String msg = ""; + int containerExitStatus = ContainerExitStatus.INVALID; + if (isVmemCheckEnabled() + && isProcessTreeOverLimit(containerId.toString(), + currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) { + // Container (the root process) is still alive and overflowing + // memory. + // Dump the process-tree and then clean it up. + msg = formatErrorMessage("virtual", + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit, + pId, containerId, pTree); + isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM; + } else if (isPmemCheckEnabled() + && isProcessTreeOverLimit(containerId.toString(), + currentPmemUsage, curRssMemUsageOfAgedProcesses, + pmemLimit)) { + // Container (the root process) is still alive and overflowing + // memory. + // Dump the process-tree and then clean it up. + msg = formatErrorMessage("physical", + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit, + pId, containerId, pTree); + isMemoryOverLimit = true; + containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM; + } + + // Accounting the total memory in usage for all containers + vmemUsageByAllContainers += currentVmemUsage; + pmemByAllContainers += currentPmemUsage; + // Accounting the total cpu usage for all containers + cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore; + cpuUsageTotalCoresByAllContainers += cpuUsagePercentPerCore; + + if (isMemoryOverLimit) { + // Virtual or physical memory over limit. Fail the container and + // remove + // the corresponding process tree + LOG.warn(msg); + // warn if not a leader + if (!pTree.checkPidPgrpidForMatch()) { + LOG.error("Killed container process with PID " + pId + + " but it is not a process group leader."); + } + // kill the container + eventDispatcher.getEventHandler().handle( + new ContainerKillEvent(containerId, + containerExitStatus, msg)); + it.remove(); + LOG.info("Removed ProcessTree with root " + pId); } + } catch (Exception e) { + // Log the exception and proceed to the next container. + LOG.warn("Uncaught exception in ContainerMemoryManager " + + "while managing memory of " + containerId, e); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Total Resource Usage stats in NM by all containers : " + + "Virtual Memory= " + vmemUsageByAllContainers + + ", Physical Memory= " + pmemByAllContainers + + ", Total CPU usage= " + cpuUsageTotalCoresByAllContainers + + ", Total CPU(% per core) usage" + + cpuUsagePercentPerCoreByAllContainers); + } - private String formatErrorMessage(String memTypeExceeded, - long currentVmemUsage, long vmemLimit, - long currentPmemUsage, long pmemLimit, - String pId, ContainerId containerId, ResourceCalculatorProcessTree pTree) { - return - String.format("Container [pid=%s,containerID=%s] is running beyond %s memory limits. ", - pId, containerId, memTypeExceeded) + - "Current usage: " + - formatUsageString(currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit) + - ". Killing container.\n" + - "Dump of the process-tree for " + containerId + " :\n" + - pTree.getProcessTreeDump(); - } - - private String formatUsageString(long currentVmemUsage, long vmemLimit, - long currentPmemUsage, long pmemLimit) { - return String.format("%sB of %sB physical memory used; " + - "%sB of %sB virtual memory used", - TraditionalBinaryPrefix.long2String(currentPmemUsage, "", 1), - TraditionalBinaryPrefix.long2String(pmemLimit, "", 1), - TraditionalBinaryPrefix.long2String(currentVmemUsage, "", 1), - TraditionalBinaryPrefix.long2String(vmemLimit, "", 1)); + // Save the aggregated utilization of the containers + setContainersUtilization(trackedContainersUtilization); + } + + private void changeContainerResource( + ContainerId containerId, Resource resource) { + Container container = context.getContainers().get(containerId); + // Check container existence + if (container == null) { + LOG.warn("Container " + containerId.toString() + "does not exist"); + return; } + container.setResource(resource); + } + + private String formatErrorMessage(String memTypeExceeded, + long currentVmemUsage, long vmemLimit, long currentPmemUsage, + long pmemLimit, String pId, ContainerId containerId, + ResourceCalculatorProcessTree pTree) { + return String.format( + "Container [pid=%s,containerID=%s] is running beyond %s memory limits." + + " ", pId, containerId, memTypeExceeded) + "Current usage: " + + formatUsageString( + currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit) + + ". Killing container.\n" + "Dump of the process-tree for " + + containerId + " :\n" + pTree.getProcessTreeDump(); + } + + private String formatUsageString(long currentVmemUsage, long vmemLimit, + long currentPmemUsage, long pmemLimit) { + return String.format("%sB of %sB physical memory used; " + + "%sB of %sB virtual memory used", + TraditionalBinaryPrefix.long2String(currentPmemUsage, "", 1), + TraditionalBinaryPrefix.long2String(pmemLimit, "", 1), + TraditionalBinaryPrefix.long2String(currentVmemUsage, "", 1), + TraditionalBinaryPrefix.long2String(vmemLimit, "", 1)); } @Override @@ -642,13 +717,20 @@ public void setContainersUtilization(ResourceUtilization utilization) { } @Override + @SuppressWarnings("unchecked") public void handle(ContainersMonitorEvent monitoringEvent) { - + ContainerId containerId = monitoringEvent.getContainerId(); if (!isEnabled()) { + if (monitoringEvent.getType() == ContainersMonitorEventType + .CHANGE_MONITORING_CONTAINER_RESOURCE) { + // Nothing to enforce. Update container resource immediately. + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + changeContainerResource(containerId, changeEvent.getResource()); + } return; } - ContainerId containerId = monitoringEvent.getContainerId(); switch (monitoringEvent.getType()) { case START_MONITORING_CONTAINER: ContainerStartMonitoringEvent startEvent = @@ -675,6 +757,16 @@ public void handle(ContainersMonitorEvent monitoringEvent) { this.containersToBeRemoved.add(containerId); } break; + case CHANGE_MONITORING_CONTAINER_RESOURCE: + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + synchronized (this.containersToBeChanged) { + this.containersToBeChanged.add(new ContainerResourceToChange( + containerId, changeEvent.getResource())); + } + // Trigger a round of containers check immediately + enforceResourceLimits(); + break; default: // TODO: Wrong event. } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 853c789..6a605c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -1047,6 +1047,102 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception } } + @Test + public void testChangeContainerResource() throws Exception { + containerManager.start(); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + // Construct the Container-id + ContainerId cId = createContainerId(0); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, + context.getContainerTokenSecretManager())); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + // Make sure the container reaches RUNNING state + BaseContainerManagerTest.waitForNMContainerState(containerManager, cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + // Construct container resource increase request, + List increaseTokens = new ArrayList(); + // Add increase request. + Resource targetResource = Resource.newInstance(4096, 2); + Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER, + context.getNodeId(), user, targetResource, + context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + containerManager.increaseContainersResource(increaseRequest); + Assert.assertEquals( + 1, increaseResponse.getSuccessfullyIncreasedContainers().size()); + Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty()); + // Check status + List containerIds = new ArrayList<>(); + containerIds.add(cId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + ContainerStatus containerStatus = containerManager + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + // Check status immediately as resource increase is blocking + assertEquals(targetResource, containerStatus.getCapability()); + // Simulate a decrease request + List containersToDecrease + = new ArrayList<>(); + targetResource = Resource.newInstance(2048, 2); + org.apache.hadoop.yarn.api.records.Container decreasedContainer = + org.apache.hadoop.yarn.api.records.Container + .newInstance(cId, null, null, targetResource, null, null); + containersToDecrease.add(decreasedContainer); + containerManager.handle( + new CMgrDecreaseContainersResourceEvent(containersToDecrease)); + // Check status with retry + containerStatus = containerManager + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + int retry = 0; + while (!targetResource.equals(containerStatus.getCapability()) && + (retry++ < 5)) { + Thread.sleep(200); + containerStatus = containerManager.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + } + assertEquals(targetResource, containerStatus.getCapability()); + } + public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, NMContainerTokenSecretManager containerTokenSecretManager) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java new file mode 100644 index 0000000..4a18a8c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; + +public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { + + @Override + public long getVirtualMemorySize() { + return 0; + } + + @Override + public long getPhysicalMemorySize() { + return 0; + } + + @Override + public long getAvailableVirtualMemorySize() { + return 0; + } + + @Override + public long getAvailablePhysicalMemorySize() { + return 0; + } + + @Override + public int getNumProcessors() { + return 0; + } + + @Override + public int getNumCores() { + return 0; + } + + @Override + public long getCpuFrequency() { + return 0; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public float getCpuUsage() { + return 0; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java new file mode 100644 index 0000000..c5aaa77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree { + + private long rssMemorySize = 0; + + public MockResourceCalculatorProcessTree(String root) { + super(root); + } + + @Override + public void updateProcessTree() { + } + + @Override + public String getProcessTreeDump() { + return ""; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return true; + } + + public void setRssMemorySize(long rssMemorySize) { + this.rssMemorySize = rssMemorySize; + } + + public long getRssMemorySize() { + return this.rssMemorySize; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java new file mode 100644 index 0000000..a3c60f6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.Matchers; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class TestContainersMonitorResourceChange { + + private ContainersMonitorImpl containersMonitor; + private MockExecutor executor; + private Configuration conf; + private AsyncDispatcher dispatcher; + private Context context; + private MockContainerEventHandler containerEventHandler; + + private static class MockExecutor extends ContainerExecutor { + @Override + public void init() throws IOException { + } + @Override + public void startLocalizer(LocalizerStartContext ctx) + throws IOException, InterruptedException { + } + @Override + public int launchContainer(ContainerStartContext ctx) throws + IOException { + return 0; + } + @Override + public boolean signalContainer(ContainerSignalContext ctx) + throws IOException { + return true; + } + @Override + public void deleteAsUser(DeletionAsUserContext ctx) + throws IOException, InterruptedException { + } + @Override + public String getProcessId(ContainerId containerId) { + return String.valueOf(containerId.getContainerId()); + } + @Override + public boolean isContainerAlive(ContainerLivenessContext ctx) + throws IOException { + return true; + } + } + + private static class MockContainerEventHandler implements + EventHandler { + final private Set killedContainer + = new HashSet<>(); + @Override + public void handle(ContainerEvent event) { + if (event.getType() == ContainerEventType.KILL_CONTAINER) { + synchronized (killedContainer) { + killedContainer.add(event.getContainerID()); + } + } + } + public boolean isContainerKilled(ContainerId containerId) { + synchronized (killedContainer) { + return killedContainer.contains(containerId); + } + } + } + + @Before + public void setup() { + executor = new MockExecutor(); + dispatcher = new AsyncDispatcher(); + context = Mockito.mock(Context.class); + Mockito.doReturn(new ConcurrentSkipListMap()) + .when(context).getContainers(); + conf = new Configuration(); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + MockResourceCalculatorPlugin.class.getCanonicalName()); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + MockResourceCalculatorProcessTree.class.getCanonicalName()); + dispatcher.init(conf); + dispatcher.start(); + containerEventHandler = new MockContainerEventHandler(); + dispatcher.register(ContainerEventType.class, containerEventHandler); + } + + @After + public void tearDown() throws Exception { + if (containersMonitor != null) { + containersMonitor.stop(); + } + if (dispatcher != null) { + dispatcher.stop(); + } + } + + @Test + public void testContainersResourceChange() throws Exception { + // set container monitor interval to be 20ms + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L); + containersMonitor = createContainersMonitor(executor, dispatcher, context); + containersMonitor.init(conf); + containersMonitor.start(); + // create container 1 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(1), 2100L, 1000L, 1, 0, 0)); + // verify that this container is properly tracked + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(1))); + assertEquals(1000L, getProcessTreeInfo(getContainerId(1)) + .getPmemLimit()); + assertEquals(2100L, getProcessTreeInfo(getContainerId(1)) + .getVmemLimit()); + // increase pmem usage, the container should be killed + MockResourceCalculatorProcessTree mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(1)).getProcessTree(); + mockTree.setRssMemorySize(2500L); + // verify that this container is killed + Thread.sleep(200); + assertTrue(containerEventHandler + .isContainerKilled(getContainerId(1))); + // create container 2 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(2), 2202009L, 1048576L, 1, 0, 0)); + // verify that this container is properly tracked + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(2))); + assertEquals(1048576L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(2202009L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(2), Resource.newInstance(2, 1))); + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(2))); + assertEquals(2097152L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(4404019L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // increase pmem usage, the container should NOT be killed + mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(2)).getProcessTree(); + mockTree.setRssMemorySize(2000000L); + // verify that this container is not killed + Thread.sleep(200); + assertFalse(containerEventHandler + .isContainerKilled(getContainerId(2))); + containersMonitor.stop(); + } + + @Test + public void testContainersResourceChangeIsTriggeredImmediately() + throws Exception { + // set container monitor interval to be 20s + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20000L); + containersMonitor = createContainersMonitor(executor, dispatcher, context); + containersMonitor.init(conf); + containersMonitor.start(); + // sleep 1 second to make sure the container monitor thread is + // now waiting for the next monitor cycle + Thread.sleep(1000); + // create a container with id 3 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(3), 2202009L, 1048576L, 1, 0, 0)); + // sleep another second, and verify that this container still has not + // been tracked because the container monitor thread is still waiting + Thread.sleep(1000); + assertNull(getProcessTreeInfo(getContainerId(3))); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(3), Resource.newInstance(2, 1))); + // verify that this container has been properly tracked with the + // correct size + assertNotNull(getProcessTreeInfo(getContainerId(3))); + assertEquals(2097152L, getProcessTreeInfo(getContainerId(3)) + .getPmemLimit()); + assertEquals(4404019L, getProcessTreeInfo(getContainerId(3)) + .getVmemLimit()); + containersMonitor.stop(); + } + + private ContainersMonitorImpl createContainersMonitor( + ContainerExecutor containerExecutor, AsyncDispatcher dispatcher, + Context context) { + return new ContainersMonitorImpl(containerExecutor, dispatcher, context); + } + + private ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456L, 1), 1), id); + } + + private ProcessTreeInfo getProcessTreeInfo(ContainerId id) { + return containersMonitor.trackingContainers.get(id); + } +}