Index: .gitignore IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- .gitignore (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ .gitignore (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -17,6 +17,7 @@ target build dependency-reduced-pom.xml +make-build-debug # Filesystem contract test options and credentials auth-keys.xml Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -1406,6 +1406,26 @@ NM_PREFIX + "vmem-pmem-ratio"; public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f; + /** Specifies whether to do memory check on overall usage vs + * individual containers. */ + public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX + + "elastic-memory-control.enabled"; + public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false; + + /** Specifies the class name of the OOM handler plugin. */ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX + + "elastic-memory-control.oom-handler"; + + /** The path to the OOM listener executable. */ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH = + NM_PREFIX + "elastic-memory-control.oom-listener.path"; + + /** Maximum time to resolve an OOM situation. 
*/ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = + NM_PREFIX + "elastic-memory-control.timeout"; + public static final Integer + DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5; + /** Number of Virtual CPU Cores which can be allocated for containers.*/ public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final int DEFAULT_NM_VCORES = 8; @@ -1958,13 +1978,6 @@ /** The path to the Linux container executor.*/ public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = NM_PREFIX + "linux-container-executor.path"; - - /** - * The UNIX group that the linux-container-executor should run as. - * This is intended to be set as part of container-executor.cfg. - */ - public static final String NM_LINUX_CONTAINER_GROUP = - NM_PREFIX + "linux-container-executor.group"; /** * True if linux-container-executor should limit itself to one user Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -1835,14 +1835,6 @@ 1000 - - - The UNIX group that the linux-container-executor should run as. - - yarn.nodemanager.linux-container-executor.group - - - T-file compression types used to compress aggregated logs. yarn.nodemanager.log-aggregation.compression-type @@ -3764,4 +3756,57 @@ /usr/bin/numactl + + + Enable elastic memory control. This is a Linux only feature. + When enabled, the node manager adds a listener to receive an + event, if all the containers exceeded the a limit. + The limit is specified by yarn.nodemanager.resource.memory-mb. 
+ If this is not set, the limit is set based on the capabilities. + See yarn.nodemanager.resource.detect-hardware-capabilities + for details. + The limit applies to the physical or virtual (rss+swap) memory + depending on whether yarn.nodemanager.pmem-check-enabled or + yarn.nodemanager.vmem-check-enabled is set. + + yarn.nodemanager.elastic-memory-control.enabled + false + + + + + The name of a JVM class. The class must implement the Runnable + interface. It is called, + if yarn.nodemanager.elastic-memory-control.enabled + is set and the system reaches its memory limit. + When called the handler must preempt a container, + since all containers are frozen by cgroups. + Once preempted some memory is released, so that the + kernel can resume all containers. Because of this the + handler has to act quickly. + + yarn.nodemanager.elastic-memory-control.oom-handler + org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler + + + + + The path to the oom-listener tool. Elastic memory control is only + supported on Linux. It relies on kernel events. The tool forwards + these kernel events to its standard output, so that the node manager + can preempt containers in an out-of-memory scenario. + You rarely need to update this setting. + + yarn.nodemanager.elastic-memory-control.oom-listener.path + + + + + + Maximum time to wait for an OOM situation to get resolved before + bringing down the node. 
+ + yarn.nodemanager.elastic-memory-control.timeout + 5 + Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -30,6 +30,7 @@ string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS) set (CMAKE_C_STANDARD 99) +set (CMAKE_CXX_STANDARD 11) include(CheckIncludeFiles) check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H) @@ -113,6 +114,7 @@ ${GTEST_SRC_DIR}/include main/native/container-executor main/native/container-executor/impl + main/native/oom-listener/impl ) # add gtest as system library to suppress gcc warnings include_directories(SYSTEM ${GTEST_SRC_DIR}/include) @@ -171,3 +173,20 @@ main/native/container-executor/test/utils/test_docker_util.cc) target_link_libraries(cetest gtest container) output_directory(cetest test) + +# CGroup OOM listener +add_executable(oom-listener + main/native/oom-listener/impl/oom_listener.c + main/native/oom-listener/impl/oom_listener.h + main/native/oom-listener/impl/oom_listener_main.c +) +output_directory(oom-listener target/usr/local/bin) + +# CGroup OOM listener test with GTest +add_executable(test-oom-listener + main/native/oom-listener/impl/oom_listener.c + main/native/oom-listener/impl/oom_listener.h + main/native/oom-listener/test/oom_listener_test_main.cc +) +target_link_libraries(test-oom-listener gtest) +output_directory(test-oom-listener test) \ No newline at end of file Index: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,381 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +import java.io.File; +import java.io.InputStream; +import java.lang.reflect.Constructor; +import java.nio.charset.Charset; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT; + +/** + * This thread controls memory usage using cgroups. It listens to out of memory + * events of all the containers together, and if we go over the limit picks + * a container to kill. 
The algorithm that picks the container is a plugin. + */ +public class CGroupElasticMemoryController extends Thread { + protected static final Log LOG = LogFactory + .getLog(CGroupElasticMemoryController.class); + private final Clock clock = new MonotonicClock(); + private String yarnCGroupPath; + private String oomListenerPath; + private Runnable oomHandler; + private CGroupsHandler cgroups; + private boolean controlPhysicalMemory; + private boolean controlVirtualMemory; + private long limit; + private Process process = null; + private boolean stopped = false; + private int timeoutMS; + + /** + * Default constructor. + * @param conf Yarn configuration to use + * @param context Node manager context passed to the out of memory handler + * @param cgroups Cgroups handler configured + * @param controlPhysicalMemory Whether to listen to physical memory OOM + * @param controlVirtualMemory Whether to listen to virtual memory OOM + * @param limit memory limit in bytes + * @exception YarnException Could not instantiate the handler class, or + * not exactly one of the physical and virtual memory checks was requested + */ + public CGroupElasticMemoryController(Configuration conf, + Context context, + CGroupsHandler cgroups, + boolean controlPhysicalMemory, + boolean controlVirtualMemory, + long limit) + throws YarnException { + super("CGroupElasticMemoryController"); + Class<?> oomHandlerClass = + conf.getClass( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER, + DefaultOOMHandler.class); + final Runnable oomHandlerTemp; + try { + // The handler plugin must expose a constructor taking the NM context + Constructor<?> constr = oomHandlerClass.getConstructor(Context.class); + oomHandlerTemp = (Runnable)constr.newInstance(context); + } catch (Exception ex) { + throw new YarnException(ex); + } + if (controlPhysicalMemory && controlVirtualMemory) { + throw new YarnException( + "yarn.nodemanager.elastic-memory-control.enabled is on. " + + "We cannot control both virtual and physical " + + "memory at the same time. 
If swapping is enabled set " + + "yarn.nodemanager.vmem-check-enabled to true otherwise set " + + "yarn.nodemanager.pmem-check-enabled to true."); + } + if (!controlPhysicalMemory && !controlVirtualMemory) { + throw new YarnException( + "yarn.nodemanager.elastic-memory-control.enabled is on. " + + "We need either virtual or physical memory check requested. " + + "If swapping is enabled set " + + "yarn.nodemanager.vmem-check-enabled to true otherwise set " + + "yarn.nodemanager.pmem-check-enabled to true."); + } + // The argument checks above are done; fields are assigned only from here + this.timeoutMS = + 1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC, + DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC); + this.oomListenerPath = getOOMListenerExecutablePath(conf); + this.oomHandler = oomHandlerTemp; + this.cgroups = cgroups; + this.controlPhysicalMemory = controlPhysicalMemory; + this.controlVirtualMemory = controlVirtualMemory; + this.yarnCGroupPath = this.cgroups + .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, ""); + this.limit = limit; + } + + /** + * Override OOM handler for unit tests. + * @param runnable handler to run on each OOM event instead of the configured one + */ + @VisibleForTesting + void setOomHandler(Runnable runnable) { + this.oomHandler = runnable; + } + + /** + * Stop listening to the cgroup. Marks the controller as stopped and + * forcibly destroys the listener process, if one was started. + */ + public synchronized void stopListening() { + stopped = true; + if (process != null) { + process.destroyForcibly(); + } else { + LOG.warn("Trying to stop listening, when listening is not running"); + } + } + + /** + * Checks if the CGroupElasticMemoryController is available on this system. + * This assumes that Linux container executor is already initialized. + * We need to have CGroups enabled. + * + * @return True if CGroupElasticMemoryController is available. + * False otherwise. 
+ */ + public static boolean isAvailable() { + try { + if (!Shell.LINUX) { + LOG.info("CGroupElasticMemoryController currently is supported only " + + "on Linux."); + return false; + } + if (ResourceHandlerModule.getCGroupsHandler() == null || + ResourceHandlerModule.getMemoryResourceHandler() == null) { + LOG.info("CGroupElasticMemoryController requires enabling " + + "memory CGroups with" + + YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED); + return false; + } + } catch (SecurityException se) { + LOG.info("Failed to get Operating System name. " + se); + return false; + } + return true; + } + + /** + * Main OOM listening thread. It uses an external process to listen to + * Linux events. The external process does not need to run as root, so + * it is not related to container-executor. We do not use JNI for security + * reasons. + */ + @Override + public void run() { + boolean oomNotResoved = false; + ExecutorService executor = null; + try { + // Disable OOM killer and set a limit. + // This has to be set first, so that we get notified about valid events. + // TODO could we miss an event before the process starts? + setCGroupParameters(); + + // Start a listener process + ProcessBuilder oomListener = new ProcessBuilder(); + oomListener.command(oomListenerPath, yarnCGroupPath); + synchronized (this) { + if (!stopped) { + process = oomListener.start(); + } else { + resetCGroupParameters(); + LOG.info("Listener stopped before starting"); + return; + } + } + LOG.info(String.format("Listening on %s with %s", + yarnCGroupPath, + oomListenerPath)); + + // We need 1 thread for the error stream and a few others + // as a watchdog for the OOM killer + executor = Executors.newFixedThreadPool(5); + + // Listen to any errors in the background. We do not expect this to + // be large in size, so it will fit into a string. 
+ Future errorListener = executor.submit( + () -> IOUtils.toString(process.getErrorStream(), + Charset.defaultCharset())); + + // We get Linux event increments (8 bytes) forwarded from the event stream + // The events cannot be split, so it is safe to read them as a whole + InputStream events = process.getInputStream(); + byte[] event = new byte[8]; + int read; + while ((read = events.read(event)) == event.length) { + // An OOM event has occurred + // Just log, when we are still in OOM after a couple of seconds + final long start = clock.getTime(); + Future watchdog = + executor.submit(() -> watchAndLogOOMState(start)); + // Kill something to resolve the issue + oomHandler.run(); + if (!watchdog.get()) { + oomNotResoved = true; + } + } + + if (read != -1) { + LOG.warn(String.format("Characters returned from event hander: %d", + read)); + } + + // If the input stream is closed, we wait for exit or process terminated. + int exitCode = process.waitFor(); + String error = errorListener.get(); + LOG.info(String.format("OOM listener exited %d %s", exitCode, error)); + } catch (Exception ex) { + // Make sure we do not leak the child process, + // especially if process.waitFor() did not finish. + if (process != null && process.isAlive()) { + process.destroyForcibly(); + } + synchronized (this) { + if (!stopped) { + LOG.warn("OOM Listener exiting.", ex); + } + } + } finally { + if (executor != null) { + try { + executor.awaitTermination(6, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Exiting without processing all OOM events."); + } + executor.shutdown(); + } + resetCGroupParameters(); + } + if (oomNotResoved) { + throw new YarnRuntimeException("OOM was not resolved in time."); + } + } + + /** + * Just watch until we are in OOM and log. Send an update log every second. 
+ * @return empty string + */ + private boolean watchAndLogOOMState(long start) { + long lastLog = start; + try { + long end = start; + // Throw an error, if we are still in OOM after 5 seconds + while(end - start < timeoutMS) { + end = clock.getTime(); + String underOOM = cgroups.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL); + if (underOOM.contains("under_oom 1")) { + if (end - lastLog > 1000) { + LOG.warn(String.format( + "OOM not resolved in %d ms", end - start)); + lastLog = end; + } + } else { + LOG.info(String.format( + "Resolved OOM in %d ms", end - start)); + return true; + } + Thread.sleep(10); + } + } catch (Exception e) { + LOG.warn("Exception running logging thread", e); + } + LOG.warn(String.format("OOM was not resolved in %d ms", + clock.getTime() - start)); + stopListening(); + return false; + } + + /** + * Update root memory cgroup. This contains all containers. + * The physical limit has to be set first then the virtual limit. 
+ */ + private void setCGroupParameters() throws ResourceHandlerException { + // Disable the OOM killer (oom_control is written as "1") + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_OOM_CONTROL, "1"); + if (controlPhysicalMemory && !controlVirtualMemory) { + // Ignore virtual memory limits, since we do not know what it is set to + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + // Set physical memory limits + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit)); + } else if (controlVirtualMemory && !controlPhysicalMemory) { + // Clear the existing virtual memory limit first, + // since we do not know what it is set to + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + // Set physical limits to no more than virtual limits + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit)); + // Set virtual memory limits + // Important: it has to be set after physical limit is set + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit)); + } else { + // Both flags on or both off: the constructor rejects these combinations + throw new ResourceHandlerException( + String.format("Unsupported scenario p:%b v:%b", + controlPhysicalMemory, controlVirtualMemory)); + } + } + + /** + * Reset root memory cgroup to OS defaults. This controls all containers. 
+ */ + private void resetCGroupParameters() { + try { + // Remove the memory+swap limit, then the physical memory limit + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + // Enable the OOM killer (oom_control is written as "0") + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_OOM_CONTROL, "0"); + } catch (ResourceHandlerException ex) { + // Best effort cleanup: a failure is logged and not propagated + LOG.warn("Error in cleanup", ex); + } + } + + /** + * Get the path of the oom-listener executable. The configured value of + * yarn.nodemanager.elastic-memory-control.oom-listener.path is used, if set. + * The default is bin/oom-listener under HADOOP_YARN_HOME, or under the + * current directory, if that environment variable is not set. + * @param conf Yarn configuration to read the path from + * @return path of the oom-listener executable + */ + private static String getOOMListenerExecutablePath(Configuration conf) { + String yarnHomeEnvVar = + System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()); + if (yarnHomeEnvVar == null) { + yarnHomeEnvVar = "."; + } + File hadoopBin = new File(yarnHomeEnvVar, "bin"); + String defaultPath = + new File(hadoopBin, "oom-listener").getAbsolutePath(); + LOG.debug(String.format("oom-listener path: %s %s", conf.get( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + defaultPath), defaultPath)); + return conf.get( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + defaultPath); + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -76,8 +76,13 @@ String CGROUP_PARAM_BLKIO_WEIGHT = "weight"; String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes"; + String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes"; String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes"; + String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control"; String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness"; + String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes"; + String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes"; + String CGROUP_NO_LIMIT = "-1"; String CGROUP_CPU_PERIOD_US = "cfs_period_us"; Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -594,7 +594,11 @@ @Override public String getCGroupParam(CGroupController controller, String cGroupId, String param) throws ResourceHandlerException { - String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param); + String cGroupParamPath = + param.equals(CGROUP_FILE_TASKS) ? 
+ getPathForCGroup(controller, cGroupId) + + Path.SEPARATOR + param : + getPathForCGroupParam(controller, cGroupId, param); try { byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath)); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -70,13 +70,20 @@ boolean vmemEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); - if (pmemEnabled || vmemEnabled) { + boolean emcEnabled = conf.getBoolean( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED, + YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED); + if (!emcEnabled && (pmemEnabled || vmemEnabled)) { String msg = "The default YARN physical and/or virtual memory health" + " checkers as well as the CGroups memory controller are enabled. 
" - + "If you wish to use the Cgroups memory controller, please turn off" - + " the default physical/virtual memory checkers by setting " + + "If you wish to use the strict Cgroups memory controller, " + + "please turn off the default physical/virtual memory checkers by " + + "setting " + YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and " - + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false."; + + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false." + + " Another option is to turn on " + + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED + + " and either the physical or virtual checkers above."; throw new ResourceHandlerException(msg); } this.cGroupsHandler.initializeCGroupController(MEMORY); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; + +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES; + +/** + * A very basic OOM handler implementation. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class DefaultOOMHandler implements Runnable { + protected static final Log LOG = LogFactory + .getLog(DefaultOOMHandler.class); + private Context context; + private CGroupsHandler cgroups; + + /** + * Create an OOM handler using the cgroups handler + * registered in ResourceHandlerModule. + * + * @param context node manager context used to list the containers + * and to get the container executor + */ + public DefaultOOMHandler(Context context) { + this.context = context; + this.cgroups = ResourceHandlerModule.getCGroupsHandler(); + } + + /** + * Override the cgroups handler for unit tests. + * + * @param handler cgroups handler to use + */ + @VisibleForTesting + void setCGroupsHandler(CGroupsHandler handler) { + cgroups = handler; + } + + /** + * Kill the container, if it has exceeded its request. + * + * @param container Container to check + * @param fileName CGroup filename (physical or swap/virtual) + * @return true, if the container was preempted; false, if it is within + * its request or its usage could not be read or parsed + */ + private boolean killContainerIfOOM(Container container, String fileName) { + String value = null; + try { + value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + container.getContainerId().toString(), + fileName); + long usage = Long.parseLong(value); + // NOTE(review): presumably the request is in MB and the usage + // in bytes - confirm + long request = container.getResource().getMemorySize() * 1024 * 1024; + + // Check if the container has exceeded its limits. + if (usage > request) { + // Kill the container + // We could call the regular cleanup but that sends a + // SIGTERM first that cannot be handled by frozen processes. + // Walk through the cgroup + // tasks file and kill all processes in it + sigKill(container); + String message = String.format( + "Container %s was killed by elastic cgroups OOM handler using %d " + + "when requested only %d", + container.getContainerId(), usage, request); + LOG.warn(message); + return true; + } + } catch (ResourceHandlerException ex) { + LOG.warn(String.format("Could not access memory resource for %s", + container.getContainerId()), ex); + } catch (NumberFormatException ex) { + LOG.warn(String.format("Could not parse %s in %s", + value, container.getContainerId())); + } + return false; + } + + /** + * SIGKILL the specified container. 
We do this not using the standard + * container logic. The reason is that the processes are frozen by + * the cgroups OOM handler, so they cannot respond to SIGTERM. + * On the other hand we have to be as fast as possible. + * We walk through the list of active processes in the container. + * This is needed because frozen parents cannot signal their children. + * We kill each process and then try again until the whole cgroup + * is cleaned up. This logic avoids leaking processes in a cgroup. + * Currently the killing only succeeds for PGIDS. + * + * @param container Container to clean up + */ + private void sigKill(Container container) { + boolean finished = false; + try { + while (!finished) { + String[] pids = + cgroups.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + container.getContainerId().toString(), + CGROUP_FILE_TASKS) + .split("\n"); + // We are done, once the tasks file lists no more processes + finished = true; + for (String pid : pids) { + // TODO kill standalone PIDs not just PGIDs + // Skip empty entries, so that we only log signals actually sent + if (pid != null && !pid.isEmpty()) { + LOG.info(String.format( + "Terminating container %s Sending SIGKILL to -%s", + container.getContainerId().toString(), + pid)); + finished = false; + try { + context.getContainerExecutor().signalContainer( + new ContainerSignalContext.Builder().setContainer(container) + .setUser(container.getUser()) + .setPid(pid).setSignal(ContainerExecutor.Signal.KILL) + .build()); + } catch (IOException ex) { + LOG.warn(String.format("Cannot kill container %s pid -%s.", + container.getContainerId(), pid), ex); + } + } + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.debug("Waiting for processes to disappear"); + } + } + } catch (ResourceHandlerException ex) { + // Keep the cause in the log, like the other handlers in this class + LOG.warn(String.format( + "Cannot list more tasks in container %s to kill.", + container.getContainerId()), ex); + } + } + + /** + * It is called when the node is under an OOM condition. All processes in + * all sub-cgroups are suspended. We need to act fast, so that we do not + * affect the overall system utilization. 
+ * In general we try to find a fresh container that exceeded it's limits. + * The justification is cost, since probably this is the one that has + * accumulated the least amount of uncommitted data so far. + */ + @Override + public void run() { + try { + // Reverse order by start time + Comparator comparator = (Container o1, Container o2) -> { + long order = o1.getContainerStartTime() - o2.getContainerStartTime(); + return order > 0 ? -1 : order < 0 ? 1 : 0; + }; + + // We kill containers until the kernel reports the OOM situation resolved + // TODO: If the kernel has a delay this may kill more than necessary + while (true) { + String status = cgroups.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL); + if (!status.contains("under_oom 1")) { + break; + } + + // The first pass kills a recent container + // that uses more than its request + ArrayList containers = new ArrayList<>(); + containers.addAll(context.getContainers().values()); + // TODO Sorting may take a long time with 10K+ containers + // but it is okay now + containers.sort(comparator); + + // Kill the latest container that exceeded it's request + boolean found = false; + for (Container container : containers) { + if (killContainerIfOOM(container, + CGROUP_PARAM_MEMORY_USAGE_BYTES)) { + found = true; + break; + } + if (killContainerIfOOM(container, + CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) { + found = true; + break; + } + } + if (found) { + continue; + } + + // We have not found any containers that ran out of their limit, + // so we will kill the latest one. This can happen, if all use + // close to their request and one of them requests a big block + // triggering the OOM freeze. + // Currently there is no other way to identify the outstanding one. 
+ if (containers.size() > 0) { + Container container = containers.get(0); + sigKill(container); + String message = String.format( + "Newest container %s killed by elastic cgroups OOM handler using", + container.getContainerId()); + LOG.warn(message); + continue; + } + + // This can happen, if SIGKILL did not clean up + // non-PGID or containers or containers launched by other users + // or if a process was put to the root YARN cgroup. + throw new YarnRuntimeException( + "Could not find any containers but CGroups " + + "reserved for containers ran out of memory. " + + "I am giving up"); + } + } catch (ResourceHandlerException ex) { + LOG.warn("Could not fecth OOM status. " + + "This is expected at shutdown. Exiting.", ex); + } + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision 6c63cc7d304571578e6551170552182d30b8e8fa) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import 
org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,7 @@ private long monitoringInterval; private MonitoringThread monitoringThread; + private CGroupElasticMemoryController oomListenerThread; private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; private long containerMetricsUnregisterDelayMs; @@ -85,6 +89,7 @@ private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; + private boolean emcEnabled; private boolean containersMonitorEnabled; private long maxVCoresAllottedForContainers; @@ -173,8 +178,33 @@ vmemCheckEnabled = this.conf.getBoolean( YarnConfiguration.NM_VMEM_CHECK_ENABLED, YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); + emcEnabled = this.conf.getBoolean( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED, + YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED); LOG.info("Physical memory check enabled: " + pmemCheckEnabled); LOG.info("Virtual memory check enabled: " + vmemCheckEnabled); + LOG.info("Elastic memory control enabled: " + emcEnabled); + + if (emcEnabled) { + if (!CGroupElasticMemoryController.isAvailable()) { + // Test for availability outside the constructor + // to be able to write non-Linux unit tests for + // CGroupElasticMemoryController + throw new YarnException( + "CGroup Elastic Memory controller enabled but " + + "it is not available. Exiting."); + } else { + this.oomListenerThread = new CGroupElasticMemoryController( + conf, + context, + ResourceHandlerModule.getCGroupsHandler(), + pmemCheckEnabled, + vmemCheckEnabled, + pmemCheckEnabled ? 
+ maxPmemAllottedForContainers : maxVmemAllottedForContainers + ); + } + } containersMonitorEnabled = isContainerMonitorEnabled() && monitoringInterval > 0; @@ -246,6 +276,9 @@ if (containersMonitorEnabled) { this.monitoringThread.start(); } + if (oomListenerThread != null) { + oomListenerThread.start(); + } super.serviceStart(); } @@ -259,6 +292,14 @@ } catch (InterruptedException e) { LOG.info("ContainersMonitorImpl monitoring thread interrupted"); } + if (this.oomListenerThread != null) { + this.oomListenerThread.stopListening(); + try { + this.oomListenerThread.join(); + } finally { + this.oomListenerThread = null; + } + } } super.serviceStop(); } @@ -651,6 +692,10 @@ ProcessTreeInfo ptInfo, long currentVmemUsage, long currentPmemUsage) { + if (emcEnabled) { + // We enforce the overall memory usage instead of individual containers + return; + } boolean isMemoryOverLimit = false; long vmemLimit = ptInfo.getVmemLimit(); long pmemLimit = ptInfo.getPmemLimit(); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +#include +#include +#include "oom_listener.h" + +/* + * Print an error. +*/ +static inline void print_error(const char *file, const char *message, + ...) { + fprintf(stderr, "%s ", file); + va_list arguments; + va_start(arguments, message); + vfprintf(stderr, message, arguments); + va_end(arguments); +} + +/* + * Listen to OOM events in a memory cgroup. See declaration for details. + */ +int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) { + const char *pattern = + cgroup[MAX(strlen(cgroup), 1) - 1] == '/' + ? "%s%s" :"%s/%s"; + + /* Create an event handle, if we do not have one already*/ + if (descriptors->event_fd == -1 && + (descriptors->event_fd = eventfd(0, 0)) == -1) { + print_error(descriptors->command, "eventfd() failed. 
errno:%d %s\n", + errno, strerror(errno)); + return EXIT_FAILURE; + } + + /* + * open the file to listen to (memory.oom_control) + * and write the event handle and the file handle + * to cgroup.event_control + */ + if (snprintf(descriptors->event_control_path, + sizeof(descriptors->event_control_path), + pattern, + cgroup, + "cgroup.event_control") < 0) { + print_error(descriptors->command, "path too long %s\n", cgroup); + return EXIT_FAILURE; + } + + if ((descriptors->event_control_fd = open( + descriptors->event_control_path, + O_WRONLY|O_CREAT, 0600)) == -1) { + print_error(descriptors->command, "Could not open %s. errno:%d %s\n", + descriptors->event_control_path, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if (snprintf(descriptors->oom_control_path, + sizeof(descriptors->oom_control_path), + pattern, + cgroup, + "memory.oom_control") < 0) { + print_error(descriptors->command, "path too long %s\n", cgroup); + return EXIT_FAILURE; + } + + if ((descriptors->oom_control_fd = open( + descriptors->oom_control_path, + O_RDONLY)) == -1) { + print_error(descriptors->command, "Could not open %s. 
errno:%d %s\n", + descriptors->oom_control_path, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if ((descriptors->oom_command_len = (size_t) snprintf( + descriptors->oom_command, + sizeof(descriptors->oom_command), + "%d %d", + descriptors->event_fd, + descriptors->oom_control_fd)) < 0) { + print_error(descriptors->command, "Could print %d %d\n", + descriptors->event_control_fd, + descriptors->oom_control_fd); + return EXIT_FAILURE; + } + + if (write(descriptors->event_control_fd, + descriptors->oom_command, + descriptors->oom_command_len) == -1) { + } + + if (close(descriptors->event_control_fd) == -1) { + print_error(descriptors->command, "Could not close %s errno:%d\n", + descriptors->event_control_path, errno); + return EXIT_FAILURE; + } + descriptors->event_control_fd = -1; + + /* + * Listen to events as long as the cgroup exists + * and forward them to the fd in the argument. + */ + for (;;) { + uint64_t u; + ssize_t ret = 0; + struct stat stat_buffer = {0}; + struct pollfd poll_fd = { + .fd = descriptors->event_fd, + .events = POLLIN + }; + + ret = poll(&poll_fd, 1, descriptors->watch_timeout); + if (ret < 0) { + /* Error calling poll */ + print_error(descriptors->command, + "Could not poll eventfd %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if (ret > 0) { + /* Event counter values are always 8 bytes */ + if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) { + print_error(descriptors->command, + "Could not read from eventfd %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + /* Forward the value to the caller, typically stdout */ + if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) { + print_error(descriptors->command, + "Could not write to pipe %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + } else if (ret == 0) { + /* Timeout has elapsed*/ + + /* Quit, if the cgroup is deleted */ + if (stat(cgroup, &stat_buffer) != 0) { + break; + 
} + } + } + return EXIT_SUCCESS; +} + +#endif Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +/* +This file implements a standard cgroups out of memory listener. +*/ + +typedef struct _oom_listener_descriptors { + /* + * Command line that was called to run this process. + */ + const char *command; + /* + * Event descriptor to watch. + * It is filled in by the function, + * if not specified, yet. 
+ */ + int event_fd; + /* + * cgroup.event_control file handle + */ + int event_control_fd; + /* + * memory.oom_control file handle + */ + int oom_control_fd; + /* + * cgroup.event_control path + */ + char event_control_path[PATH_MAX]; + /* + * memory.oom_control path + */ + char oom_control_path[PATH_MAX]; + /* + * Control command to write to + * cgroup.event_control + * Filled by the function. + */ + char oom_command[25]; + /* + * Length of oom_command filled by the function. + */ + size_t oom_command_len; + /* + * Directory watch timeout + */ + int watch_timeout; +} _oom_listener_descriptors; + +/* + Clean up allocated resources in a descriptor structure +*/ +inline void cleanup(_oom_listener_descriptors *descriptors) { + close(descriptors->event_fd); + descriptors->event_fd = -1; + close(descriptors->event_control_fd); + descriptors->event_control_fd = -1; + close(descriptors->oom_control_fd); + descriptors->oom_control_fd = -1; + descriptors->watch_timeout = 1000; +} + +/* + * Enable an OOM listener on the memory cgroup cgroup + * descriptors: Structure that holds state for testing purposes + * cgroup: cgroup path to watch. It has to be a memory cgroup + * fd: File to forward events to. 
Normally this is stdout + */ +int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd); + +#endif Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener_main.c (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +#include +#include + +#include "oom_listener.h" + +void print_usage(void) { + fprintf(stderr, "usage: oom-listener \n"); + exit(EXIT_FAILURE); +} + +/* + A command that receives a memory cgroup directory and + listens to the events in the directory. + It will print a new line on every out of memory event + to the standard output. 
+ usage: + oom-listener +*/ +int main(int argc, char *argv[]) { + if (argc != 2) + print_usage(); + + _oom_listener_descriptors descriptors = { + .command = argv[0], + .event_fd = -1, + .event_control_fd = -1, + .oom_control_fd = -1, + .event_control_path = {0}, + .oom_control_path = {0}, + .oom_command = {0}, + .oom_command_len = 0, + .watch_timeout = 1000 + }; + + int ret = oom_listener(&descriptors, argv[1], STDOUT_FILENO); + + cleanup(&descriptors); + + return ret; +} + +#else + +/* + This tool uses Linux specific functionality, + so it is not available for other operating systems +*/ +int main() { + return 1; +} + +#endif \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +extern "C" { +#include "oom_listener.h" +} + +#include +#include +#include + +#define CGROUP_ROOT "/sys/fs/cgroup/memory/" +#define TEST_ROOT "/tmp/test-oom-listener/" +#define CGROUP_TASKS "tasks" +#define CGROUP_OOM_CONTROL "memory.oom_control" +#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes" +#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes" +#define CGROUP_EVENT_CONTROL "cgroup.event_control" +#define CGROUP_LIMIT (5 * 1024 * 1024) + +// We try multiple cgroup directories +// We try first the official path to test +// in production +// If we are running as a user we fall back +// to mock cgroup +static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT }; + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +class OOMListenerTest : public ::testing::Test { +private: + char cgroup[PATH_MAX] = {}; + const char* cgroup_root = nullptr; +public: + OOMListenerTest() = default; + + virtual ~OOMListenerTest() = default; + virtual const char* GetCGroup() { return cgroup; } + virtual void SetUp() { + struct stat cgroup_memory = {}; + for (unsigned int i = 0; i < GTEST_ARRAY_SIZE_(cgroup_candidates); ++i) { + cgroup_root = cgroup_candidates[i]; + + // Try to create the root. + // We might not have permission and + // it may already exist + mkdir(cgroup_root, 0700); + + if (0 != stat(cgroup_root, &cgroup_memory)) { + printf("%s missing. 
Skipping test\n", cgroup_root); + continue; + } + + timespec timespec1 = {}; + if (0 != clock_gettime(CLOCK_MONOTONIC, ×pec1)) { + ASSERT_TRUE(false) << " clock_gettime failed\n"; + } + + if (snprintf(cgroup, sizeof(cgroup), "%s%lx/", + cgroup_root, timespec1.tv_nsec) <= 0) { + cgroup[0] = '\0'; + printf("%s snprintf failed\n", cgroup_root); + continue; + } + + // Create a cgroup named the current timestamp + // to make it quasi unique + if (0 != mkdir(cgroup, 0700)) { + printf("%s not writable.\n", cgroup); + continue; + } + break; + } + + ASSERT_EQ(0, stat(cgroup, &cgroup_memory)) + << "Cannot use or simulate cgroup " << cgroup; + } + virtual void TearDown() { + if (cgroup[0] != '\0') { + rmdir(cgroup); + } + if (cgroup_root != nullptr && + cgroup_root != cgroup_candidates[0]) { + rmdir(cgroup_root); + } + } +}; + +TEST_F(OOMListenerTest, test_oom) { + // Disable OOM killer + std::ofstream oom_control; + std::string oom_control_file = + std::string(GetCGroup()).append(CGROUP_OOM_CONTROL); + oom_control.open(oom_control_file.c_str(), oom_control.out); + oom_control << 1 << std::endl; + oom_control.close(); + + // Set a low enough limit for physical + std::ofstream limit; + std::string limit_file = + std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL); + limit.open(limit_file.c_str(), limit.out); + limit << CGROUP_LIMIT << std::endl; + limit.close(); + + // Set a low enough limit for physical + swap + std::ofstream limitSwap; + std::string limit_swap_file = + std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP); + limitSwap.open(limit_swap_file.c_str(), limitSwap.out); + limitSwap << CGROUP_LIMIT << std::endl; + limitSwap.close(); + + // Event control file to set + std::string memory_control_file = + std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL); + + // Tasks file to check + std::string tasks_file = + std::string(GetCGroup()).append(CGROUP_TASKS); + + int mock_oom_event_as_user = -1; + struct stat stat1 = {}; + if (0 != stat(memory_control_file.c_str(), 
&stat1)) { + // We cannot tamper with cgroups + // running as a user, so simulate an + // oom event + mock_oom_event_as_user = eventfd(0, 0); + } + const int simulate_cgroups = + mock_oom_event_as_user != -1; + + __pid_t mem_hog_pid = fork(); + if (!mem_hog_pid) { + // Child process to consume too much memory + if (simulate_cgroups) { + std::cout << "Simulating cgroups OOM" << std::endl; + for (;;) { + sleep(1); + } + } else { + // Wait until we are added to the cgroup + // so that it is accounted for our mem + // usage + __pid_t cgroupPid; + do { + std::ifstream tasks; + tasks.open(tasks_file.c_str(), tasks.in); + tasks >> cgroupPid; + tasks.close(); + } while (cgroupPid != getpid()); + + // Start consuming as much memory as we can. + // cgroup will stop us at CGROUP_LIMIT + const int bufferSize = 1024 * 1024; + std::cout << "Consuming too much memory" << std::endl; + for (;;) { + auto buffer = (char *) malloc(bufferSize); + if (buffer != nullptr) { + for (int i = 0; i < bufferSize; ++i) { + buffer[i] = (char) std::rand(); + } + } + } + } + } else { + // Parent test + ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno; + + // Put child into cgroup + std::ofstream tasks; + tasks.open(tasks_file.c_str(), tasks.out); + tasks << mem_hog_pid << std::endl; + tasks.close(); + + // Create pipe to get forwarded eventfd + int test_pipe[2]; + ASSERT_EQ(0, pipe(test_pipe)); + + // Launch OOM listener + __pid_t listener = fork(); + if (listener == 0) { + // child listener forwarding cgroup events + _oom_listener_descriptors descriptors = { + .command = "test", + .event_fd = mock_oom_event_as_user, + .event_control_fd = -1, + .oom_control_fd = -1, + .event_control_path = {0}, + .oom_control_path = {0}, + .oom_command = {0}, + .oom_command_len = 0, + .watch_timeout = 100 + }; + int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]); + cleanup(&descriptors); + close(test_pipe[0]); + close(test_pipe[1]); + exit(ret); + } else { + // Parent test + uint64_t event_id = 1; 
+ if (simulate_cgroups) { + // We cannot tamper with cgroups + // running as a user, so simulate an + // oom event + ASSERT_EQ(sizeof(event_id), + write(mock_oom_event_as_user, + &event_id, + sizeof(event_id))); + } + ASSERT_EQ(sizeof(event_id), + read(test_pipe[0], + &event_id, + sizeof(event_id))) + << "The event has not arrived"; + close(test_pipe[0]); + close(test_pipe[1]); + + // Simulate OOM killer + ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL)); + + // Verify that process was killed + __WAIT_STATUS mem_hog_status = {}; + __pid_t exited0 = wait(mem_hog_status); + ASSERT_EQ(mem_hog_pid, exited0) + << "Wrong process exited"; + ASSERT_EQ(nullptr, mem_hog_status) + << "Test process killed with invalid status"; + + if (mock_oom_event_as_user != -1) { + ASSERT_EQ(0, unlink(oom_control_file.c_str())); + ASSERT_EQ(0, unlink(limit_file.c_str())); + ASSERT_EQ(0, unlink(limit_swap_file.c_str())); + ASSERT_EQ(0, unlink(tasks_file.c_str())); + ASSERT_EQ(0, unlink(memory_control_file.c_str())); + } + // Once the cgroup is empty delete it + ASSERT_EQ(0, rmdir(GetCGroup())) + << "Could not delete cgroup " << GetCGroup(); + + // Check that oom_listener exited on the deletion of the cgroup + __WAIT_STATUS oom_listener_status = {}; + __pid_t exited1 = wait(oom_listener_status); + ASSERT_EQ(listener, exited1) + << "Wrong process exited"; + ASSERT_EQ(nullptr, oom_listener_status) + << "Listener process exited with invalid status"; + } + } +} + +#else +/* +This tool covers Linux specific functionality, +so it is not available for other operating systems +*/ +int main() { + return 1; +} +#endif Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupElasticMemoryController.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.nio.charset.Charset; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for elastic non-strict memory controller based on cgroups. + */ +public class TestCGroupElasticMemoryController { + private YarnConfiguration conf = new YarnConfiguration(); + private File script = new File("target/" + + TestCGroupElasticMemoryController.class.getName()); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Test that only a single memory time can be requested. 
+ * @throws YarnException on exception + */ + @Test + public void testConstructor() + throws YarnException { + thrown.expect(YarnException.class); + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + null, + true, + true, + 10000 + ); + } + + /** + * Test that at least one memory type is requested. + * @throws YarnException on exception + */ + @Test + public void testConstructorOff() + throws YarnException { + thrown.expect(YarnException.class); + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + null, + false, + false, + 10000 + ); + } + + /** + * Test that the OOM logic is pluggable. + * @throws YarnException on exception + */ + @Test + public void testConstructorHandler() + throws YarnException { + conf.setClass(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER, + DummyRunnable.class, Runnable.class); + thrown.expect(YarnException.class); + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + null, + true, + false, + 10000 + ); + } + + /** + * Test that the handler is notified about multiple OOM events. 
+ * @throws Exception on exception + */ + @Test + public void testMultipleOOMEvents() throws Exception { + conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + script.getAbsolutePath()); + try { + FileUtils.writeStringToFile(script, + "#!/bin/bash\nprintf oomevent;printf oomevent;\n", + Charset.defaultCharset(), false); + assertTrue("Could not set executable", + script.setExecutable(true)); + + CGroupsHandler cgroups = mock(CGroupsHandler.class); + when(cgroups.getPathForCGroup(any(), any())).thenReturn(""); + when(cgroups.getCGroupParam(any(), any(), any())) + .thenReturn("under_oom 0"); + + Runnable handler = mock(Runnable.class); + doNothing().when(handler).run(); + + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + cgroups, + true, + false, + 10000 + ); + controller.setOomHandler(handler); + controller.run(); + verify(handler, times(2)).run(); + } finally { + assertTrue(String.format("Could not clean up script %s", + script.getAbsolutePath()), script.delete()); + } + } + + /** + * Test the scenario that the controller is stopped before
+ * the child process starts; the OOM handler must never be invoked. + * @throws Exception on exception + */ + @Test + public void testStopBeforeStart() throws Exception { + conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + script.getAbsolutePath()); + try { + FileUtils.writeStringToFile(script, + "#!/bin/bash\nprintf oomevent;printf oomevent;\n", + Charset.defaultCharset(), false); + assertTrue("Could not set executable", + script.setExecutable(true)); + + CGroupsHandler cgroups = mock(CGroupsHandler.class); + when(cgroups.getPathForCGroup(any(), any())).thenReturn(""); + when(cgroups.getCGroupParam(any(), any(), any())) + .thenReturn("under_oom 0"); + + Runnable handler = mock(Runnable.class); + doNothing().when(handler).run(); + + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + cgroups, + true, + false, + 10000 + ); + controller.setOomHandler(handler); + controller.stopListening(); + controller.run(); + verify(handler, times(0)).run(); + } finally { + assertTrue(String.format("Could not clean up script %s", + script.getAbsolutePath()), script.delete()); + } + } + + /** + * Test the edge case that OOM is never resolved. The mocked cgroup always reports under_oom 1, run() is expected to throw YarnRuntimeException, and the handler is verified to have run once.
+ * @throws Exception on exception + */ + @Test + public void testInfiniteOOM() throws Exception { + conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + script.getAbsolutePath()); + Runnable handler = mock(Runnable.class); + try { + FileUtils.writeStringToFile(script, + "#!/bin/bash\nprintf oomevent;sleep 1000;\n", + Charset.defaultCharset(), false); + assertTrue("Could not set executable", + script.setExecutable(true)); + + CGroupsHandler cgroups = mock(CGroupsHandler.class); + when(cgroups.getPathForCGroup(any(), any())).thenReturn(""); + when(cgroups.getCGroupParam(any(), any(), any())) + .thenReturn("under_oom 1"); + + doNothing().when(handler).run(); + + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + cgroups, + true, + false, + 10000 + ); + controller.setOomHandler(handler); + thrown.expect(YarnRuntimeException.class); + controller.run(); + } finally { + verify(handler, times(1)).run(); + assertTrue(String.format("Could not clean up script %s", + script.getAbsolutePath()), script.delete()); + } + } + + @Test + public void testNormalExit() throws Exception { + conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + script.getAbsolutePath()); + try { + FileUtils.writeStringToFile(script, + "#!/bin/bash\nsleep 10000;\n", + Charset.defaultCharset(), false); + assertTrue("Could not set executable", + script.setExecutable(true)); + + CGroupsHandler cgroups = mock(CGroupsHandler.class); + when(cgroups.getPathForCGroup(any(), any())).thenReturn(""); + when(cgroups.getCGroupParam(any(), any(), any())) + .thenReturn("under_oom 1"); + + Runnable handler = mock(Runnable.class); + doNothing().when(handler).run(); + + CGroupElasticMemoryController controller = + new CGroupElasticMemoryController( + conf, + null, + cgroups, + true, + false, + 10000 + ); + controller.setOomHandler(handler); + + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(() -> { + try { +
Thread.sleep(2000); + } catch (InterruptedException ex) { + assertTrue("Wait interrupted.", false); + } + controller.stopListening(); + }); + controller.run(); + } finally { + assertTrue(String.format("Could not clean up script %s", + script.getAbsolutePath()), script.delete()); + } + } + + /** + * Runnable that does not do anything; its constructor accepts a Context and ignores it. + */ + public class DummyRunnable implements Runnable { + public DummyRunnable(Context context) { + } + @Override + public void run() { + } + } +} Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import 
static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test default out of memory handler. + */ +public class TestDefaultOOMHandler { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Test an OOM situation where no containers are running; the handler is expected to throw YarnRuntimeException. + */ + @Test + public void testNoContainers() throws Exception { + Context context = mock(Context.class); + + thrown.expect(YarnRuntimeException.class); + when(context.getContainers()).thenReturn(new ConcurrentHashMap<>()); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + + DefaultOOMHandler handler = new DefaultOOMHandler(context); + handler.setCGroupsHandler(cGroupsHandler); + + handler.run(); + } + + /** + * We have two containers, both out of limit. We should kill the later one.
+ * + * @throws Exception on exception + */ + @Test + public void testBothContainersOOM() throws Exception { + ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(new LinkedHashMap<>()); + + Container c1 = mock(Container.class); + ContainerId cid1 = createContainerId(1); + when(c1.getContainerId()).thenReturn(cid1); + when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c1.getContainerStartTime()).thenReturn((long) 1); + containers.put(cid1, c1); + + Container c2 = mock(Container.class); + ContainerId cid2 = createContainerId(2); + when(c2.getContainerId()).thenReturn(cid2); + when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c2.getContainerStartTime()).thenReturn((long) 2); + containers.put(cid2, c2); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + ContainerExecutor ex = mock(ContainerExecutor.class); + + runOOMHandler(containers, cGroupsHandler, ex); + + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two containers, one out of limit.
We should kill that one. + * This should happen even if it was started earlier. + * + * @throws Exception on exception + */ + @Test + public void testOneContainerOOM() throws Exception { + ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(new LinkedHashMap<>()); + + Container c1 = mock(Container.class); + ContainerId cid1 = createContainerId(1); + when(c1.getContainerId()).thenReturn(cid1); + when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c1.getContainerStartTime()).thenReturn((long) 2); + containers.put(cid1, c1); + + Container c2 = mock(Container.class); + ContainerId cid2 = createContainerId(2); + when(c2.getContainerId()).thenReturn(cid2); + when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c2.getContainerStartTime()).thenReturn((long) 1); + containers.put(cid2, c2); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + ContainerExecutor ex = mock(ContainerExecutor.class); + runOOMHandler(containers, cGroupsHandler, ex); + + verify(ex,
times(1)).signalContainer(any()); + } + + /** + * We have two containers, neither out of limit. We should kill the later one. + * + * @throws Exception on exception + */ + @Test + public void testNoContainerOOM() throws Exception { + ConcurrentHashMap<ContainerId, Container> containers = + new ConcurrentHashMap<>(new LinkedHashMap<>()); + + Container c1 = mock(Container.class); + ContainerId cid1 = createContainerId(1); + when(c1.getContainerId()).thenReturn(cid1); + when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c1.getContainerStartTime()).thenReturn((long) 1); + containers.put(cid1, c1); + + Container c2 = mock(Container.class); + ContainerId cid2 = createContainerId(2); + when(c2.getContainerId()).thenReturn(cid2); + when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c2.getContainerStartTime()).thenReturn((long) 2); + containers.put(cid2, c2); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + ContainerExecutor ex = mock(ContainerExecutor.class); + runOOMHandler(containers, cGroupsHandler,
ex); + + verify(ex, times(1)).signalContainer(any()); + } + + private void runOOMHandler( + ConcurrentHashMap<ContainerId, Container> containers, + CGroupsHandler cGroupsHandler, ContainerExecutor ex) + throws IOException, ResourceHandlerException { + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + + when(ex.signalContainer(any())) + .thenAnswer(invocation -> { + assertEquals("Wrong pid killed", "1235", + ((ContainerSignalContext) invocation.getArguments()[0]).getPid()); + return true; + }); + + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + + when(context.getContainerExecutor()).thenReturn(ex); + + DefaultOOMHandler handler = new DefaultOOMHandler(context); + handler.setCGroupsHandler(cGroupsHandler); + + handler.run(); + } + + private class AppId extends ApplicationIdPBImpl { + AppId(long clusterTs, int appId) { + this.setClusterTimestamp(clusterTs); + this.setId(appId); + } + } + + private ContainerId createContainerId(int id) { + ApplicationId applicationId = new AppId(1, 1); + + ApplicationAttemptId applicationAttemptId + = mock(ApplicationAttemptId.class); + when(applicationAttemptId.getApplicationId()).thenReturn(applicationId); + when(applicationAttemptId.getAttemptId()).thenReturn(1); + + ContainerId containerId = mock(ContainerId.class); + when(containerId.toString()).thenReturn(Integer.toString(id)); + when(containerId.getContainerId()).thenReturn(1L); + + return containerId; + } + + ContainerTokenIdentifier getToken() { + ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class); + when(id.getVersion()).thenReturn(1); + return id; + } + + String getMB(long mb) { + return Long.toString(mb * 1024 * 1024); + } +} \ No newline at end of file Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md IDEA additional info: Subsystem:
com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManagerCGroupsMemory.md (revision a581c1bfc9e49492cda1ed215e6a4b11ea5f3dcc) @@ -0,0 +1,82 @@ + + +Using Memory Control with cgroups in YARN +======================= + +cgroups can be used to preempt containers in case of out-of-memory events. There are two types of controls in YARN that can be used exclusively. Strict memory control kills each container that has exceeded its limits. Elastic memory control on the other hand allows bursting and starts killing containers only if the overall system memory usage reaches a limit. + +Elastic Memory Feature +---------------------- + +The cgroups kernel feature has the ability to notify the node manager if the parent cgroup of all containers specified by `yarn.nodemanager.linux-container-executor.cgroups.hierarchy` goes over a memory limit. The YARN feature that uses this ability is called elastic memory control. The benefit is that containers can burst, using more memory than is reserved for them. This is allowed as long as we do not exceed the overall memory limit. When the limit is reached the kernel freezes all the containers and notifies the node manager. The node manager chooses a container and preempts it. It continues this step until the node is resumed from the OOM condition. + +The Limit +--------- + +The limit is the amount of memory allocated to all the containers on the node. The limit is specified by `yarn.nodemanager.resource.memory-mb` and `yarn.nodemanager.vmem-pmem-ratio`. If these are not set, the limit is set based on the available resources. See `yarn.nodemanager.resource.detect-hardware-capabilities` for details.
+ +The pluggable preemption logic +------------------------------ + +The preemption logic specifies which container to preempt in an out-of-memory situation. The default logic is the `DefaultOOMHandler`. It picks the latest container that exceeded its memory limit. In the unlikely case that no such container is found, it preempts the container that was launched most recently. This continues until the OOM condition is resolved. This logic supports oversubscription, when opportunistic containers can run on top of guaranteed ones as long as we have memory available. This helps to improve the overall cluster utilization. The logic ensures that as long as a container is within its limit, it won't get preempted. If the container bursts it can be preempted. There is an unlikely case that all containers are within their limits but we are out of memory. We prefer preempting the latest containers to minimize the cost and value lost. Once preempted, the data in the container is lost. + +The default out-of-memory handler can be updated using `yarn.nodemanager.elastic-memory-control.oom-handler`. The class named in this configuration entry has to implement `java.lang.Runnable`. The `run()` function will be called in a node level out-of-memory situation. + +Physical and virtual memory control +---------------------------------- + +The limit applies to the physical or virtual (rss+swap in cgroups) memory depending on whether `yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` is set. + +There is no reason to set them both. If the system runs with swap disabled, both will have the same number. If swap is enabled the virtual memory counter will account for pages in physical memory and on the disk. This is what the application allocated and it has control over. The limit should be applied to the virtual memory in this case.
When swapping is enabled, the physical memory is less than the virtual memory and it is adjusted by the kernel, not just by the container. There is no point preempting a container when it exceeds a physical memory limit with swapping. The system will just swap out some memory, when needed. + +Virtual memory measurement and swapping +-------------------------------------------- + +There is a difference between the virtual memory reported by the container monitor and the virtual memory limit specified in the elastic memory control feature. The container monitor uses `ProcfsBasedProcessTree` by default for measurements that returns values from the `proc` file system. The virtual memory returned from it is the size of the address space of all the processes in each container. This includes anonymous pages, pages swapped out to disk, mapped files and reserved pages among others. The number of reserved pages is not backed by either physical or swapped memory. They can be a large part of the virtual memory usage. The reservable address space was limited on 32-bit processors but it is very large on 64-bit ones making this metric less useful. Some Java Virtual Machines reserve large amounts of pages but they do not actually use them. This will result in gigabytes of virtual memory usage shown. However, this does not mean that anything is wrong with the container. + +Because of this you can now use `CGroupsResourceCalculator`. This shows only the sum of the physical memory usage and swapped pages as virtual memory usage excluding the reserved address space. This reflects much better what the application and the container allocated. + +In order to enable cgroups based resource calculation set `yarn.nodemanager.resource-calculator.class` to `org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsResourceCalculator`. + +Prerequisites +------------- + +Any cgroups based memory control requires the following settings.
+ +`yarn.nodemanager.container-executor.class` should be `org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor`. + +`yarn.nodemanager.runtime.linux.allowed-runtimes` should at least be `default`. + +`yarn.nodemanager.resource.memory.enabled` should be `true`. + +Configuring elastic memory resource control +------------------------------------------ + +The cgroups based elastic memory control preempts containers only if the overall system memory usage reaches its limit, allowing bursting. This feature requires setting the following options on top of the prerequisites. + +`yarn.nodemanager.elastic-memory-control.enabled` should be `true`. + +`yarn.nodemanager.resource.memory.enforced` should be `false`. + +`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true` but only one of them. + +Configuring strict memory resource control +------------------------------------------ + +Strict memory control preempts containers right away using the OOM killer feature of the kernel, when they reach their physical or virtual memory limits. You need to set the following options on top of the prerequisites above to use strict memory control. + +`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`. You can set them both. + +`yarn.nodemanager.resource.memory.enforced` should be `true`. \ No newline at end of file