diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 86f2554..5d48d84 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -37,8 +37,16 @@
ContainerId getContainerId();
+ /**
+ * The timestamp when the container start request is received.
+ */
long getContainerStartTime();
+ /**
+ * The timestamp when the container is allowed to be launched.
+ */
+ long getContainerLaunchTime();
+
Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 5527ac4..95ab374 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -883,6 +883,11 @@ public long getContainerStartTime() {
}
@Override
+ public long getContainerLaunchTime() {
+ return this.containerLaunchStartTime;
+ }
+
+ @Override
public Resource getResource() {
return Resources.clone(
this.containerTokenIdentifier.getResource());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
index c690225..becedd5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -30,7 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
@@ -46,66 +47,60 @@
public class DefaultOOMHandler implements Runnable {
protected static final Log LOG = LogFactory
.getLog(DefaultOOMHandler.class);
- private Context context;
- private boolean virtual;
- private CGroupsHandler cgroups;
+ private final Context context;
+ private final String memoryStatFile;
+ private final CGroupsHandler cgroups;
/**
* Create an OOM handler.
* This has to be public to be able to construct through reflection.
* @param context node manager context to work with
- * @param testVirtual Test virtual memory or physical
+ * @param enforceVirtualMemory true if virtual memory needs to be checked,
+ * false if physical memory needs to be checked instead
*/
- public DefaultOOMHandler(Context context, boolean testVirtual) {
+ public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) {
this.context = context;
- this.virtual = testVirtual;
- this.cgroups = ResourceHandlerModule.getCGroupsHandler();
+ this.memoryStatFile = enforceVirtualMemory ?
+ CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES :
+ CGROUP_PARAM_MEMORY_USAGE_BYTES;
+ this.cgroups = getCGroupsHandler();
}
@VisibleForTesting
- void setCGroupsHandler(CGroupsHandler handler) {
- cgroups = handler;
+ protected CGroupsHandler getCGroupsHandler() {
+ return ResourceHandlerModule.getCGroupsHandler();
}
/**
- * Kill the container, if it has exceeded its request.
- *
- * @param container Container to check
- * @param fileName CGroup filename (physical or swap/virtual)
- * @return true, if the container was preempted
+ * Check if a given container exceeds its limits.
*/
- private boolean killContainerIfOOM(Container container, String fileName) {
+ private boolean isContainerOutOfLimit(Container container) {
+ boolean outOfLimit = false;
+
String value = null;
try {
value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- container.getContainerId().toString(),
- fileName);
+ container.getContainerId().toString(), memoryStatFile);
long usage = Long.parseLong(value);
long request = container.getResource().getMemorySize() * 1024 * 1024;
// Check if the container has exceeded its limits.
if (usage > request) {
- // Kill the container
- // We could call the regular cleanup but that sends a
- // SIGTERM first that cannot be handled by frozen processes.
- // Walk through the cgroup
- // tasks file and kill all processes in it
- sigKill(container);
+ outOfLimit = true;
String message = String.format(
- "Container %s was killed by elastic cgroups OOM handler using %d " +
+ "Container %s is out of its limits, using %d " +
"when requested only %d",
container.getContainerId(), usage, request);
LOG.warn(message);
- return true;
}
} catch (ResourceHandlerException ex) {
LOG.warn(String.format("Could not access memory resource for %s",
container.getContainerId()), ex);
} catch (NumberFormatException ex) {
- LOG.warn(String.format("Could not parse %s in %s",
- value, container.getContainerId()));
+ LOG.warn(String.format("Could not parse %s in %s", value,
+ container.getContainerId()));
}
- return false;
+ return outOfLimit;
}
/**
@@ -168,21 +163,16 @@ private void sigKill(Container container) {
/**
* It is called when the node is under an OOM condition. All processes in
* all sub-cgroups are suspended. We need to act fast, so that we do not
- * affect the overall system utilization.
- * In general we try to find a newly run container that exceeded its limits.
- * The justification is cost, since probably this is the one that has
- * accumulated the least amount of uncommitted data so far.
- * We continue the process until the OOM is resolved.
+ * affect the overall system utilization. In general we try to find a
+ * newly launched container that exceeded its limits. The justification is
+ * cost, since probably this is the one that has accumulated the least
+ * amount of uncommitted data so far. OPPORTUNISTIC containers are always
+ * killed before any GUARANTEED containers are considered. We continue the
+ * process until the OOM is resolved.
*/
@Override
public void run() {
try {
- // Reverse order by start time
- Comparator comparator = (Container o1, Container o2) -> {
- long order = o1.getContainerStartTime() - o2.getContainerStartTime();
- return order > 0 ? -1 : order < 0 ? 1 : 0;
- };
-
// We kill containers until the kernel reports the OOM situation resolved
// Note: If the kernel has a delay this may kill more than necessary
while (true) {
@@ -194,61 +184,106 @@ public void run() {
break;
}
- // The first pass kills a recent container
- // that uses more than its request
- ArrayList containers = new ArrayList<>();
- containers.addAll(context.getContainers().values());
- // Note: Sorting may take a long time with 10K+ containers
- // but it is acceptable now with low number of containers per node
- containers.sort(comparator);
-
- // Kill the latest container that exceeded its request
- boolean found = false;
- for (Container container : containers) {
- if (!virtual) {
- if (killContainerIfOOM(container,
- CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
- found = true;
- break;
- }
- } else {
- if (killContainerIfOOM(container,
- CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
- found = true;
- break;
- }
- }
- }
- if (found) {
- continue;
- }
+ boolean containerKilled = killContainer();
- // We have not found any containers that ran out of their limit,
- // so we will kill the latest one. This can happen, if all use
- // close to their request and one of them requests a big block
- // triggering the OOM freeze.
- // Currently there is no other way to identify the outstanding one.
- if (containers.size() > 0) {
- Container container = containers.get(0);
- sigKill(container);
- String message = String.format(
- "Newest container %s killed by elastic cgroups OOM handler using",
- container.getContainerId());
- LOG.warn(message);
- continue;
+ if (!containerKilled) {
+ // This can happen, if SIGKILL did not clean up
+ // non-PGID or containers or containers launched by other users
+ // or if a process was put to the root YARN cgroup.
+ throw new YarnRuntimeException(
+ "Could not find any containers but CGroups " +
+ "reserved for containers ran out of memory. " +
+ "I am giving up");
}
-
- // This can happen, if SIGKILL did not clean up
- // non-PGID or containers or containers launched by other users
- // or if a process was put to the root YARN cgroup.
- throw new YarnRuntimeException(
- "Could not find any containers but CGroups " +
- "reserved for containers ran out of memory. " +
- "I am giving up");
}
} catch (ResourceHandlerException ex) {
- LOG.warn("Could not fecth OOM status. " +
+ LOG.warn("Could not fetch OOM status. " +
"This is expected at shutdown. Exiting.", ex);
}
}
+
+  /**
+   * Choose and kill a container in case of OOM. We try to find the most
+   * recently launched OPPORTUNISTIC container that exceeds its limit
+   * and fall back to the most recently launched OPPORTUNISTIC container.
+   * If there is no such container found, we choose to kill a GUARANTEED
+   * container in the same way.
+   * @return true if a container is killed, false otherwise
+   */
+  protected boolean killContainer() {
+    ArrayList<ContainerCandidate> candidates = new ArrayList<>();
+    for (Container container : context.getContainers().values()) {
+      candidates.add(
+          new ContainerCandidate(container, isContainerOutOfLimit(container)));
+    }
+    if (candidates.isEmpty()) {
+      // Nothing to choose from; the caller decides how to report that.
+      return false;
+    }
+
+    // Only the head of the ordering is needed, so take the minimum in one
+    // pass instead of sorting every candidate.
+    ContainerCandidate candidate = Collections.min(candidates);
+    sigKill(candidate.container);
+    String message = String.format(
+        "container %s killed by elastic cgroups OOM handler.",
+        candidate.container.getContainerId());
+    LOG.warn(message);
+    return true;
+  }
+
+  /**
+   * A kill candidate: a container together with the facts used to rank
+   * it, all captured once when the candidate is constructed.
+   */
+  private static class ContainerCandidate
+      implements Comparable<ContainerCandidate> {
+    final boolean outOfLimit;
+    final long launchTime;
+    final boolean opportunistic;
+    final Container container;
+
+    ContainerCandidate(Container container, boolean outOfLimit) {
+      this.outOfLimit = outOfLimit;
+      this.container = container;
+      this.launchTime = container.getContainerLaunchTime();
+      this.opportunistic = isOpportunistic(container);
+    }
+
+    /**
+     * Order two candidates by execution type, then out-of-limit status,
+     * then launch time. OPPORTUNISTIC candidates are ordered before
+     * GUARANTEED ones. Within the same execution type, a candidate that
+     * is out of its limits is ordered before one that isn't. Within the
+     * same execution type and out-of-limit status, the candidate that
+     * was launched later is ordered before the other one.
+     * @param o the candidate to compare against
+     * @return negative if this sorts before o, positive if after, else zero
+     */
+    @Override
+    public int compareTo(ContainerCandidate o) {
+      int ret = Boolean.compare(o.opportunistic, this.opportunistic);
+      if (ret == 0) {
+        // Same execution type: order by out-of-limit status.
+        ret = Boolean.compare(o.outOfLimit, this.outOfLimit);
+      }
+      if (ret == 0) {
+        // Same out-of-limit status as well: order by launch time, using
+        // the values captured at construction, not a fresh container read.
+        ret = Long.compare(o.launchTime, this.launchTime);
+      }
+      return ret;
+    }
+
+    /**
+     * Check if a container is OPPORTUNISTIC or not. A container is
+     * considered OPPORTUNISTIC only if its token identifier is not
+     * null and its execution type is OPPORTUNISTIC.
+     */
+    private static boolean isOpportunistic(Container container) {
+      return container.getContainerTokenIdentifier() != null &&
+          ExecutionType.OPPORTUNISTIC.equals(
+              container.getContainerTokenIdentifier().getExecutionType());
+    }
+  }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
index 60c38fe..4be3f55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java
@@ -20,8 +20,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -31,14 +31,12 @@
import org.junit.Test;
import java.io.IOException;
-import java.util.LinkedHashMap;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -51,13 +49,13 @@
public class TestDefaultOOMHandler {
/**
- * Test an OOM situation where no containers are running.
+ * Test an OOM situation where there are no containers that can be killed.
*/
@Test(expected = YarnRuntimeException.class)
- public void testNoContainers() throws Exception {
+ public void testExceptionThrownWithNoContainersToKill() throws Exception {
Context context = mock(Context.class);
- when(context.getContainers()).thenReturn(new ConcurrentHashMap<>());
+ when(context.getContainers()).thenReturn(new ConcurrentHashMap<>(0));
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(
@@ -66,59 +64,69 @@ public void testNoContainers() throws Exception {
CGROUP_PARAM_MEMORY_OOM_CONTROL))
.thenReturn("under_oom 1").thenReturn("under_oom 0");
- DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
- handler.setCGroupsHandler(cGroupsHandler);
+ DefaultOOMHandler handler = new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
handler.run();
}
/**
- * We have two containers, both out of limit. We should kill the later one.
- *
- * @throws Exception exception
+ * We have two guaranteed containers, both of which are out of limit.
+ * We should kill the later one.
*/
@Test
- public void testBothContainersOOM() throws Exception {
+ public void testBothGuaranteedContainersOverLimitUponOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 2L);
+ containers.put(c2.getContainerId(), c2);
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 1);
- containers.put(createContainerId(1), c1);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 2);
- containers.put(cid2, c2);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
- ContainerExecutor ex = mock(ContainerExecutor.class);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
- runOOMHandler(containers, cGroupsHandler, ex);
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -131,52 +139,185 @@ public void testBothContainersOOM() throws Exception {
}
/**
- * We have two containers, one out of limit. We should kill that one.
- * This should happen even, if it was started earlier
- *
- * @throws Exception exception
+ * We have two GUARANTEED containers, one of which is out of limit.
+ * We should kill the one that's out of its limit. This should
+ * happen even if it was launched earlier than the other one.
*/
@Test
- public void testOneContainerOOM() throws Exception {
+ public void testOneGuaranteedContainerOverLimitUponOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 2L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 1L);
+ containers.put(c2.getContainerId(), c2);
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 2);
- containers.put(createContainerId(1), c1);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 1);
- containers.put(cid2, c2);
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ // container c2 is out of its limit
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
+
+ /**
+ * We have two GUARANTEED containers, neither of which is out of limit.
+ * We should kill the later launched one.
+ */
+ @Test
+ public void testNoGuaranteedContainerOverLimitOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, true, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, true, 2L);
+ containers.put(c2.getContainerId(), c2);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
+
+ /**
+ * We have two opportunistic containers, both of which are out of limit.
+ * We should kill the later one.
+ */
+ @Test
+ public void testBothOpportunisticContainersOverLimitUponOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, false, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, false, 2L);
+ containers.put(c2.getContainerId(), c2);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
- ContainerExecutor ex = mock(ContainerExecutor.class);
- runOOMHandler(containers, cGroupsHandler, ex);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -189,51 +330,120 @@ public void testOneContainerOOM() throws Exception {
}
/**
- * We have two containers, neither out of limit. We should kill the later one.
- *
- * @throws Exception exception
+ * We have two OPPORTUNISTIC containers, one of which is out of limit.
+ * We should kill the one that's out of its limit. This should
+ * happen even if it was launched earlier than the other one.
*/
@Test
- public void testNoContainerOOM() throws Exception {
+ public void testOneOpportunisticContainerOverLimitUponOOM() throws Exception {
ConcurrentHashMap containers =
- new ConcurrentHashMap<>(new LinkedHashMap<>());
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, false, 2L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, false, 1L);
+ containers.put(c2.getContainerId(), c2);
- Container c1 = mock(Container.class);
- ContainerId cid1 = createContainerId(1);
- when(c1.getContainerId()).thenReturn(cid1);
- when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c1.getContainerStartTime()).thenReturn((long) 1);
- containers.put(createContainerId(1), c1);
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ // container c2 is out of its limit
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
- Container c2 = mock(Container.class);
- ContainerId cid2 = createContainerId(2);
- when(c2.getContainerId()).thenReturn(cid2);
- when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
- when(c2.getContainerStartTime()).thenReturn((long) 2);
- containers.put(cid2, c2);
+ /**
+ * We have two OPPORTUNISTIC containers, neither of which is out of limit.
+ * We should kill the later one.
+ */
+ @Test
+ public void testNoOpportunisticContainerOverLimitOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(1, false, 1L);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(2, false, 2L);
+ containers.put(c2.getContainerId(), c2);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1").thenReturn("under_oom 0");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_FILE_TASKS))
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_FILE_TASKS))
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
- cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
- ContainerExecutor ex = mock(ContainerExecutor.class);
- runOOMHandler(containers, cGroupsHandler, ex);
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
@@ -245,43 +455,509 @@ public void testNoContainerOOM() throws Exception {
verify(ex, times(1)).signalContainer(any());
}
- private void runOOMHandler(
- ConcurrentHashMap containers,
- CGroupsHandler cGroupsHandler, ContainerExecutor ex)
- throws IOException, ResourceHandlerException {
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * One of the OPPORTUNISTIC containers is out of limit.
+ * OOM is resolved after killing the OPPORTUNISTIC container that
+ * exceeded its limit even though it is launched earlier than the
+ * other OPPORTUNISTIC container.
+ */
+ @Test
+ public void testKillOneOverLimitOpportunisticContainerUponOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ int currentContainerId = 0;
+ Container c1 = createContainer(currentContainerId++, false, 2);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 1);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 1);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
Context context = mock(Context.class);
when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
- when(ex.signalContainer(any()))
- .thenAnswer(invocation -> {
- assertEquals("Wrong pid killed", "1235",
- ((ContainerSignalContext) invocation.getArguments()[0]).getPid());
- return true;
- });
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ // container c2 is out of its limit
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * None of the containers exceeded its memory limit.
+ * OOM is resolved after killing the most recently launched OPPORTUNISTIC
+ * container.
+ */
+ @Test
+ public void testKillOneLaterOpportunisticContainerUponOOM() throws Exception {
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ int currentContainerId = 0;
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 1);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL))
- .thenReturn("under_oom 1").thenReturn("under_oom 0");
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(any());
+ }
+
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * One of the OPPORTUNISTIC containers is out of limit.
+ * OOM is resolved after killing both OPPORTUNISTIC containers.
+ */
+ @Test
+ public void testKillBothOpportunisticContainerUponOOM() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 2);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 1);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 1);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2) // pid 1235 comes from c2's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1234")
+ .setContainer(c1) // pid 1234 comes from c1's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(2)).signalContainer(any());
+ }
+
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * The GUARANTEED container is out of limit. OOM is resolved
+ * after first killing the two OPPORTUNISTIC containers and then the
+ * GUARANTEED container.
+ */
+ @Test
+ public void testKillGuaranteedContainerUponOOM() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 2);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 1);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 1);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(11));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(11));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1234")
+ .setContainer(c1)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2) // pid 1235 comes from c2's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1236")
+ .setContainer(c3) // pid 1236 comes from c3's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(3)).signalContainer(any());
+ }
+
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * None of the containers exceeded its memory limit.
+ * OOM is resolved after killing all running containers.
+ */
+ @Test
+ public void testKillAllContainersUponOOM() throws Exception {
+ int currentContainerId = 0;
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 1);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
when(context.getContainerExecutor()).thenReturn(ex);
- DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
- handler.setCGroupsHandler(cGroupsHandler);
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 0");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
handler.run();
+
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1234")
+ .setContainer(c1) // pid 1234 comes from c1's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1235")
+ .setContainer(c2) // pid 1235 comes from c2's mocked tasks file
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(1)).signalContainer(
+ new ContainerSignalContext.Builder()
+ .setPid("1236")
+ .setContainer(c3)
+ .setSignal(ContainerExecutor.Signal.KILL)
+ .build()
+ );
+ verify(ex, times(3)).signalContainer(any());
}
- private class AppId extends ApplicationIdPBImpl {
- AppId(long clusterTs, int appId) {
- this.setClusterTimestamp(clusterTs);
- this.setId(appId);
- }
+ /**
+ * We have two OPPORTUNISTIC containers and one GUARANTEED container.
+ * None of the containers exceeded its memory limit.
+ * OOM is not resolved even after killing all running containers.
+ * A YarnRuntimeException is expected to be thrown.
+ */
+ @Test(expected = YarnRuntimeException.class)
+ public void testOOMUnresolvedAfterKillingAllContainers() throws Exception {
+ int currentContainerId = 0;
+
+ ConcurrentHashMap containers =
+ new ConcurrentHashMap<>();
+ Container c1 = createContainer(currentContainerId++, false, 1);
+ containers.put(c1.getContainerId(), c1);
+ Container c2 = createContainer(currentContainerId++, false, 2);
+ containers.put(c2.getContainerId(), c2);
+ Container c3 = createContainer(currentContainerId++, true, 3);
+ containers.put(c3.getContainerId(), c3);
+
+ ContainerExecutor ex = createContainerExecutor(containers);
+ Context context = mock(Context.class);
+ when(context.getContainers()).thenReturn(containers);
+ when(context.getContainerExecutor()).thenReturn(ex);
+
+ CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
+ when(cGroupsHandler.getCGroupParam(
+ CGroupsHandler.CGroupController.MEMORY,
+ "",
+ CGROUP_PARAM_MEMORY_OOM_CONTROL))
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1")
+ .thenReturn("under_oom 1");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1234").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1235").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_FILE_TASKS))
+ .thenReturn("1236").thenReturn("");
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
+ .thenReturn(getMB(9));
+ when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
+ c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
+ .thenReturn(getMB(9));
+
+ DefaultOOMHandler handler =
+ new DefaultOOMHandler(context, false) {
+ @Override
+ protected CGroupsHandler getCGroupsHandler() {
+ return cGroupsHandler;
+ }
+ };
+ handler.run();
}
- private ContainerId createContainerId(int id) {
- ApplicationId applicationId = new AppId(1, 1);
+ private static ContainerId createContainerId(int id) {
+ ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId
= mock(ApplicationAttemptId.class);
@@ -295,13 +971,41 @@ private ContainerId createContainerId(int id) {
return containerId;
}
- ContainerTokenIdentifier getToken() {
- ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class);
- when(id.getVersion()).thenReturn(1);
- return id;
+ private static Container createContainer(int containerId,
+ boolean guaranteed, long launchTime) {
+ Container c1 = mock(Container.class);
+ ContainerId cid1 = createContainerId(containerId);
+ when(c1.getContainerId()).thenReturn(cid1);
+
+ ContainerTokenIdentifier token = mock(ContainerTokenIdentifier.class);
+ ExecutionType type =
+ guaranteed ? ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC;
+ when(token.getExecutionType()).thenReturn(type);
+ when(c1.getContainerTokenIdentifier()).thenReturn(token);
+
+ when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
+ when(c1.getContainerLaunchTime()).thenReturn(launchTime);
+
+ return c1;
}
String getMB(long mb) {
return Long.toString(mb * 1024 * 1024);
}
+
+ private static ContainerExecutor createContainerExecutor(
+ ConcurrentHashMap containers)
+ throws IOException {
+ // Executor mock whose signalContainer() always reports success and, as a
+ // side effect, drops the signalled container from the supplied map.
+ ContainerExecutor executor = mock(ContainerExecutor.class);
+ when(executor.signalContainer(any())).thenAnswer(invocation -> {
+ ContainerSignalContext signalContext =
+ (ContainerSignalContext) invocation.getArguments()[0];
+ // remove container from NM context immediately
+ containers.remove(signalContext.getContainer().getContainerId());
+ return true;
+ });
+ return executor;
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 77ebd34..325709b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -242,6 +242,11 @@ public long getContainerStartTime() {
}
@Override
+ public long getContainerLaunchTime() {
+ return 0; // this mock always reports a fixed launch time of 0
+ }
+
+ @Override
public ResourceMappings getResourceMappings() {
return null;
}