Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision 40b0045ebe0752cd3d1d09be00acbabdea983799) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision d8172cc52117664666f6574f1b62e238d2ba2293) @@ -54,7 +54,7 @@ this.name = name; } - String getName() { + public String getName() { return name; } @@ -112,6 +112,13 @@ void deleteCGroup(CGroupController controller, String cGroupId) throws ResourceHandlerException; + /** + * Gets the absolute path to the specified cgroup controller. + * @param controller - controller type for the cgroup + * @return the root of the controller. + */ + String getControllerPath(CGroupController controller); + /** * Gets the relative path for the cgroup, independent of a controller, for a * given cgroup id. Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision 40b0045ebe0752cd3d1d09be00acbabdea983799) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision d8172cc52117664666f6574f1b62e238d2ba2293) @@ -125,7 +125,8 @@ initializeControllerPaths(); } - private String getControllerPath(CGroupController controller) { + @Override + public String getControllerPath(CGroupController controller) { try { rwLock.readLock().lock(); return controllerPaths.get(controller); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java (revision afb5d9832ba86c5433fde40875914c0f037cbf71) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java (revision afb5d9832ba86c5433fde40875914c0f037cbf71) @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.CpuTimeTracker; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.SysInfoLinux; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A cgroups file-system based Resource calculator without the process tree + * features. + * + * CGroups has its limitations. It can only be enabled, if both CPU and memory + * cgroups are enabled with yarn.nodemanager.resource.cpu.enabled and + * yarn.nodemanager.resource.memory.enabled respectively. This means that + * memory limits are enforced by default. You can turn this off and keep + * memory reporting only with yarn.nodemanager.resource.memory.enforced. + * + * Another limitation is virtual memory measurement. CGroups does not have the + * ability to measure virtual memory usage. This includes memory reserved but + * not used. CGroups measures used memory as sa sum of + * physical memory and swap usage. This will be returned in the virtual + * memory counters. + * If the real virtual memory is required please use the legacy procfs based + * resource calculator or CombinedResourceCalculator. + */ +public class CGroupsResourceCalculator extends ResourceCalculatorProcessTree { + enum Result { + Continue, + Exit + } + protected static final Log LOG = LogFactory + .getLog(CGroupsResourceCalculator.class); + private static final String PROCFS = "/proc"; + static final String CGROUP = "cgroup"; + static final String CPU_STAT = "cpuacct.stat"; + static final String MEM_STAT = "memory.usage_in_bytes"; + static final String MEMSW_STAT = "memory.memsw.usage_in_bytes"; + private static final String USER = "user "; + private static final String SYSTEM = "system "; + + private static final Pattern CGROUP_FILE_FORMAT = Pattern.compile( + "^(\\d+):([^:]+):/(.*)$"); + private final String procfsDir; + private CGroupsHandler cGroupsHandler; + + private String pid; + private File cpuStat; + private File memStat; + private File memswStat; + + private BigInteger processTotalJiffies; + private long processPhysicalMemory; + private long processVirtualMemory; + + private final long jiffyLengthMs; + private final CpuTimeTracker cpuTimeTracker; + private Clock clock; + + private final static Object LOCK = new Object(); + private static boolean firstError = true; + + /** + * Create resource calculator for all Yarn containers. + */ + public CGroupsResourceCalculator() + throws YarnException { + this(null, PROCFS, ResourceHandlerModule.getCGroupsHandler(), + SystemClock.getInstance()); + } + + /** + * Create resource calculator for the container that has the specified pid. + * @param pid A pid from the cgroup or null for all containers + */ + public CGroupsResourceCalculator(String pid) { + this(pid, PROCFS, ResourceHandlerModule.getCGroupsHandler(), + SystemClock.getInstance()); + } + + /** + * Create resource calculator for testing. + * @param pid A pid from the cgroup or null for all containers + * @param procfsDir Path to /proc or a mock /proc directory + * @param cGroupsHandler Initialized cgroups handler object + * @param clock A clock object + */ + @VisibleForTesting + CGroupsResourceCalculator(String pid, String procfsDir, + CGroupsHandler cGroupsHandler, Clock clock) { + super(pid); + this.procfsDir = procfsDir; + this.cGroupsHandler = cGroupsHandler; + this.pid = pid != null && pid.equals("0") ? "1" : pid; + // In case of a unit test we do not have system clock, + // and it might not run on Linux, so let's hard code + // the value to 10 in that case. + this.jiffyLengthMs = (clock == SystemClock.getInstance()) ? + SysInfoLinux.JIFFY_LENGTH_IN_MILLIS : 10; + this.cpuTimeTracker = + new CpuTimeTracker(this.jiffyLengthMs); + this.clock = clock; + this.processTotalJiffies = BigInteger.ZERO; + this.processPhysicalMemory = 0L; + this.processVirtualMemory = 0L; + } + + @Override + public float getCpuUsagePercent() { + if (LOG.isDebugEnabled()) { + LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies); + } + return cpuTimeTracker.getCpuTrackerUsagePercent(); + } + + @Override + public long getCumulativeCpuTime() { + if (jiffyLengthMs < 0) { + return UNAVAILABLE; + } + return processTotalJiffies.longValue() * jiffyLengthMs; + } + + @Override + public long getRssMemorySize(int olderThanAge) { + if (olderThanAge > 1) { + return UNAVAILABLE; + } + return processPhysicalMemory; + } + + @Override + public long getVirtualMemorySize(int olderThanAge) { + if (olderThanAge > 1) { + return UNAVAILABLE; + } + return processVirtualMemory; + } + + @Override + public void updateProcessTree() { + try { + this.processTotalJiffies = readTotalProcessJiffies(); + cpuTimeTracker.updateElapsedJiffies(processTotalJiffies, + clock.getTime()); + } catch (YarnException e) { + LOG.debug(e.getMessage()); + } + processPhysicalMemory = getMemorySize(memStat); + processVirtualMemory = getMemorySize(memswStat); + } + + @Override + public String getProcessTreeDump() { + // We do not have a process tree in cgroups return just the pid for tracking + return pid; + } + + @Override + public boolean checkPidPgrpidForMatch() { + // We do not have a process tree in cgroups returning default ok + return true; + } + + /** + * Checks if the CGroupsResourceCalculator is available on this system. + * This assumes that Linux container executor is already initialized. + * + * @return true if CGroupsResourceCalculator is available. False otherwise. + */ + public static boolean isAvailable() { + try { + if (!Shell.LINUX) { + LOG.info("CGroupsResourceCalculator currently is supported only on " + + "Linux."); + return false; + } + if (ResourceHandlerModule.getCGroupsHandler() == null || + ResourceHandlerModule.getCpuResourceHandler() == null || + ResourceHandlerModule.getMemoryResourceHandler() == null) { + LOG.info("CGroupsResourceCalculator requires enabling CGroups" + + "cpu and memory"); + return false; + } + } catch (SecurityException se) { + LOG.warn("Failed to get Operating System name. " + se); + return false; + } + return true; + } + + private long getMemorySize(File cgroupUsageFile) { + long[] mem = new long[1]; + try { + processFile(cgroupUsageFile, (String line) -> { + mem[0] = Long.parseLong(line); + return Result.Exit; + }); + return mem[0]; + } catch (YarnException e) { + synchronized (LOCK) { + if (firstError) { + LOG.warn("Failed to parse cgroups " + memswStat, e); + firstError = false; + } + } + } + return UNAVAILABLE; + } + + private BigInteger readTotalProcessJiffies() throws YarnException{ + try { + final BigInteger[] totalCPUTimeJiffies = new BigInteger[1]; + totalCPUTimeJiffies[0] = BigInteger.ZERO; + processFile(cpuStat, (String line) -> { + if (line.startsWith(USER)) { + totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add( + new BigInteger(line.substring(USER.length()))); + } + if (line.startsWith(SYSTEM)) { + totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add( + new BigInteger(line.substring(SYSTEM.length()))); + } + return Result.Continue; + }); + return totalCPUTimeJiffies[0]; + } catch (YarnException e) { + synchronized (LOCK) { + if (firstError) { + LOG.warn("Failed to parse " + pid, e); + firstError = false; + } + } + throw new YarnException("Cannot read process jiffies", e); + } + } + + private String getCGroupRelativePath( + CGroupsHandler.CGroupController controller) + throws YarnException { + if (pid == null) { + return cGroupsHandler.getRelativePathForCGroup(""); + } else { + return getCGroupRelativePathForPid(controller); + } + } + + private String getCGroupRelativePathForPid( + CGroupsHandler.CGroupController controller) + throws YarnException { + File pidCgroupFile = new File(new File(procfsDir, pid), CGROUP); + String[] result = new String[1]; + processFile(pidCgroupFile, (String line)->{ + Matcher m = CGROUP_FILE_FORMAT.matcher(line); + boolean mat = m.find(); + if (mat) { + if (m.group(2).contains(controller.getName())) { + // Instead of returning the full path we compose it + // based on the last item as the container id + // This helps to avoid confusion within a privileged Docker container + // where the path is referred in /proc//cgroup as + // /docker//hadoop-yarn/ + // but it is /hadoop-yarn/ in the cgroups hierarchy + String cgroupPath = m.group(3); + + if (cgroupPath != null) { + String cgroup = + new File(cgroupPath).toPath().getFileName().toString(); + result[0] = cGroupsHandler.getRelativePathForCGroup(cgroup); + } else { + LOG.warn("Invalid cgroup path for " + pidCgroupFile); + } + return Result.Exit; + } + } else { + LOG.warn( + "Unexpected: cgroup file is not in the expected format" + + " for process with pid " + pid); + } + return Result.Continue; + }); + if (result[0] == null) { + throw new YarnException(controller.getName() + " CGroup for pid " + pid + + " not found " + pidCgroupFile); + } + return result[0]; + } + + private void processFile(File file, Function processLine) + throws YarnException { + // Read "procfsDir//stat" file - typically /proc//stat + try (InputStreamReader fReader = new InputStreamReader( + new FileInputStream(file), Charset.forName("UTF-8"))) { + try (BufferedReader in = new BufferedReader(fReader)) { + try { + String str; + while ((str = in.readLine()) != null) { + Result result = processLine.apply(str); + if (result == Result.Exit) { + return; + } + } + } catch (IOException io) { + throw new YarnException("Error reading the stream " + io, io); + } + } + } catch (IOException f) { + throw new YarnException("The process vanished in the interim " + pid, f); + } + } + + public void setCGroupFilePaths() throws YarnException { + if (cGroupsHandler == null) { + throw new YarnException("CGroups handler is not initialized"); + } + File cpuDir = new File( + cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT), + getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT)); + File memDir = new File( + cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY), + getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY)); + cpuStat = new File(cpuDir, CPU_STAT); + memStat = new File(memDir, MEM_STAT); + memswStat = new File(memDir, MEMSW_STAT); + } + +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java (revision 40b0045ebe0752cd3d1d09be00acbabdea983799) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java (revision a68dafbd2e9f566257db245d4e5c0db2a529a20e) @@ -98,7 +98,27 @@ return cGroupsHandler; } - private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler( + public static OutboundBandwidthResourceHandler + getNetworkResourceHandler() { + return trafficControlBandwidthHandler; + } + + public static DiskResourceHandler + getDiskResourceHandler() { + return cGroupsBlkioResourceHandler; + } + + public static MemoryResourceHandler + getMemoryResourceHandler() { + return cGroupsMemoryResourceHandler; + } + + public static CpuResourceHandler + getCpuResourceHandler() { + return cGroupsCpuResourceHandler; + } + + private static CGroupsCpuResourceHandlerImpl initCGroupsCpuResourceHandler( Configuration conf) throws ResourceHandlerException { boolean cgroupsCpuEnabled = conf.getBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, @@ -148,12 +168,12 @@ } public static OutboundBandwidthResourceHandler - getOutboundBandwidthResourceHandler(Configuration conf) - throws ResourceHandlerException { + initOutboundBandwidthResourceHandler(Configuration conf) + throws ResourceHandlerException { return getTrafficControlBandwidthHandler(conf); } - public static DiskResourceHandler getDiskResourceHandler(Configuration conf) + public static DiskResourceHandler initDiskResourceHandler(Configuration conf) throws ResourceHandlerException { if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) { @@ -177,7 +197,7 @@ return cGroupsBlkioResourceHandler; } - public static MemoryResourceHandler getMemoryResourceHandler( + public static MemoryResourceHandler initMemoryResourceHandler( Configuration conf) throws ResourceHandlerException { if (conf.getBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED, YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENABLED)) { @@ -213,10 +233,14 @@ throws ResourceHandlerException { ArrayList handlerList = new ArrayList<>(); - addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initOutboundBandwidthResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initDiskResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initMemoryResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initCGroupsCpuResourceHandler(conf)); addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext); resourceHandlerChain = new ResourceHandlerChain(handlerList); } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision 40b0045ebe0752cd3d1d09be00acbabdea983799) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision f35d8a60de41c0010ebbe5238e9453f8d470cc02) @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CombinedResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -33,11 +34,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsResourceCalculator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; @@ -89,6 +92,9 @@ private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; + private boolean cgroupsLogged = false; + private boolean cgroupsErrorLogged = false; + /** * Type of container metric. */ @@ -212,15 +218,79 @@ YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED); } + /** + * Get the best process tree calculator. + * @param pId container process id + * @return process tree calculator + */ + private ResourceCalculatorProcessTree + getResourceCalculatorProcessTree(String pId) { + ResourceCalculatorProcessTree pt = null; + + // Check, if CGroups is configured + if (processTreeClass == CGroupsResourceCalculator.class) { + if (CGroupsResourceCalculator.isAvailable()) { + try { + final CGroupsResourceCalculator cg = + new CGroupsResourceCalculator(pId); + cg.setCGroupFilePaths(); + pt = cg; + if (!cgroupsLogged) { + cgroupsLogged = true; + LOG.info("CGroups is enabled, so using CGroupsResourceCalculator"); + } else { + LOG.debug("CGroups is enabled, so using CGroupsResourceCalculator"); + } + } catch (YarnException e) { + if (!cgroupsErrorLogged) { + cgroupsErrorLogged = true; + LOG.info("CGroupsResourceCalculator cannot be created", e); + } else { + LOG.debug("CGroupsResourceCalculator cannot be created", e); + } + } + } else { + LOG.info("CGroupsResourceCalculator is not available"); + } + } + + if (processTreeClass == CombinedResourceCalculator.class) { + try { + final CombinedResourceCalculator cg = + new CombinedResourceCalculator(pId); + cg.setCGroupFilePaths(); + pt = cg; + if (!cgroupsLogged) { + cgroupsLogged = true; + LOG.info("CGroups is enabled, so using CombinedResourceCalculator"); + } else { + LOG.debug("CGroups is enabled, so using CombinedResourceCalculator"); + } + } catch (YarnException e) { + if (!cgroupsErrorLogged) { + cgroupsErrorLogged = true; + LOG.info("CombinedResourceCalculator cannot be created", e); + } else { + LOG.debug("CombinedResourceCalculator cannot be created", e); + } + } + } + + if (pt == null) { + pt = ResourceCalculatorProcessTree. + getResourceCalculatorProcessTree( + pId, processTreeClass, conf); + } + return pt; + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this .getClass().getName() + " is disabled."); return false; } - if (ResourceCalculatorProcessTree - .getResourceCalculatorProcessTree("0", processTreeClass, conf) - == null) { + if (getResourceCalculatorProcessTree("0") == null) { LOG.info("ResourceCalculatorProcessTree is unavailable on this system. " + this.getClass().getName() + " is disabled."); return false; @@ -520,9 +590,7 @@ LOG.debug("Tracking ProcessTree " + pId + " for the first time"); } ResourceCalculatorProcessTree pt = - ResourceCalculatorProcessTree. - getResourceCalculatorProcessTree( - pId, processTreeClass, conf); + getResourceCalculatorProcessTree(pId); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); @@ -584,11 +652,14 @@ long pmemLimit = ptInfo.getPmemLimit(); if (LOG.isDebugEnabled()) { LOG.debug(String.format( - "Memory usage of ProcessTree %s for container-id %s: ", - pId, containerId.toString()) + - formatUsageString( - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit)); + "Resource usage of ProcessTree %s for container-id %s:" + + " %s CPU:%f CPU/core:%f", + pId, containerId.toString(), + formatUsageString( + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit), + cpuUsagePercentPerCore, + cpuUsageTotalCoresPercentage)); } // Add resource utilization for this container Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java (revision 5197e9a09e9ec448ef25cb98a58de9a7149bb023) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java (revision 5197e9a09e9ec448ef25cb98a58de9a7149bb023) @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; + +import static org.mockito.Mockito.*; + +/** + * Unit test for CGroupsResourceCalculator. + */ +public class TestCGroupsResourceCalculator { + + private ControlledClock clock = new ControlledClock(); + private CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + private String basePath = "/tmp/" + this.getClass().getName(); + + public TestCGroupsResourceCalculator() { + when(cGroupsHandler.getRelativePathForCGroup("container_1")) + .thenReturn("/yarn/container_1"); + when(cGroupsHandler.getRelativePathForCGroup("")).thenReturn("/yarn/"); + } + + @Test(expected = YarnException.class) + public void testPidNotFound() throws Exception { + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", ".", cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + Assert.assertEquals("Expected exception", null, calculator); + } + + @Test(expected = YarnException.class) + public void testNoMemoryCGgroupMount() throws Exception { + File procfs = new File(basePath + "/1234"); + Assert.assertTrue("Setup error", procfs.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + Assert.assertEquals("Expected exception", null, calculator); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCGgroupNotFound() throws Exception { + File procfs = new File(basePath + "/1234"); + Assert.assertTrue("Setup error", procfs.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("cgroups should be missing", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getRssMemorySize(0)); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCPUParsing() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctContainerDir = + new File(cgcpuacctDir, "/yarn/container_1"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT)). + thenReturn(cgcpuacctDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + FileUtils.writeStringToFile( + new File(cgcpuacctContainerDir, CGroupsResourceCalculator.CPU_STAT), + "Can you handle this?\n" + + "user 5415\n" + + "system 3632"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect CPU usage", + 90470, + calculator.getCumulativeCpuTime()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testMemoryParsing() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctContainerDir = + new File(cgcpuacctDir, "/yarn/container_1"); + File cgmemoryDir = + new File(basePath + "/memory"); + File cgMemoryContainerDir = + new File(cgmemoryDir, "/yarn/container_1"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY)). + thenReturn(cgmemoryDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs()); + Assert.assertTrue("Setup error", cgMemoryContainerDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "6:cpuacct,cpu:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + FileUtils.writeStringToFile( + new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEM_STAT), + "418496512\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + + calculator.updateProcessTree(); + // Test the case where memsw is not available (Ubuntu) + Assert.assertEquals("Incorrect memory usage", + 418496512, + calculator.getRssMemorySize()); + Assert.assertEquals("Incorrect swap usage", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getVirtualMemorySize()); + + // Test the case where memsw is available + FileUtils.writeStringToFile( + new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEMSW_STAT), + "418496513\n"); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect swap usage", + 418496513, + calculator.getVirtualMemorySize()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCPUParsingRoot() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctRootDir = + new File(cgcpuacctDir, "/yarn"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT)). + thenReturn(cgcpuacctDir.getAbsolutePath()); + Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(cgcpuacctRootDir, CGroupsResourceCalculator.CPU_STAT), + "user 5415\n" + + "system 3632"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + null, basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect CPU usage", + 90470, + calculator.getCumulativeCpuTime()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testMemoryParsingRoot() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctRootDir = + new File(cgcpuacctDir, "/yarn"); + File cgmemoryDir = + new File(basePath + "/memory"); + File cgMemoryRootDir = + new File(cgmemoryDir, "/yarn"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY)). + thenReturn(cgmemoryDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs()); + Assert.assertTrue("Setup error", cgMemoryRootDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(cgMemoryRootDir, CGroupsResourceCalculator.MEM_STAT), + "418496512\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + null, basePath, + cGroupsHandler, clock); + calculator.setCGroupFilePaths(); + + calculator.updateProcessTree(); + + // Test the case where memsw is not available (Ubuntu) + Assert.assertEquals("Incorrect memory usage", + 418496512, + calculator.getRssMemorySize()); + Assert.assertEquals("Incorrect swap usage", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getVirtualMemorySize()); + + // Test the case where memsw is available + FileUtils.writeStringToFile( + new File(cgMemoryRootDir, CGroupsResourceCalculator.MEMSW_STAT), + "418496513\n"); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect swap usage", + 418496513, + calculator.getVirtualMemorySize()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java (revision 40b0045ebe0752cd3d1d09be00acbabdea983799) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java (revision d8172cc52117664666f6574f1b62e238d2ba2293) @@ -36,8 +36,8 @@ public class TestResourceHandlerModule { private static final Logger LOG = LoggerFactory.getLogger(TestResourceHandlerModule.class); - Configuration emptyConf; - Configuration networkEnabledConf; + private Configuration emptyConf; + private Configuration networkEnabledConf; @Before public void setup() throws Exception { @@ -55,23 +55,28 @@ //This resourceHandler should be non-null only if network as a resource //is explicitly enabled OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule - .getOutboundBandwidthResourceHandler(emptyConf); + .initOutboundBandwidthResourceHandler(emptyConf); Assert.assertNull(resourceHandler); //When network as a resource is enabled this should be non-null resourceHandler = ResourceHandlerModule - .getOutboundBandwidthResourceHandler(networkEnabledConf); + .initOutboundBandwidthResourceHandler(networkEnabledConf); Assert.assertNotNull(resourceHandler); //Ensure that outbound bandwidth resource handler is present in the chain ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule - .getConfiguredResourceHandlerChain(networkEnabledConf, mock(Context.class)); - List resourceHandlers = resourceHandlerChain - .getResourceHandlerList(); - //Exactly one resource handler in chain - Assert.assertEquals(resourceHandlers.size(), 1); - //Same instance is expected to be in the chain. - Assert.assertTrue(resourceHandlers.get(0) == resourceHandler); + .getConfiguredResourceHandlerChain(networkEnabledConf, + mock(Context.class)); + if (resourceHandlerChain != null) { + List resourceHandlers = resourceHandlerChain + .getResourceHandlerList(); + //Exactly one resource handler in chain + Assert.assertEquals(resourceHandlers.size(), 1); + //Same instance is expected to be in the chain. + Assert.assertTrue(resourceHandlers.get(0) == resourceHandler); + } else { + Assert.fail("Null returned"); + } } catch (ResourceHandlerException e) { Assert.fail("Unexpected ResourceHandlerException: " + e); } @@ -81,23 +86,27 @@ public void testDiskResourceHandler() throws Exception { DiskResourceHandler handler = - ResourceHandlerModule.getDiskResourceHandler(emptyConf); + ResourceHandlerModule.initDiskResourceHandler(emptyConf); Assert.assertNull(handler); Configuration diskConf = new YarnConfiguration(); diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true); - handler = ResourceHandlerModule.getDiskResourceHandler(diskConf); + handler = ResourceHandlerModule.initDiskResourceHandler(diskConf); Assert.assertNotNull(handler); ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf, mock(Context.class)); - List resourceHandlers = - resourceHandlerChain.getResourceHandlerList(); - // Exactly one resource handler in chain - Assert.assertEquals(resourceHandlers.size(), 1); - // Same instance is expected to be in the chain. - Assert.assertTrue(resourceHandlers.get(0) == handler); + if (resourceHandlerChain != null) { + List resourceHandlers = + resourceHandlerChain.getResourceHandlerList(); + // Exactly one resource handler in chain + Assert.assertEquals(resourceHandlers.size(), 1); + // Same instance is expected to be in the chain. + Assert.assertTrue(resourceHandlers.get(0) == handler); + } else { + Assert.fail("Null returned"); + } } } \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java (revision ca07b5c3c63c91a83244588170b971f73e4fb153) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java (revision ca07b5c3c63c91a83244588170b971f73e4fb153) @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + + +import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.*; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Random; + +import static org.mockito.Mockito.mock; + +/** + * Unit test for CGroupsResourceCalculator. + */ +public class TestCompareResourceCalculators { + private Process target = null; + private String cgroup = null; + private String cgroupCPU = null; + private String cgroupMemory = null; + public static final long SHMEM_KB = 100 * 1024; + + @Before + public void setup() throws IOException, YarnException { + Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, + "TestCompareResourceCalculators"); + conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, false); + conf.setStrings(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, + "/sys/fs/cgroup"); + conf.setBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, true); + ResourceHandlerChain module = null; + try { + module = ResourceHandlerModule.getConfiguredResourceHandlerChain(conf, + mock(Context.class)); + } catch (ResourceHandlerException e) { + throw new YarnException("Cannot access cgroups", e); + } + Assume.assumeNotNull(module); + Assume.assumeNotNull( + ResourceHandlerModule.getCGroupsHandler() + .getControllerPath(CGroupsHandler.CGroupController.CPU)); + Assume.assumeNotNull( + ResourceHandlerModule.getCGroupsHandler() + .getControllerPath(CGroupsHandler.CGroupController.MEMORY)); + + Random random = new Random(System.currentTimeMillis()); + cgroup = Long.toString(random.nextLong()); + cgroupCPU = ResourceHandlerModule.getCGroupsHandler() + .getPathForCGroup(CGroupsHandler.CGroupController.CPU, cgroup); + cgroupMemory = ResourceHandlerModule.getCGroupsHandler() + .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, cgroup); + } + + @After + public void tearDown() throws YarnException { + stopTestProcess(); + } + + + // Ignored in automated tests due to flakiness by design + @Ignore + @Test + public void testCompareResults() + throws YarnException, InterruptedException, IOException { + + startTestProcess(); + + ProcfsBasedProcessTree legacyCalculator = + new ProcfsBasedProcessTree(Long.toString(getPid())); + CGroupsResourceCalculator cgroupsCalculator = + new CGroupsResourceCalculator(Long.toString(getPid())); + cgroupsCalculator.setCGroupFilePaths(); + + for (int i = 0; i < 5; ++i) { + Thread.sleep(3000); + compareMetrics(legacyCalculator, cgroupsCalculator); + } + + stopTestProcess(); + + ensureCleanedUp(legacyCalculator, cgroupsCalculator); + } + + private void ensureCleanedUp( + ResourceCalculatorProcessTree metric1, + ResourceCalculatorProcessTree metric2) { + metric1.updateProcessTree(); + metric2.updateProcessTree(); + long pmem1 = metric1.getRssMemorySize(0); + long pmem2 = metric2.getRssMemorySize(0); + System.out.println(pmem1 + " " + pmem2); + Assert.assertTrue("pmem should be invalid " + pmem1 + " " + pmem2, + pmem1 == ResourceCalculatorProcessTree.UNAVAILABLE && + pmem2 == ResourceCalculatorProcessTree.UNAVAILABLE); + long vmem1 = metric1.getRssMemorySize(0); + long vmem2 = metric2.getRssMemorySize(0); + System.out.println(vmem1 + " " + vmem2); + Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2, + vmem1 == ResourceCalculatorProcessTree.UNAVAILABLE && + vmem2 == ResourceCalculatorProcessTree.UNAVAILABLE); + float cpu1 = metric1.getCpuUsagePercent(); + float cpu2 = metric2.getCpuUsagePercent(); + // TODO ProcfsBasedProcessTree may report negative on process exit + Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2, + cpu1 == 0 && cpu2 == 0); + } + + private void compareMetrics( + ResourceCalculatorProcessTree metric1, + ResourceCalculatorProcessTree metric2) { + metric1.updateProcessTree(); + metric2.updateProcessTree(); + long pmem1 = metric1.getRssMemorySize(0); + long pmem2 = metric2.getRssMemorySize(0); + // TODO The calculation is different and cgroup + // can report a small amount after process stop + // This is not an issue since the cgroup is deleted + System.out.println(pmem1 + " " + (pmem2 - SHMEM_KB * 1024)); + Assert.assertTrue("pmem Error outside range " + pmem1 + " " + pmem2, + Math.abs(pmem1 - (pmem2 - SHMEM_KB * 1024)) < 5000000); + long vmem1 = metric1.getRssMemorySize(0); + long vmem2 = metric2.getRssMemorySize(0); + System.out.println(vmem1 + " " + (vmem2 - SHMEM_KB * 1024)); + // TODO The calculation is different and cgroup + // can report a small amount after process stop + // This is not an issue since the cgroup is deleted + Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2, + Math.abs(vmem1 - (vmem2 - SHMEM_KB * 1024)) < 5000000); + float cpu1 = metric1.getCpuUsagePercent(); + float cpu2 = metric2.getCpuUsagePercent(); + if (cpu1 > 0) { + // TODO ProcfsBasedProcessTree may report negative on process exit + Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2, + Math.abs(cpu2 - cpu1) < 10); + } + } + + private void startTestProcess() throws IOException { + ProcessBuilder builder = new ProcessBuilder(); + String script = + "mkdir -p " + cgroupCPU + ";" + + "echo $$ >" + cgroupCPU + "/tasks;" + + "mkdir -p " + cgroupMemory + ";" + + "echo $$ >" + cgroupMemory + "/tasks;" + + "dd if=/dev/zero of=/dev/shm/" + + cgroup + " bs=1k count=" + SHMEM_KB + ";" + + "dd if=/dev/zero of=/dev/null bs=1k &" + + "echo $! >/tmp/\" + cgroup + \".pid;" + + //"echo while [ -f /tmp/" + cgroup + ".pid ]; do sleep 1; done;" + + "sleep 10000;" + + "echo kill $(jobs -p);"; + builder.command("bash", "-c", script); + builder.redirectError(new File("/tmp/a.txt")); + builder.redirectOutput(new File("/tmp/b.txt")); + target = builder.start(); + } + + private void stopTestProcess() throws YarnException { + if (target != null) { + target.destroyForcibly(); + target = null; + } + try { + ProcessBuilder builder = new ProcessBuilder(); + String script = + "rm -f /dev/shm/" + cgroup + ";" + + "cat " + cgroupCPU + "/tasks | xargs kill;" + + "rm -f /tmp/" + cgroup + ".pid;" + + "sleep 4;" + + "rmdir " + cgroupCPU + ";" + + "rmdir " + cgroupMemory + ";"; + builder.command("bash", "-c", script); + Process cleanup = builder.start(); + cleanup.waitFor(); + } catch (IOException|InterruptedException e) { + throw new YarnException("Could not clean up", e); + } + } + + private long getPid() throws YarnException { + Class processClass = target.getClass(); + if (processClass.getName().equals("java.lang.UNIXProcess")) { + try { + Field pidField = processClass.getDeclaredField("pid"); + pidField.setAccessible(true); + long pid = pidField.getLong(target); + pidField.setAccessible(false); + return pid; + } catch (NoSuchFieldException|IllegalAccessException e) { + throw new YarnException("Reflection error", e); + } + } else { + throw new YarnException("Not Unix " + processClass.getName()); + } + } + + +} Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java (revision c8163c28b974a622fcb03ed7d0b820b9e8340842) +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java (revision bfb9132e83f15b6ac76b5c95e3742599a304bc5a) @@ -99,7 +99,7 @@ public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) { BigInteger newValue = elapsedJiffies.multiply(jiffyLengthInMillis); cumulativeCpuTime = newValue.compareTo(cumulativeCpuTime) >= 0 ? - newValue : cumulativeCpuTime; + newValue : cumulativeCpuTime; sampleTime = newTime; } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (revision c48e75849e792ad8c25ad5e18db4fb9881fd71ff) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (revision 7546729df5ef24a020bb57062c4e140ca57609c5) @@ -465,6 +465,9 @@ @Override public float getCpuUsagePercent() { BigInteger processTotalJiffies = getTotalProcessJiffies(); + if (LOG.isDebugEnabled()) { + LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies); + } cpuTimeTracker.updateElapsedJiffies(processTotalJiffies, clock.getTime()); return cpuTimeTracker.getCpuTrackerUsagePercent(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java (revision afb5d9832ba86c5433fde40875914c0f037cbf71) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java (revision afb5d9832ba86c5433fde40875914c0f037cbf71) @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +/** + * CombinedResourceCalculator is a resource calculator that uses cgroups but + * it is backward compatible with procfs in terms of virtual memory usage. + */ +public class CombinedResourceCalculator extends ResourceCalculatorProcessTree { + protected static final Log LOG = LogFactory + .getLog(CombinedResourceCalculator.class); + private ProcfsBasedProcessTree procfs; + private CGroupsResourceCalculator cgroup; + + public CombinedResourceCalculator(String pid) { + super(pid); + procfs = new ProcfsBasedProcessTree(pid); + cgroup = new CGroupsResourceCalculator(pid); + } + + public void setCGroupFilePaths() throws YarnException { + cgroup.setCGroupFilePaths(); + } + + @Override + public void updateProcessTree() { + procfs.updateProcessTree(); + cgroup.updateProcessTree(); + } + + @Override + public String getProcessTreeDump() { + return null; + } + + @Override + public float getCpuUsagePercent() { + float cgroupUsage = cgroup.getCpuUsagePercent(); + if (LOG.isDebugEnabled()) { + float procfsUsage = procfs.getCpuUsagePercent(); + LOG.debug("CPU Comparison:" + procfsUsage + " " + cgroupUsage); + LOG.debug("Jiffy Comparison:" + + procfs.getCumulativeCpuTime() + " " + + cgroup.getCumulativeCpuTime()); + } + + return cgroupUsage; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return procfs.checkPidPgrpidForMatch(); + } + + @Override + public long getCumulativeCpuTime() { + if (LOG.isDebugEnabled()) { + LOG.debug("CPU Comparison:" + + procfs.getCumulativeCpuTime() + " " + + cgroup.getCumulativeCpuTime()); + } + return cgroup.getCumulativeCpuTime(); + } + + @Override + public long getRssMemorySize(int olderThanAge) { + if (LOG.isDebugEnabled()) { + LOG.debug("MEM Comparison:" + + procfs.getRssMemorySize(olderThanAge) + " " + + cgroup.getRssMemorySize(olderThanAge)); + } + return cgroup.getRssMemorySize(olderThanAge); + } + + @Override + public long getVirtualMemorySize(int olderThanAge) { + if (LOG.isDebugEnabled()) { + LOG.debug("VMEM Comparison:" + + procfs.getVirtualMemorySize(olderThanAge) + " " + + cgroup.getVirtualMemorySize(olderThanAge)); + } + return procfs.getVirtualMemorySize(olderThanAge); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision e46c20756547dea935c66d537945aac2cfd3ec52) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 56055f8e660763694d9bad5e53e8bf0b4bbb8d4d) @@ -1361,6 +1361,12 @@ @Private public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENABLED = false; + @Private + public static final String NM_MEMORY_RESOURCE_ENFORCED = + NM_MEMORY_RESOURCE_PREFIX + "enforced"; + @Private + public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENFORCED = true; + @Private public static final String NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS = NM_MEMORY_RESOURCE_PREFIX + "cgroups.swappiness"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision e46c20756547dea935c66d537945aac2cfd3ec52) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision ffc9772a18e16f7261e2fe5980d5801856877c35) @@ -1308,6 +1308,37 @@ -1 + + Whether YARN CGroups memory tracking is enabled. + yarn.nodemanager.resource.memory.enabled + false + + + + Whether YARN CGroups strict memory enforcement is enabled. + + yarn.nodemanager.resource.memory.enforced + true + + + + If memory limit is enforced, this the percentage of soft limit + compared to the memory assigned to the container. If there is memory + pressure container memory usage will be pushed back to its soft limit + by swapping out memory. + + yarn.nodemanager.resource.memory.cgroups.soft-limit-percentage + 90.0 + + + + Container swappiness is the likelihood a page will be swapped + out compared to be kept in memory. Value is between 0-100. + + yarn.nodemanager.resource.memory.cgroups.swappiness + 0 + + Whether physical memory limits will be enforced for containers. @@ -1622,7 +1653,8 @@ or be allowed to consume spare resources if they need them. For example, turning the flag on will restrict apps to use only their share of CPU, even if the node has spare CPU cycles. The default value is false i.e. use available resources. Please note that - turning this flag on may reduce job throughput on the cluster. + turning this flag on may reduce job throughput on the cluster. This setting does + not apply to other subsystems like memory. yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage false Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision e46c20756547dea935c66d537945aac2cfd3ec52) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision 56055f8e660763694d9bad5e53e8bf0b4bbb8d4d) @@ -52,6 +52,7 @@ private static final int OPPORTUNISTIC_SOFT_LIMIT = 0; private CGroupsHandler cGroupsHandler; + private boolean enforce = true; private int swappiness = 0; // multiplier to set the soft limit - value should be between 0 and 1 private float softLimit = 0.0f; @@ -79,6 +80,9 @@ throw new ResourceHandlerException(msg); } this.cGroupsHandler.initializeCGroupController(MEMORY); + enforce = conf.getBoolean( + YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, + YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED); swappiness = conf .getInt(YarnConfiguration.NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS, YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS); @@ -124,31 +128,33 @@ (long) (container.getResource().getMemorySize() * this.softLimit); long containerHardLimit = container.getResource().getMemorySize(); cGroupsHandler.createCGroup(MEMORY, cgroupId); - try { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, - String.valueOf(containerHardLimit) + "M"); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(OPPORTUNISTIC_SWAPPINESS)); - } else { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(containerSoftLimit) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(swappiness)); - } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(MEMORY, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; + if (enforce) { + try { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, + String.valueOf(containerHardLimit) + "M"); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(OPPORTUNISTIC_SWAPPINESS)); + } else { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(containerSoftLimit) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(swappiness)); + } + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(MEMORY, cgroupId); + LOG.warn("Could not update cgroup for container", re); + throw re; + } } List ret = new ArrayList<>(); ret.add(new PrivilegedOperation( Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java (revision e46c20756547dea935c66d537945aac2cfd3ec52) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java (revision 56055f8e660763694d9bad5e53e8bf0b4bbb8d4d) @@ -148,6 +148,51 @@ args.get(0)); } + @Test + public void testPreStartNonEnforced() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false); + cGroupsMemoryResourceHandler.bootstrap(conf); + String id = "container_01_01"; + String path = "test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.MEMORY, id)) + .thenReturn(path); + int memory = 1024; + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(memory, 1)); + List ret = + cGroupsMemoryResourceHandler.preStart(mockContainer); + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.MEMORY, id); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, + String.valueOf(memory) + "M"); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf((int) (memory * 0.9)) + "M"); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, String.valueOf(0)); + Assert.assertNotNull(ret); + Assert.assertEquals(1, ret.size()); + PrivilegedOperation op = ret.get(0); + Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + op.getOperationType()); + List args = op.getArguments(); + Assert.assertEquals(1, args.size()); + Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path, + args.get(0)); + } + @Test public void testReacquireContainer() throws Exception { ContainerId containerIdMock = mock(ContainerId.class); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java (revision afb5d9832ba86c5433fde40875914c0f037cbf71) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java (revision ffc9772a18e16f7261e2fe5980d5801856877c35) @@ -159,8 +159,6 @@ configurationPrefixToSkipCompare .add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED); configurationPrefixToSkipCompare - .add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX); - configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); // Ignore all Router Federation variables