Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision 12d0645990a878f78216235c800ae4e157796160) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision a2d0da0678c74b1db92a3c3813dc545464f4ba5f) @@ -54,7 +54,7 @@ this.name = name; } - String getName() { + public String getName() { return name; } @@ -112,6 +112,13 @@ void deleteCGroup(CGroupController controller, String cGroupId) throws ResourceHandlerException; + /** + * Gets the absolute path to the specified cgroup controller. + * @param controller - controller type for the cgroup + * @return the root of the controller. + */ + String getControllerPath(CGroupController controller); + /** * Gets the relative path for the cgroup, independent of a controller, for a * given cgroup id. 
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision 12d0645990a878f78216235c800ae4e157796160) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision a2d0da0678c74b1db92a3c3813dc545464f4ba5f) @@ -125,7 +125,8 @@ initializeControllerPaths(); } - private String getControllerPath(CGroupController controller) { + @Override + public String getControllerPath(CGroupController controller) { try { rwLock.readLock().lock(); return controllerPaths.get(controller); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java (revision 453444393e98581d9741cfba2c303542778fe50a) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsResourceCalculator.java (revision 
453444393e98581d9741cfba2c303542778fe50a) @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.CpuTimeTracker; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.SysInfoLinux; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A cgroups file-system based Resource calculator without the process tree + * features. + * + * CGroups has its limitations. 
It can only be enabled, if both CPU and memory + * cgroups are enabled with yarn.nodemanager.resource.cpu.enabled and + * yarn.nodemanager.resource.memory.enabled respectively. This means that + * memory limits are enforced by default. You can turn this off and keep + * memory reporting only with yarn.nodemanager.resource.memory.enforced. + * + * Another limitation is virtual memory measurement. CGroups does not have the + * ability to measure virtual memory usage. This includes memory reserved but + * not used. CGroups measures used memory as the sum of + * physical memory and swap usage. This will be returned in the virtual + * memory counters. + * If the real virtual memory is required please use the legacy procfs based + * resource calculator or CombinedResourceCalculator. + */ +public class CGroupsResourceCalculator extends ResourceCalculatorProcessTree { + enum Result { + Continue, + Exit + } + protected static final Log LOG = LogFactory + .getLog(CGroupsResourceCalculator.class); + private static final String PROCFS = "/proc"; + static final String CGROUP = "cgroup"; + static final String CPU_STAT = "cpuacct.stat"; + static final String MEM_STAT = "memory.usage_in_bytes"; + static final String MEMSW_STAT = "memory.memsw.usage_in_bytes"; + private static final String USER = "user "; + private static final String SYSTEM = "system "; + + private static final Pattern CGROUP_FILE_FORMAT = Pattern.compile( + "^(\\d+):([^:]+):/(.*)$"); + private final String procfsDir; + private CGroupsHandler cGroupsHandler; + + private String pid; + private File cpuStat; + private File memStat; + private File memswStat; + + private BigInteger processTotalJiffies; + private long processPhysicalMemory; + private long processVirtualMemory; + + private final long jiffyLengthMs; + private final CpuTimeTracker cpuTimeTracker; + private Clock clock; + + /** + * Create resource calculator for all Yarn containers. 
+ */ + public CGroupsResourceCalculator() + throws YarnException { + this(null, PROCFS, ResourceHandlerModule.getCGroupsHandler(), + SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS); + } + + /** + * Create resource calculator for the container that has the specified pid. + * @param pid A pid from the cgroup or null for all containers + */ + public CGroupsResourceCalculator(String pid) { + this(pid, PROCFS, ResourceHandlerModule.getCGroupsHandler(), + SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS); + } + + /** + * Create resource calculator for testing. + * @param pid A pid from the cgroup or null for all containers + * @param procfsDir Path to /proc or a mock /proc directory + * @param cGroupsHandler Initialized cgroups handler object + * @param clock A clock object + * @param jiffyLengthMs0 Jiffy length in milliseconds + */ + @VisibleForTesting + CGroupsResourceCalculator(String pid, String procfsDir, + CGroupsHandler cGroupsHandler, + Clock clock, + long jiffyLengthMs0) { + super(pid); + this.procfsDir = procfsDir; + this.cGroupsHandler = cGroupsHandler; + this.pid = pid != null && pid.equals("0") ? 
"1" : pid; + this.jiffyLengthMs = jiffyLengthMs0; + this.cpuTimeTracker = + new CpuTimeTracker(this.jiffyLengthMs); + this.clock = clock; + this.processTotalJiffies = BigInteger.ZERO; + this.processPhysicalMemory = 0L; + this.processVirtualMemory = 0L; + } + + @Override + public void initialize() throws YarnException { + if (!CGroupsResourceCalculator.isAvailable()) { + throw new YarnException("CGroupsResourceCalculator is not available"); + } + setCGroupFilePaths(); + } + + @Override + public float getCpuUsagePercent() { + if (LOG.isDebugEnabled()) { + LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies); + } + return cpuTimeTracker.getCpuTrackerUsagePercent(); + } + + @Override + public long getCumulativeCpuTime() { + if (jiffyLengthMs < 0) { + return UNAVAILABLE; + } + return processTotalJiffies.longValue() * jiffyLengthMs; + } + + @Override + public long getRssMemorySize(int olderThanAge) { + if (olderThanAge > 1) { + return UNAVAILABLE; + } + return processPhysicalMemory; + } + + @Override + public long getVirtualMemorySize(int olderThanAge) { + if (olderThanAge > 1) { + return UNAVAILABLE; + } + return processVirtualMemory; + } + + @Override + public void updateProcessTree() { + try { + this.processTotalJiffies = readTotalProcessJiffies(); + cpuTimeTracker.updateElapsedJiffies(processTotalJiffies, + clock.getTime()); + } catch (YarnException e) { + LOG.warn("Failed to parse " + pid, e); + } + processPhysicalMemory = getMemorySize(memStat); + processVirtualMemory = getMemorySize(memswStat); + } + + @Override + public String getProcessTreeDump() { + // We do not have a process tree in cgroups return just the pid for tracking + return pid; + } + + @Override + public boolean checkPidPgrpidForMatch() { + // We do not have a process tree in cgroups returning default ok + return true; + } + + /** + * Checks if the CGroupsResourceCalculator is available on this system. + * This assumes that Linux container executor is already initialized. 
+ * + * @return true if CGroupsResourceCalculator is available. False otherwise. + */ + public static boolean isAvailable() { + try { + if (!Shell.LINUX) { + LOG.info("CGroupsResourceCalculator currently is supported only on " + + "Linux."); + return false; + } + if (ResourceHandlerModule.getCGroupsHandler() == null || + ResourceHandlerModule.getCpuResourceHandler() == null || + ResourceHandlerModule.getMemoryResourceHandler() == null) { + LOG.info("CGroupsResourceCalculator requires enabling CGroups " + + "cpu and memory"); + return false; + } + } catch (SecurityException se) { + LOG.warn("Failed to get Operating System name. " + se); + return false; + } + return true; + } + + private long getMemorySize(File cgroupUsageFile) { + long[] mem = new long[1]; + try { + processFile(cgroupUsageFile, (String line) -> { + mem[0] = Long.parseLong(line); + return Result.Exit; + }); + return mem[0]; + } catch (YarnException | NumberFormatException e) { + LOG.warn("Failed to parse cgroups " + cgroupUsageFile, e); + } + return UNAVAILABLE; + } + + private BigInteger readTotalProcessJiffies() throws YarnException { + final BigInteger[] totalCPUTimeJiffies = new BigInteger[1]; + totalCPUTimeJiffies[0] = BigInteger.ZERO; + processFile(cpuStat, (String line) -> { + if (line.startsWith(USER)) { + totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add( + new BigInteger(line.substring(USER.length()))); + } + if (line.startsWith(SYSTEM)) { + totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add( + new BigInteger(line.substring(SYSTEM.length()))); + } + return Result.Continue; + }); + return totalCPUTimeJiffies[0]; + } + + private String getCGroupRelativePath( + CGroupsHandler.CGroupController controller) + throws YarnException { + if (pid == null) { + return cGroupsHandler.getRelativePathForCGroup(""); + } else { + return getCGroupRelativePathForPid(controller); + } + } + + private String getCGroupRelativePathForPid( + CGroupsHandler.CGroupController controller) + throws YarnException { + File pidCgroupFile = new 
File(new File(procfsDir, pid), CGROUP); + String[] result = new String[1]; + processFile(pidCgroupFile, (String line)->{ + Matcher m = CGROUP_FILE_FORMAT.matcher(line); + boolean mat = m.find(); + if (mat) { + if (m.group(2).contains(controller.getName())) { + // Instead of returning the full path we compose it + // based on the last item as the container id + // This helps to avoid confusion within a privileged Docker container + // where the path is referred in /proc/<pid>/cgroup as + // /docker/<dcontainerid>/hadoop-yarn/<containerid> + // but it is /hadoop-yarn/<containerid> in the cgroups hierarchy + String cgroupPath = m.group(3); + + if (cgroupPath != null) { + String cgroup = + new File(cgroupPath).toPath().getFileName().toString(); + result[0] = cGroupsHandler.getRelativePathForCGroup(cgroup); + } else { + LOG.warn("Invalid cgroup path for " + pidCgroupFile); + } + return Result.Exit; + } + } else { + LOG.warn( + "Unexpected: cgroup file is not in the expected format" + + " for process with pid " + pid); + } + return Result.Continue; + }); + if (result[0] == null) { + throw new YarnException(controller.getName() + " CGroup for pid " + pid + + " not found " + pidCgroupFile); + } + return result[0]; + } + + private void processFile(File file, Function<String, Result> processLine) + throws YarnException { + // Feed the file to processLine line by line until it returns Result.Exit + try (InputStreamReader fReader = new InputStreamReader( + new FileInputStream(file), Charset.forName("UTF-8"))) { + try (BufferedReader in = new BufferedReader(fReader)) { + try { + String str; + while ((str = in.readLine()) != null) { + Result result = processLine.apply(str); + if (result == Result.Exit) { + return; + } + } + } catch (IOException io) { + throw new YarnException("Error reading the stream " + io, io); + } + } + } catch (IOException f) { + throw new YarnException("The process vanished in the interim " + pid, f); + } + } + + void setCGroupFilePaths() throws YarnException { + if (cGroupsHandler == null) { + throw new YarnException("CGroups handler 
is not initialized"); + } + File cpuDir = new File( + cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT), + getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT)); + File memDir = new File( + cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY), + getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY)); + cpuStat = new File(cpuDir, CPU_STAT); + memStat = new File(memDir, MEM_STAT); + memswStat = new File(memDir, MEMSW_STAT); + } + +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java (revision 12d0645990a878f78216235c800ae4e157796160) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java (revision 2648a3f0deae8dd1404a84ebc26e779832f6c334) @@ -98,7 +98,27 @@ return cGroupsHandler; } - private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler( + public static OutboundBandwidthResourceHandler + getNetworkResourceHandler() { + return trafficControlBandwidthHandler; + } + + public static DiskResourceHandler + getDiskResourceHandler() { + return cGroupsBlkioResourceHandler; + } + + public static MemoryResourceHandler + getMemoryResourceHandler() { + return cGroupsMemoryResourceHandler; + } + + public static CpuResourceHandler + getCpuResourceHandler() { + return cGroupsCpuResourceHandler; + } + + private static CGroupsCpuResourceHandlerImpl 
initCGroupsCpuResourceHandler( Configuration conf) throws ResourceHandlerException { boolean cgroupsCpuEnabled = conf.getBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, @@ -148,12 +168,12 @@ } public static OutboundBandwidthResourceHandler - getOutboundBandwidthResourceHandler(Configuration conf) - throws ResourceHandlerException { + initOutboundBandwidthResourceHandler(Configuration conf) + throws ResourceHandlerException { return getTrafficControlBandwidthHandler(conf); } - public static DiskResourceHandler getDiskResourceHandler(Configuration conf) + public static DiskResourceHandler initDiskResourceHandler(Configuration conf) throws ResourceHandlerException { if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) { @@ -177,7 +197,7 @@ return cGroupsBlkioResourceHandler; } - public static MemoryResourceHandler getMemoryResourceHandler( + public static MemoryResourceHandler initMemoryResourceHandler( Configuration conf) throws ResourceHandlerException { if (conf.getBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED, YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENABLED)) { @@ -213,10 +233,14 @@ throws ResourceHandlerException { ArrayList handlerList = new ArrayList<>(); - addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); - addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initOutboundBandwidthResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initDiskResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initMemoryResourceHandler(conf)); + addHandlerIfNotNull(handlerList, + initCGroupsCpuResourceHandler(conf)); addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext); resourceHandlerChain = new ResourceHandlerChain(handlerList); } Index: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision 12d0645990a878f78216235c800ae4e157796160) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision 453444393e98581d9741cfba2c303542778fe50a) @@ -215,15 +215,25 @@ YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED); } + /** + * Get the best process tree calculator. + * @param pId container process id + * @return process tree calculator + */ + private ResourceCalculatorProcessTree + getResourceCalculatorProcessTree(String pId) { + return ResourceCalculatorProcessTree. + getResourceCalculatorProcessTree( + pId, processTreeClass, conf); + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this .getClass().getName() + " is disabled."); return false; } - if (ResourceCalculatorProcessTree - .getResourceCalculatorProcessTree("0", processTreeClass, conf) - == null) { + if (getResourceCalculatorProcessTree("0") == null) { LOG.info("ResourceCalculatorProcessTree is unavailable on this system. " + this.getClass().getName() + " is disabled."); return false; @@ -535,9 +545,7 @@ LOG.debug("Tracking ProcessTree " + pId + " for the first time"); } ResourceCalculatorProcessTree pt = - ResourceCalculatorProcessTree. 
- getResourceCalculatorProcessTree( - pId, processTreeClass, conf); + getResourceCalculatorProcessTree(pId); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); @@ -599,11 +607,14 @@ long pmemLimit = ptInfo.getPmemLimit(); if (AUDITLOG.isDebugEnabled()) { AUDITLOG.debug(String.format( - "Memory usage of ProcessTree %s for container-id %s: ", - pId, containerId.toString()) + - formatUsageString( - currentVmemUsage, vmemLimit, - currentPmemUsage, pmemLimit)); + "Resource usage of ProcessTree %s for container-id %s:" + + " %s CPU:%f CPU/core:%f", + pId, containerId.toString(), + formatUsageString( + currentVmemUsage, vmemLimit, + currentPmemUsage, pmemLimit), + cpuUsagePercentPerCore, + cpuUsageTotalCoresPercentage)); } // Add resource utilization for this container Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsResourceCalculator.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; + +import static org.mockito.Mockito.*; + +/** + * Unit test for CGroupsResourceCalculator. 
+ */ +public class TestCGroupsResourceCalculator { + + private ControlledClock clock = new ControlledClock(); + private CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + private String basePath = "/tmp/" + this.getClass().getName(); + + public TestCGroupsResourceCalculator() { + when(cGroupsHandler.getRelativePathForCGroup("container_1")) + .thenReturn("/yarn/container_1"); + when(cGroupsHandler.getRelativePathForCGroup("")).thenReturn("/yarn/"); + } + + @Test(expected = YarnException.class) + public void testPidNotFound() throws Exception { + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", ".", cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + Assert.assertEquals("Expected exception", null, calculator); + } + + @Test(expected = YarnException.class) + public void testNoMemoryCGgroupMount() throws Exception { + File procfs = new File(basePath + "/1234"); + Assert.assertTrue("Setup error", procfs.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + Assert.assertEquals("Expected exception", null, calculator); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCGgroupNotFound() throws Exception { + File procfs = new File(basePath + "/1234"); + Assert.assertTrue("Setup error", procfs.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock, 
10); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("cgroups should be missing", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getRssMemorySize(0)); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCPUParsing() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctContainerDir = + new File(cgcpuacctDir, "/yarn/container_1"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT)). + thenReturn(cgcpuacctDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "7:devices:/yarn/container_1\n" + + "6:cpuacct,cpu:/yarn/container_1\n" + + "5:pids:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + FileUtils.writeStringToFile( + new File(cgcpuacctContainerDir, CGroupsResourceCalculator.CPU_STAT), + "Can you handle this?\n" + + "user 5415\n" + + "system 3632"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect CPU usage", + 90470, + calculator.getCumulativeCpuTime()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testMemoryParsing() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctContainerDir = + new File(cgcpuacctDir, "/yarn/container_1"); + File cgmemoryDir = + new File(basePath + "/memory"); + File cgMemoryContainerDir = + new File(cgmemoryDir, "/yarn/container_1"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY)). 
+ thenReturn(cgmemoryDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs()); + Assert.assertTrue("Setup error", cgMemoryContainerDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(procfs, CGroupsResourceCalculator.CGROUP), + "6:cpuacct,cpu:/yarn/container_1\n" + + "4:memory:/yarn/container_1\n"); + FileUtils.writeStringToFile( + new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEM_STAT), + "418496512\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + "1234", basePath, + cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + + calculator.updateProcessTree(); + // Test the case where memsw is not available (Ubuntu) + Assert.assertEquals("Incorrect memory usage", + 418496512, + calculator.getRssMemorySize()); + Assert.assertEquals("Incorrect swap usage", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getVirtualMemorySize()); + + // Test the case where memsw is available + FileUtils.writeStringToFile( + new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEMSW_STAT), + "418496513\n"); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect swap usage", + 418496513, + calculator.getVirtualMemorySize()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testCPUParsingRoot() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctRootDir = + new File(cgcpuacctDir, "/yarn"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.CPUACCT)). 
+ thenReturn(cgcpuacctDir.getAbsolutePath()); + Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(cgcpuacctRootDir, CGroupsResourceCalculator.CPU_STAT), + "user 5415\n" + + "system 3632"); + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + null, basePath, + cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect CPU usage", + 90470, + calculator.getCumulativeCpuTime()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } + + @Test + public void testMemoryParsingRoot() throws Exception { + File cgcpuacctDir = + new File(basePath + "/cgcpuacct"); + File cgcpuacctRootDir = + new File(cgcpuacctDir, "/yarn"); + File cgmemoryDir = + new File(basePath + "/memory"); + File cgMemoryRootDir = + new File(cgmemoryDir, "/yarn"); + File procfs = new File(basePath + "/1234"); + when(cGroupsHandler.getControllerPath( + CGroupsHandler.CGroupController.MEMORY)). 
+ thenReturn(cgmemoryDir.getAbsolutePath()); + Assert.assertTrue("Setup error", procfs.mkdirs()); + Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs()); + Assert.assertTrue("Setup error", cgMemoryRootDir.mkdirs()); + try { + FileUtils.writeStringToFile( + new File(cgMemoryRootDir, CGroupsResourceCalculator.MEM_STAT), + "418496512\n"); + + CGroupsResourceCalculator calculator = + new CGroupsResourceCalculator( + null, basePath, + cGroupsHandler, clock, 10); + calculator.setCGroupFilePaths(); + + calculator.updateProcessTree(); + + // Test the case where memsw is not available (Ubuntu) + Assert.assertEquals("Incorrect memory usage", + 418496512, + calculator.getRssMemorySize()); + Assert.assertEquals("Incorrect swap usage", + (long)ResourceCalculatorProcessTree.UNAVAILABLE, + calculator.getVirtualMemorySize()); + + // Test the case where memsw is available + FileUtils.writeStringToFile( + new File(cgMemoryRootDir, CGroupsResourceCalculator.MEMSW_STAT), + "418496513\n"); + calculator.updateProcessTree(); + Assert.assertEquals("Incorrect swap usage", + 418496513, + calculator.getVirtualMemorySize()); + } finally { + FileUtils.deleteDirectory(new File(basePath)); + } + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java (revision 12d0645990a878f78216235c800ae4e157796160) +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java (revision a2d0da0678c74b1db92a3c3813dc545464f4ba5f) @@ -36,8 +36,8 @@ public class TestResourceHandlerModule { private static final Logger LOG = LoggerFactory.getLogger(TestResourceHandlerModule.class); - Configuration emptyConf; - Configuration networkEnabledConf; + private Configuration emptyConf; + private Configuration networkEnabledConf; @Before public void setup() throws Exception { @@ -55,23 +55,28 @@ //This resourceHandler should be non-null only if network as a resource //is explicitly enabled OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule - .getOutboundBandwidthResourceHandler(emptyConf); + .initOutboundBandwidthResourceHandler(emptyConf); Assert.assertNull(resourceHandler); //When network as a resource is enabled this should be non-null resourceHandler = ResourceHandlerModule - .getOutboundBandwidthResourceHandler(networkEnabledConf); + .initOutboundBandwidthResourceHandler(networkEnabledConf); Assert.assertNotNull(resourceHandler); //Ensure that outbound bandwidth resource handler is present in the chain ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule - .getConfiguredResourceHandlerChain(networkEnabledConf, mock(Context.class)); - List resourceHandlers = resourceHandlerChain - .getResourceHandlerList(); - //Exactly one resource handler in chain - Assert.assertEquals(resourceHandlers.size(), 1); - //Same instance is expected to be in the chain. 
- Assert.assertTrue(resourceHandlers.get(0) == resourceHandler); + .getConfiguredResourceHandlerChain(networkEnabledConf, + mock(Context.class)); + if (resourceHandlerChain != null) { + List resourceHandlers = resourceHandlerChain + .getResourceHandlerList(); + //Exactly one resource handler in chain + Assert.assertEquals(1, resourceHandlers.size()); + //Same instance is expected to be in the chain. + Assert.assertTrue(resourceHandlers.get(0) == resourceHandler); + } else { + Assert.fail("Null returned"); + } } catch (ResourceHandlerException e) { Assert.fail("Unexpected ResourceHandlerException: " + e); } @@ -81,23 +86,27 @@ public void testDiskResourceHandler() throws Exception { DiskResourceHandler handler = - ResourceHandlerModule.getDiskResourceHandler(emptyConf); + ResourceHandlerModule.initDiskResourceHandler(emptyConf); Assert.assertNull(handler); Configuration diskConf = new YarnConfiguration(); diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true); - handler = ResourceHandlerModule.getDiskResourceHandler(diskConf); + handler = ResourceHandlerModule.initDiskResourceHandler(diskConf); Assert.assertNotNull(handler); ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf, mock(Context.class)); - List resourceHandlers = - resourceHandlerChain.getResourceHandlerList(); - // Exactly one resource handler in chain - Assert.assertEquals(resourceHandlers.size(), 1); - // Same instance is expected to be in the chain. - Assert.assertTrue(resourceHandlers.get(0) == handler); + if (resourceHandlerChain != null) { + List resourceHandlers = + resourceHandlerChain.getResourceHandlerList(); + // Exactly one resource handler in chain + Assert.assertEquals(1, resourceHandlers.size()); + // Same instance is expected to be in the chain.
+ Assert.assertTrue(resourceHandlers.get(0) == handler); + } else { + Assert.fail("Null returned"); + } } } \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCompareResourceCalculators.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + + +import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.*; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Random; + +import static org.mockito.Mockito.mock; + +/** + * Functional test for CGroupsResourceCalculator to compare two resource + * calculators. It is OS dependent. + * Ignored in automated tests due to flakiness by design. + */ +public class TestCompareResourceCalculators { + private Process target = null; + private String cgroup = null; + private String cgroupCPU = null; + private String cgroupMemory = null; + public static final long SHMEM_KB = 100 * 1024; + + @Before + public void setup() throws IOException, YarnException { + Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, + "TestCompareResourceCalculators"); + conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, false); + conf.setStrings(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, + "/sys/fs/cgroup"); + conf.setBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED, true); + ResourceHandlerChain module = null; + try { + module = ResourceHandlerModule.getConfiguredResourceHandlerChain(conf, + mock(Context.class)); + } catch (ResourceHandlerException e) { + throw new YarnException("Cannot access cgroups", e); + } + Assume.assumeNotNull(module); + Assume.assumeNotNull( + ResourceHandlerModule.getCGroupsHandler() + .getControllerPath(CGroupsHandler.CGroupController.CPU)); + Assume.assumeNotNull( + 
ResourceHandlerModule.getCGroupsHandler() + .getControllerPath(CGroupsHandler.CGroupController.MEMORY)); + + Random random = new Random(System.currentTimeMillis()); + cgroup = Long.toString(random.nextLong()); + cgroupCPU = ResourceHandlerModule.getCGroupsHandler() + .getPathForCGroup(CGroupsHandler.CGroupController.CPU, cgroup); + cgroupMemory = ResourceHandlerModule.getCGroupsHandler() + .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, cgroup); + } + + @After + public void tearDown() throws YarnException { + stopTestProcess(); + } + + + // Ignored in automated tests due to flakiness by design + @Ignore + @Test + public void testCompareResults() + throws YarnException, InterruptedException, IOException { + + startTestProcess(); + + ProcfsBasedProcessTree legacyCalculator = + new ProcfsBasedProcessTree(Long.toString(getPid())); + CGroupsResourceCalculator cgroupsCalculator = + new CGroupsResourceCalculator(Long.toString(getPid())); + cgroupsCalculator.setCGroupFilePaths(); + + for (int i = 0; i < 5; ++i) { + Thread.sleep(3000); + compareMetrics(legacyCalculator, cgroupsCalculator); + } + + stopTestProcess(); + + ensureCleanedUp(legacyCalculator, cgroupsCalculator); + } + + private void ensureCleanedUp( + ResourceCalculatorProcessTree metric1, + ResourceCalculatorProcessTree metric2) { + metric1.updateProcessTree(); + metric2.updateProcessTree(); + long pmem1 = metric1.getRssMemorySize(0); + long pmem2 = metric2.getRssMemorySize(0); + System.out.println(pmem1 + " " + pmem2); + Assert.assertTrue("pmem should be invalid " + pmem1 + " " + pmem2, + pmem1 == ResourceCalculatorProcessTree.UNAVAILABLE && + pmem2 == ResourceCalculatorProcessTree.UNAVAILABLE); + long vmem1 = metric1.getRssMemorySize(0); + long vmem2 = metric2.getRssMemorySize(0); + System.out.println(vmem1 + " " + vmem2); + Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2, + vmem1 == ResourceCalculatorProcessTree.UNAVAILABLE && + vmem2 == 
ResourceCalculatorProcessTree.UNAVAILABLE); + float cpu1 = metric1.getCpuUsagePercent(); + float cpu2 = metric2.getCpuUsagePercent(); + // TODO ProcfsBasedProcessTree may report negative on process exit + Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2, + cpu1 == 0 && cpu2 == 0); + } + + private void compareMetrics( + ResourceCalculatorProcessTree metric1, + ResourceCalculatorProcessTree metric2) { + metric1.updateProcessTree(); + metric2.updateProcessTree(); + long pmem1 = metric1.getRssMemorySize(0); + long pmem2 = metric2.getRssMemorySize(0); + // TODO The calculation is different and cgroup + // can report a small amount after process stop + // This is not an issue since the cgroup is deleted + System.out.println(pmem1 + " " + (pmem2 - SHMEM_KB * 1024)); + Assert.assertTrue("pmem Error outside range " + pmem1 + " " + pmem2, + Math.abs(pmem1 - (pmem2 - SHMEM_KB * 1024)) < 5000000); + long vmem1 = metric1.getRssMemorySize(0); + long vmem2 = metric2.getRssMemorySize(0); + System.out.println(vmem1 + " " + (vmem2 - SHMEM_KB * 1024)); + // TODO The calculation is different and cgroup + // can report a small amount after process stop + // This is not an issue since the cgroup is deleted + Assert.assertTrue("vmem Error outside range " + vmem1 + " " + vmem2, + Math.abs(vmem1 - (vmem2 - SHMEM_KB * 1024)) < 5000000); + float cpu1 = metric1.getCpuUsagePercent(); + float cpu2 = metric2.getCpuUsagePercent(); + if (cpu1 > 0) { + // TODO ProcfsBasedProcessTree may report negative on process exit + Assert.assertTrue("CPU% Error outside range " + cpu1 + " " + cpu2, + Math.abs(cpu2 - cpu1) < 10); + } + } + + private void startTestProcess() throws IOException { + ProcessBuilder builder = new ProcessBuilder(); + String script = + "mkdir -p " + cgroupCPU + ";" + + "echo $$ >" + cgroupCPU + "/tasks;" + + "mkdir -p " + cgroupMemory + ";" + + "echo $$ >" + cgroupMemory + "/tasks;" + + "dd if=/dev/zero of=/dev/shm/" + + cgroup + " bs=1k count=" + SHMEM_KB + ";" + + "dd 
if=/dev/zero of=/dev/null bs=1k &" + + "echo $! >/tmp/" + cgroup + ".pid;" + + //"echo while [ -f /tmp/" + cgroup + ".pid ]; do sleep 1; done;" + + "sleep 10000;" + + "echo kill $(jobs -p);"; + builder.command("bash", "-c", script); + builder.redirectError(new File("/tmp/a.txt")); + builder.redirectOutput(new File("/tmp/b.txt")); + target = builder.start(); + } + + private void stopTestProcess() throws YarnException { + if (target != null) { + target.destroyForcibly(); + target = null; + } + try { + ProcessBuilder builder = new ProcessBuilder(); + String script = + "rm -f /dev/shm/" + cgroup + ";" + + "cat " + cgroupCPU + "/tasks | xargs kill;" + + "rm -f /tmp/" + cgroup + ".pid;" + + "sleep 4;" + + "rmdir " + cgroupCPU + ";" + + "rmdir " + cgroupMemory + ";"; + builder.command("bash", "-c", script); + Process cleanup = builder.start(); + cleanup.waitFor(); + } catch (IOException|InterruptedException e) { + throw new YarnException("Could not clean up", e); + } + } + + private long getPid() throws YarnException { + Class processClass = target.getClass(); + if (processClass.getName().equals("java.lang.UNIXProcess")) { + try { + Field pidField = processClass.getDeclaredField("pid"); + pidField.setAccessible(true); + long pid = pidField.getLong(target); + pidField.setAccessible(false); + return pid; + } catch (NoSuchFieldException|IllegalAccessException e) { + throw new YarnException("Reflection error", e); + } + } else { + throw new YarnException("Not Unix " + processClass.getName()); + } + } + + +} Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java (revision 3eed196580e14993c9bc1dbd28800259696e5bdc) +++
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java (revision 52d68825a6dcf5fe4408092d94b34360aea75114) @@ -99,7 +99,7 @@ public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) { BigInteger newValue = elapsedJiffies.multiply(jiffyLengthInMillis); cumulativeCpuTime = newValue.compareTo(cumulativeCpuTime) >= 0 ? - newValue : cumulativeCpuTime; + newValue : cumulativeCpuTime; sampleTime = newTime; } Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (revision f4ec5845e12bcc4e7d1f3cc303ff4fe01f028790) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java (revision d72d5ce0679734c8288c7d79ead879af0ecd2ab8) @@ -468,6 +468,9 @@ @Override public float getCpuUsagePercent() { BigInteger processTotalJiffies = getTotalProcessJiffies(); + if (LOG.isDebugEnabled()) { + LOG.debug("Process " + pid + " jiffies:" + processTotalJiffies); + } cpuTimeTracker.updateElapsedJiffies(processTotalJiffies, clock.getTime()); return cpuTimeTracker.getCpuTrackerUsagePercent(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java (revision 453444393e98581d9741cfba2c303542778fe50a) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CombinedResourceCalculator.java (revision 453444393e98581d9741cfba2c303542778fe50a) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +/** + * CombinedResourceCalculator is a resource calculator that uses cgroups but + * it is backward compatible with procfs in terms of virtual memory usage. 
+ */ +public class CombinedResourceCalculator extends ResourceCalculatorProcessTree { + protected static final Log LOG = LogFactory + .getLog(CombinedResourceCalculator.class); + private final ProcfsBasedProcessTree procfs; + private final CGroupsResourceCalculator cgroup; + + public CombinedResourceCalculator(String pid) { + super(pid); + procfs = new ProcfsBasedProcessTree(pid); + cgroup = new CGroupsResourceCalculator(pid); + } + + @Override + public void initialize() throws YarnException { + procfs.initialize(); + cgroup.initialize(); + } + + @Override + public void updateProcessTree() { + procfs.updateProcessTree(); + cgroup.updateProcessTree(); + } + + @Override + public String getProcessTreeDump() { + return procfs.getProcessTreeDump(); + } + + @Override + public float getCpuUsagePercent() { + float cgroupUsage = cgroup.getCpuUsagePercent(); + if (LOG.isDebugEnabled()) { + float procfsUsage = procfs.getCpuUsagePercent(); + LOG.debug("CPU Comparison:" + procfsUsage + " " + cgroupUsage); + LOG.debug("Jiffy Comparison:" + + procfs.getCumulativeCpuTime() + " " + + cgroup.getCumulativeCpuTime()); + } + + return cgroupUsage; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return procfs.checkPidPgrpidForMatch(); + } + + @Override + public long getCumulativeCpuTime() { + if (LOG.isDebugEnabled()) { + LOG.debug("Jiffy Comparison:" + + procfs.getCumulativeCpuTime() + " " + + cgroup.getCumulativeCpuTime()); + } + return cgroup.getCumulativeCpuTime(); + } + + @Override + public long getRssMemorySize(int olderThanAge) { + if (LOG.isDebugEnabled()) { + LOG.debug("MEM Comparison:" + + procfs.getRssMemorySize(olderThanAge) + " " + + cgroup.getRssMemorySize(olderThanAge)); + } + return cgroup.getRssMemorySize(olderThanAge); + } + + @Override + public long getVirtualMemorySize(int olderThanAge) { + if (LOG.isDebugEnabled()) { + LOG.debug("VMEM Comparison:" + + procfs.getVirtualMemorySize(olderThanAge) + " " + + cgroup.getVirtualMemorySize(olderThanAge)); + } + return
procfs.getVirtualMemorySize(olderThanAge); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 0ebd1d63d9261054a5982394b4048f905a87d3a4) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) @@ -1355,22 +1355,20 @@ public static final String NM_MEMORY_RESOURCE_PREFIX = NM_PREFIX + "resource.memory."; - @Private public static final String NM_MEMORY_RESOURCE_ENABLED = NM_MEMORY_RESOURCE_PREFIX + "enabled"; - @Private public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENABLED = false; - @Private + public static final String NM_MEMORY_RESOURCE_ENFORCED = + NM_MEMORY_RESOURCE_PREFIX + "enforced"; + public static final boolean DEFAULT_NM_MEMORY_RESOURCE_ENFORCED = true; + public static final String NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS = NM_MEMORY_RESOURCE_PREFIX + "cgroups.swappiness"; - @Private public static final int DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS = 0; - @Private public static final String NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE = NM_MEMORY_RESOURCE_PREFIX + "cgroups.soft-limit-percentage"; - @Private public static final float DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE = 90.0f; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision 
0ebd1d63d9261054a5982394b4048f905a87d3a4) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision 41321fe89ca8167bdf8f2002ff3750de50a6bc29) @@ -1308,6 +1308,37 @@ -1 + + Whether YARN CGroups memory tracking is enabled. + yarn.nodemanager.resource.memory.enabled + false + + + + Whether YARN CGroups strict memory enforcement is enabled. + + yarn.nodemanager.resource.memory.enforced + true + + + + If memory limit is enforced, this is the percentage of the soft limit + compared to the memory assigned to the container. If there is memory + pressure, container memory usage will be pushed back to its soft limit + by swapping out memory. + + yarn.nodemanager.resource.memory.cgroups.soft-limit-percentage + 90.0 + + + + Container swappiness is the likelihood a page will be swapped + out rather than kept in memory. Value is between 0 and 100. + + yarn.nodemanager.resource.memory.cgroups.swappiness + 0 + + Whether physical memory limits will be enforced for containers. @@ -1622,7 +1653,8 @@ or be allowed to consume spare resources if they need them. For example, turning the flag on will restrict apps to use only their share of CPU, even if the node has spare CPU cycles. The default value is false i.e. use available resources. Please note that - turning this flag on may reduce job throughput on the cluster. + turning this flag on may reduce job throughput on the cluster. This setting does + not apply to other subsystems like memory.
yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage false Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision 0ebd1d63d9261054a5982394b4048f905a87d3a4) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision 9dc279de2a46535833ee024a25c26576e5092404) @@ -52,6 +52,7 @@ private static final int OPPORTUNISTIC_SOFT_LIMIT = 0; private CGroupsHandler cGroupsHandler; + private boolean enforce = true; private int swappiness = 0; // multiplier to set the soft limit - value should be between 0 and 1 private float softLimit = 0.0f; @@ -79,6 +80,9 @@ throw new ResourceHandlerException(msg); } this.cGroupsHandler.initializeCGroupController(MEMORY); + enforce = conf.getBoolean( + YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, + YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED); swappiness = conf .getInt(YarnConfiguration.NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS, YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SWAPPINESS); @@ -124,31 +128,33 @@ (long) (container.getResource().getMemorySize() * this.softLimit); long containerHardLimit = container.getResource().getMemorySize(); cGroupsHandler.createCGroup(MEMORY, cgroupId); - try { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, - 
String.valueOf(containerHardLimit) + "M"); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(OPPORTUNISTIC_SWAPPINESS)); - } else { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(containerSoftLimit) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(swappiness)); - } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(MEMORY, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; + if (enforce) { + try { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, + String.valueOf(containerHardLimit) + "M"); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(OPPORTUNISTIC_SWAPPINESS)); + } else { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(containerSoftLimit) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(swappiness)); + } + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(MEMORY, cgroupId); + 
LOG.warn("Could not update cgroup for container", re); + throw re; + } } List ret = new ArrayList<>(); ret.add(new PrivilegedOperation( Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java (revision 0ebd1d63d9261054a5982394b4048f905a87d3a4) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java (revision 9dc279de2a46535833ee024a25c26576e5092404) @@ -148,6 +148,51 @@ args.get(0)); } + @Test + public void testPreStartNonEnforced() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); + conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false); + cGroupsMemoryResourceHandler.bootstrap(conf); + String id = "container_01_01"; + String path = "test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.MEMORY, id)) + .thenReturn(path); + int memory = 1024; + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(memory, 1)); + List ret = + 
cGroupsMemoryResourceHandler.preStart(mockContainer); + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.MEMORY, id); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, + String.valueOf(memory) + "M"); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf((int) (memory * 0.9)) + "M"); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, id, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, String.valueOf(0)); + Assert.assertNotNull(ret); + Assert.assertEquals(1, ret.size()); + PrivilegedOperation op = ret.get(0); + Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + op.getOperationType()); + List args = op.getArguments(); + Assert.assertEquals(1, args.size()); + Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path, + args.get(0)); + } + @Test public void testReacquireContainer() throws Exception { ContainerId containerIdMock = mock(ContainerId.class); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java (revision 26116ce7064b0176c1ad2ffd6d591f681ca4b0f7) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java (revision 41321fe89ca8167bdf8f2002ff3750de50a6bc29) @@ -159,8 +159,6 @@ configurationPrefixToSkipCompare .add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED); configurationPrefixToSkipCompare - 
.add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX); - configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); // Ignore all Router Federation variables Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java (revision 423fa4a81a00f21df57120567c644f17d6d78273) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java (revision 1f9ccd89a05f3a93c389c3714598e4139fe3aa34) @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.yarn.exceptions.YarnException; /** * Interface class to obtain process resource usage @@ -50,6 +51,13 @@ public ResourceCalculatorProcessTree(String root) { } + /** + * Initialize the object. + * @throws YarnException Throws an exception on error. + */ + public void initialize() throws YarnException { + } + /** * Update the process-tree with latest state. * @@ -168,6 +176,7 @@ Constructor c = clazz.getConstructor(String.class); ResourceCalculatorProcessTree rctree = c.newInstance(pid); rctree.setConf(conf); + rctree.initialize(); return rctree; } catch(Exception e) { throw new RuntimeException(e);