diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 86f2554..9f2efe1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -37,8 +37,17 @@
ContainerId getContainerId();
+ /**
+ * The timestamp when the container start request is received.
+ * @return the timestamp when the container start request was received
+ */
long getContainerStartTime();
+ /**
+ * The timestamp when the container is allowed to be launched.
+ * @return the timestamp when the container is allowed to be launched
+ */
+ long getContainerLaunchTime();
+
Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 5527ac4..95ab374 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -883,6 +883,11 @@ public long getContainerStartTime() {
}
@Override
+ public long getContainerLaunchTime() {
+ return this.containerLaunchStartTime;
+ }
+
+ @Override
public Resource getResource() {
return Resources.clone(
this.containerTokenIdentifier.getResource());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
index c690225..ba79830 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -44,37 +45,58 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultOOMHandler implements Runnable {
- protected static final Log LOG = LogFactory
+ private static final Log LOG = LogFactory
.getLog(DefaultOOMHandler.class);
- private Context context;
- private boolean virtual;
- private CGroupsHandler cgroups;
+ private final Context context;
+ private final boolean enforceVirtualMemory;
+ private final CGroupsHandler cgroups;
+
+  // Orders OPPORTUNISTIC containers ahead of GUARANTEED ones and, within the
+  // same execution type, the most recently launched container first, so that
+  // killContainer() reaches OPPORTUNISTIC containers before GUARANTEED ones.
+  private static final Comparator<Container> CONTAINER_LAUNCH_TIME_COMPARATOR =
+      (Container c1, Container c2) -> {
+        boolean isC1Opportunistic = isOpportunistic(c1);
+        boolean isC2Opportunistic = isOpportunistic(c2);
+        if (isC1Opportunistic == isC2Opportunistic) {
+          // Same execution type: newest launch time first. Long.compare
+          // avoids the overflow that subtracting two timestamps can produce.
+          return Long.compare(
+              c2.getContainerLaunchTime(), c1.getContainerLaunchTime());
+        } else if (isC1Opportunistic) {
+          // c1 is opportunistic and c2 is guaranteed: c1 sorts first
+          return -1;
+        } else {
+          // c1 is guaranteed and c2 is opportunistic: c2 sorts first
+          return 1;
+        }
+      };
/**
* Create an OOM handler.
* This has to be public to be able to construct through reflection.
* @param context node manager context to work with
- * @param testVirtual Test virtual memory or physical
+ * @param enforceVirtualMemory true if virtual memory needs to be checked,
+ * false if physical memory needs to be checked instead
*/
- public DefaultOOMHandler(Context context, boolean testVirtual) {
+ public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) {
this.context = context;
- this.virtual = testVirtual;
- this.cgroups = ResourceHandlerModule.getCGroupsHandler();
+ this.enforceVirtualMemory = enforceVirtualMemory;
+ this.cgroups = getCGroupsHandler();
}
@VisibleForTesting
- void setCGroupsHandler(CGroupsHandler handler) {
- cgroups = handler;
+ protected CGroupsHandler getCGroupsHandler() {
+ return ResourceHandlerModule.getCGroupsHandler();
}
/**
- * Kill the container, if it has exceeded its request.
+ * Kill the GUARANTEED container, if it has exceeded its request.
*
* @param container Container to check
* @param fileName CGroup filename (physical or swap/virtual)
* @return true, if the container was preempted
*/
- private boolean killContainerIfOOM(Container container, String fileName) {
+ private boolean killGuaranteedContainerIfOOM(
+ Container container, String fileName) {
+ assert(!isOpportunistic(container));
String value = null;
try {
value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
@@ -168,21 +190,12 @@ private void sigKill(Container container) {
/**
* It is called when the node is under an OOM condition. All processes in
* all sub-cgroups are suspended. We need to act fast, so that we do not
- * affect the overall system utilization.
- * In general we try to find a newly run container that exceeded its limits.
- * The justification is cost, since probably this is the one that has
- * accumulated the least amount of uncommitted data so far.
- * We continue the process until the OOM is resolved.
+ * affect the overall system utilization. We continue the process until
+ * the OOM is resolved.
*/
@Override
public void run() {
try {
- // Reverse order by start time
- Comparator comparator = (Container o1, Container o2) -> {
- long order = o1.getContainerStartTime() - o2.getContainerStartTime();
- return order > 0 ? -1 : order < 0 ? 1 : 0;
- };
-
// We kill containers until the kernel reports the OOM situation resolved
// Note: If the kernel has a delay this may kill more than necessary
while (true) {
@@ -194,61 +207,86 @@ public void run() {
break;
}
- // The first pass kills a recent container
- // that uses more than its request
- ArrayList containers = new ArrayList<>();
- containers.addAll(context.getContainers().values());
- // Note: Sorting may take a long time with 10K+ containers
- // but it is acceptable now with low number of containers per node
- containers.sort(comparator);
-
- // Kill the latest container that exceeded its request
- boolean found = false;
- for (Container container : containers) {
- if (!virtual) {
- if (killContainerIfOOM(container,
- CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
- found = true;
- break;
- }
- } else {
- if (killContainerIfOOM(container,
- CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
- found = true;
- break;
- }
- }
- }
- if (found) {
- continue;
- }
+ boolean containerKilled = killContainer();
- // We have not found any containers that ran out of their limit,
- // so we will kill the latest one. This can happen, if all use
- // close to their request and one of them requests a big block
- // triggering the OOM freeze.
- // Currently there is no other way to identify the outstanding one.
- if (containers.size() > 0) {
- Container container = containers.get(0);
- sigKill(container);
- String message = String.format(
- "Newest container %s killed by elastic cgroups OOM handler using",
- container.getContainerId());
- LOG.warn(message);
- continue;
+ if (!containerKilled) {
+ // This can happen, if SIGKILL did not clean up
+ // non-PGID or containers or containers launched by other users
+ // or if a process was put to the root YARN cgroup.
+ throw new YarnRuntimeException(
+ "Could not find any containers but CGroups " +
+ "reserved for containers ran out of memory. " +
+ "I am giving up");
}
-
- // This can happen, if SIGKILL did not clean up
- // non-PGID or containers or containers launched by other users
- // or if a process was put to the root YARN cgroup.
- throw new YarnRuntimeException(
- "Could not find any containers but CGroups " +
- "reserved for containers ran out of memory. " +
- "I am giving up");
}
} catch (ResourceHandlerException ex) {
LOG.warn("Could not fecth OOM status. " +
"This is expected at shutdown. Exiting.", ex);
}
}
+
+ /**
+ * Choose and kill one container in case of OOM. A sorted copy of the
+ * containers held by the node manager context is walked in the order
+ * defined by CONTAINER_LAUNCH_TIME_COMPARATOR. The first OPPORTUNISTIC
+ * container encountered is killed unconditionally, whereas a GUARANTEED
+ * container is killed only if killGuaranteedContainerIfOOM reports that
+ * it was preempted. The walk stops at the first kill. If the walk kills
+ * nothing, the first container of the sorted copy is killed instead.
+ * NOTE(review): the intended policy appears to be "OPPORTUNISTIC before
+ * GUARANTEED"; that only holds if the comparator sorts OPPORTUNISTIC
+ * containers first - confirm against the comparator.
+ * @return true if a container is killed, false otherwise (i.e. when the
+ * context holds no containers)
+ */
+ protected boolean killContainer() {
+ ArrayList containers = new ArrayList<>(0);
+ containers.addAll(context.getContainers().values());
+ containers.sort(CONTAINER_LAUNCH_TIME_COMPARATOR);
+
+ boolean containerKilled = false;
+ for (Container container : containers) {
+ boolean isOpportunistic = isOpportunistic(container);
+ if (isOpportunistic) {
+ // OPPORTUNISTIC containers are killed without checking their usage
+ sigKill(container);
+ String message = String.format(
+ "Newest OPPORTUNISTIC container %s killed by elastic cgroups OOM" +
+ " handler using", container.getContainerId().toString());
+ LOG.warn(message);
+ containerKilled = true;
+ break;
+ } else {
+ // GUARANTEED container: check the physical usage file, or the
+ // memory+swap usage file when virtual memory is enforced
+ if (!enforceVirtualMemory) {
+ if (killGuaranteedContainerIfOOM(container,
+ CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
+ containerKilled = true;
+ break;
+ }
+ } else {
+ if (killGuaranteedContainerIfOOM(container,
+ CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
+ containerKilled = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!containerKilled) {
+ // Reaching this point means the loop saw no OPPORTUNISTIC container
+ // and no GUARANTEED container was reported as over its limit, so every
+ // remaining container is GUARANTEED. Kill the first one in sorted order.
+ if (containers.size() > 0) {
+ Container container = containers.get(0);
+ sigKill(container);
+ String message = String.format(
+ "Newest GUARANTEED container %s killed by elastic cgroups OOM" +
+ " handler using", container.getContainerId());
+ LOG.warn(message);
+ containerKilled = true;
+ }
+ }
+
+ return containerKilled;
+ }
+
+ /**
+ * Tell whether a container is of the OPPORTUNISTIC execution type.
+ * A container without a token identifier is treated as not opportunistic.
+ * @param container the container to inspect
+ * @return true if the container's token identifier reports
+ * ExecutionType.OPPORTUNISTIC, false otherwise
+ */
+ private static boolean isOpportunistic(Container container) {
+ return container.getContainerTokenIdentifier() != null &&
+ ExecutionType.OPPORTUNISTIC.equals(
+ container.getContainerTokenIdentifier().getExecutionType());
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
index 60c38fe..86bad20 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -29,9 +30,11 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
@@ -39,6 +42,7 @@
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -51,13 +55,13 @@
public class TestDefaultOOMHandler {
/**
- * Test an OOM situation where no containers are running.
+ * Test an OOM situation where there are no containers that can be killed.
*/
@Test(expected = YarnRuntimeException.class)
- public void testNoContainers() throws Exception {
+ public void testExceptionThrownWithNoContainersToKill() throws Exception {
Context context = mock(Context.class);
- when(context.getContainers()).thenReturn(new ConcurrentHashMap<>());
+ when(context.getContainers()).thenReturn(new ConcurrentHashMap<>(0));
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(
@@ -66,59 +70,69 @@ public void testNoContainers() throws Exception {
CGROUP_PARAM_MEMORY_OOM_CONTROL))
.thenReturn("under_oom 1").thenReturn("under_oom 0");
- DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
- handler.setCGroupsHandler(cGroupsHandler);
+ DefaultOOMHandler handler = new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
handler.run();
}
/**
- * We have two containers, both out of limit. We should kill the later one.
- *
- * @throws Exception exception
+ * We have two guaranteed containers, both of which are out of limit.
+ * We should kill the later one.
*/
@Test
- public void testBothContainersOOM() throws Exception {
+ public void testBothGuaranteedContainersOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 2L);
+ containers.put(c2.getContainerId(), c2);
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 1);
- containers.put(createContainerId(1), c1);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 2);
- containers.put(cid2, c2);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
- ContainerExecutor ex = mock(ContainerExecutor.class);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
- runOOMHandler(containers, cGroupsHandler, ex);
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -131,52 +145,57 @@ public void testBothContainersOOM() throws Exception {
}
/**
- * We have two containers, one out of limit. We should kill that one.
- * This should happen even, if it was started earlier
- *
- * @throws Exception exception
+ * We have two GUARANTEED containers, one of which is out of limit.
+ * We should kill the one that's out of its limit. This should
+ * happen even if it was launched earlier than the other one.
*/
@Test
- public void testOneContainerOOM() throws Exception {
+ public void testOneGuaranteedContainerOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
-
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 2);
- containers.put(createContainerId(1), c1);
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 2L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 1L);
+ containers.put(c2.getContainerId(), c2);
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 1);
- containers.put(cid2, c2);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
- ContainerExecutor ex = mock(ContainerExecutor.class);
- runOOMHandler(containers, cGroupsHandler, ex);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -189,51 +208,56 @@ public void testOneContainerOOM() throws Exception {
}
/**
- * We have two containers, neither out of limit. We should kill the later one.
- *
- * @throws Exception exception
+ * We have two GUARANTEED containers, neither of which is out of limit.
+ * We should kill the later launched one.
*/
@Test
- public void testNoContainerOOM() throws Exception {
+ public void testNoGuaranteedContainerOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 2L);
+ containers.put(c2.getContainerId(), c2);
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 1);
- containers.put(createContainerId(1), c1);
-
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 2);
- containers.put(cid2, c2);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
- ContainerExecutor ex = mock(ContainerExecutor.class);
- runOOMHandler(containers, cGroupsHandler, ex);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -245,43 +269,259 @@ public void testNoContainerOOM() throws Exception {
verify(ex, times(1)).signalContainer(any());
}
- private void runOOMHandler(
- ConcurrentHashMap containers,
- CGroupsHandler cGroupsHandler, ContainerExecutor ex)
- throws IOException, ResourceHandlerException {
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * OOM is resolved after killing the most recently launched OPPORTUNISTIC
+ * container.
+ */
+ @Test
+ public void testKillOneOpportunisticContainerUponOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ int currentContainerId = 0;
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 3);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
Context context = mock(Context.class);
when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
- when(ex.signalContainer(any()))
- .thenAnswer(invocation -> {
- assertEquals("Wrong pid killed", "1235",
- ((ContainerSignalContext) invocation.getArguments()[0]).getPid());
- return true;
- });
-
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL))
- .thenReturn("under_oom 1").thenReturn("under_oom 0");
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * OOM is resolved after killing both OPPORTUNISTIC containers.
+ */
+ @Test
+ public void testKillBothOpportunisticContainerUponOOM() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 3);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
when(context.getContainerExecutor()).thenReturn(ex);
- DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
- handler.setCGroupsHandler(cGroupsHandler);
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c1)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1234")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(2)).signalContainer(any());
+ }
+
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * OOM is resolved after killing all running containers.
+ */
+ @Test
+ public void testKillAllContainersUponOOM() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 3);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1234")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c1)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1236")
+ .setContainer(c3)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(3)).signalContainer(any());
}
- private class AppId extends ApplicationIdPBImpl {
- AppId(long clusterTs, int appId) {
- this.setClusterTimestamp(clusterTs);
- this.setId(appId);
- }
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * OOM is not resolved even after killing all running containers.
+ * A YarnRuntimeException is expected to be thrown.
+ */
+ @Test(expected = YarnRuntimeException.class)
+ public void testOOMUnresolvedAfterKillingAllContainers() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 3);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
}
- private ContainerId createContainerId(int id) {
- ApplicationId applicationId = new AppId(1, 1);
+ private static ContainerId createContainerId(int id) {
+ ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId
= mock(ApplicationAttemptId.class);
@@ -295,13 +535,41 @@ private ContainerId createContainerId(int id) {
return containerId;
}
- ContainerTokenIdentifier getToken() {
- ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class);
- when(id.getVersion()).thenReturn(1);
- return id;
+ private static Container createContainer(int containerId,
+ boolean guaranteed, long launchTime) {
+ Container c1 = mock(Container.class);
+ ContainerId cid1 = createContainerId(containerId);
+ when(c1.getContainerId()).thenReturn(cid1);
+
+ ContainerTokenIdentifier token = mock(ContainerTokenIdentifier.class);
+ ExecutionType type =
+ guaranteed ? ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC;
+ when(token.getExecutionType()).thenReturn(type);
+ when(c1.getContainerTokenIdentifier()).thenReturn(token);
+
+ when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
+ when(c1.getContainerLaunchTime()).thenReturn(launchTime);
+
+ return c1;
}
/**
 * Renders a megabyte count as a decimal string of bytes.
 *
 * @param mb number of megabytes
 * @return {@code mb * 1024 * 1024} formatted in base 10
 */
String getMB(long mb) {
  final long bytesPerMb = 1024L * 1024L;
  return String.valueOf(mb * bytesPerMb);
}
+
+ /**
+  * Builds a mocked {@link ContainerExecutor} whose {@code signalContainer}
+  * call removes the signalled container from {@code containers} and then
+  * returns {@code true}.
+  *
+  * @param containers map, keyed by container id, that signalled containers
+  *                   are removed from
+  * @return the stubbed executor mock
+  * @throws IOException presumably only because {@code signalContainer}
+  *                     declares it; nothing here throws it -- confirm
+  */
+ private static ContainerExecutor createContainerExecutor(
+     ConcurrentHashMap<ContainerId, Container> containers)
+     throws IOException {
+   ContainerExecutor ex = mock(ContainerExecutor.class);
+   when(ex.signalContainer(any())).thenAnswer(
+       invocation -> {
+         Object[] arguments = invocation.getArguments();
+         Container container = ((ContainerSignalContext)
+             arguments[0]).getContainer();
+         // remove container from NM context immediately
+         containers.remove(container.getContainerId());
+         return true;
+       });
+   return ex;
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 77ebd34..325709b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -242,6 +242,11 @@ public long getContainerStartTime() {
}
@Override
+ public long getContainerLaunchTime() {
+   // MockContainer has no launch timestamp to report; it always returns 0.
+   return 0L;
+ }
+
+ @Override
public ResourceMappings getResourceMappings() {
// This mock holds no resource mappings and always returns null.
return null;
}