diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 86f2554..9f2efe1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -37,8 +37,17 @@ ContainerId getContainerId(); + /** + * The timestamp when the container start request is received. + * @return + */ long getContainerStartTime(); + /** + * The timestamp when the container is allowed to be launched. + */ + long getContainerLaunchTime(); + Resource getResource(); ContainerTokenIdentifier getContainerTokenIdentifier(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 5527ac4..95ab374 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -883,6 +883,11 @@ public long getContainerStartTime() { } @Override + public long getContainerLaunchTime() { + return this.containerLaunchStartTime; + } + + @Override public Resource getResource() { return Resources.clone( this.containerTokenIdentifier.getResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java index c690225..ba79830 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -44,37 +45,58 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class DefaultOOMHandler implements Runnable { - protected static final Log LOG = LogFactory + private static final Log LOG = LogFactory .getLog(DefaultOOMHandler.class); - private Context context; - private boolean virtual; - private CGroupsHandler cgroups; + private final Context context; + private final boolean enforceVirtualMemory; + private final CGroupsHandler cgroups; + + // Order by execution type and then reverse order by launch time + private static final Comparator CONTAINER_LAUNCH_TIME_COMPARATOR = + (Container c1, Container c2) -> { + boolean isC1Opportunistic = isOpportunistic(c1); + boolean isC2Opportunistic = isOpportunistic(c2); + if (isC1Opportunistic == isC2Opportunistic) { + // the two containers are of the same execution type + long order = c1.getContainerLaunchTime() - c2.getContainerLaunchTime(); + return order > 0 ? -1 : order < 0 ? 1 : 0; + } else if (isC1Opportunistic) { + // container c1 is opportunistic and c2 is guaranteed + return 1; + } else { + // container c1 is guaranteed and c2 is opportunistic + return -1; + } + }; /** * Create an OOM handler. * This has to be public to be able to construct through reflection. * @param context node manager context to work with - * @param testVirtual Test virtual memory or physical + * @param enforceVirtualMemory true if virtual memory needs to be checked, + * false if physical memory needs to be checked instead */ - public DefaultOOMHandler(Context context, boolean testVirtual) { + public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) { this.context = context; - this.virtual = testVirtual; - this.cgroups = ResourceHandlerModule.getCGroupsHandler(); + this.enforceVirtualMemory = enforceVirtualMemory; + this.cgroups = getCGroupsHandler(); } @VisibleForTesting - void setCGroupsHandler(CGroupsHandler handler) { - cgroups = handler; + protected CGroupsHandler getCGroupsHandler() { + return ResourceHandlerModule.getCGroupsHandler(); } /** - * Kill the container, if it has exceeded its request. + * Kill the GUARANTEED container, if it has exceeded its request. * * @param container Container to check * @param fileName CGroup filename (physical or swap/virtual) * @return true, if the container was preempted */ - private boolean killContainerIfOOM(Container container, String fileName) { + private boolean killGuaranteedContainerIfOOM( + Container container, String fileName) { + assert(!isOpportunistic(container)); String value = null; try { value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, @@ -168,21 +190,12 @@ private void sigKill(Container container) { /** * It is called when the node is under an OOM condition. All processes in * all sub-cgroups are suspended. We need to act fast, so that we do not - * affect the overall system utilization. - * In general we try to find a newly run container that exceeded its limits. - * The justification is cost, since probably this is the one that has - * accumulated the least amount of uncommitted data so far. - * We continue the process until the OOM is resolved. + * affect the overall system utilization. We continue the process until + * the OOM is resolved. */ @Override public void run() { try { - // Reverse order by start time - Comparator comparator = (Container o1, Container o2) -> { - long order = o1.getContainerStartTime() - o2.getContainerStartTime(); - return order > 0 ? -1 : order < 0 ? 1 : 0; - }; - // We kill containers until the kernel reports the OOM situation resolved // Note: If the kernel has a delay this may kill more than necessary while (true) { @@ -194,61 +207,86 @@ public void run() { break; } - // The first pass kills a recent container - // that uses more than its request - ArrayList containers = new ArrayList<>(); - containers.addAll(context.getContainers().values()); - // Note: Sorting may take a long time with 10K+ containers - // but it is acceptable now with low number of containers per node - containers.sort(comparator); - - // Kill the latest container that exceeded its request - boolean found = false; - for (Container container : containers) { - if (!virtual) { - if (killContainerIfOOM(container, - CGROUP_PARAM_MEMORY_USAGE_BYTES)) { - found = true; - break; - } - } else { - if (killContainerIfOOM(container, - CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) { - found = true; - break; - } - } - } - if (found) { - continue; - } + boolean containerKilled = killContainer(); - // We have not found any containers that ran out of their limit, - // so we will kill the latest one. This can happen, if all use - // close to their request and one of them requests a big block - // triggering the OOM freeze. - // Currently there is no other way to identify the outstanding one. - if (containers.size() > 0) { - Container container = containers.get(0); - sigKill(container); - String message = String.format( - "Newest container %s killed by elastic cgroups OOM handler using", - container.getContainerId()); - LOG.warn(message); - continue; + if (!containerKilled) { + // This can happen, if SIGKILL did not clean up + // non-PGID or containers or containers launched by other users + // or if a process was put to the root YARN cgroup. + throw new YarnRuntimeException( + "Could not find any containers but CGroups " + + "reserved for containers ran out of memory. " + + "I am giving up"); } - - // This can happen, if SIGKILL did not clean up - // non-PGID or containers or containers launched by other users - // or if a process was put to the root YARN cgroup. - throw new YarnRuntimeException( - "Could not find any containers but CGroups " + - "reserved for containers ran out of memory. " + - "I am giving up"); } } catch (ResourceHandlerException ex) { LOG.warn("Could not fecth OOM status. " + "This is expected at shutdown. Exiting.", ex); } } + + /** + * Choose and kill a container in case of OOM. We try to find the most + * recently launched OPPORTUNISTIC container and fall back to the most + * recently launched GUARANTEED container that has exceeded its limits + * If there is no such container found, we choose to kill the most + * recently launched GUARANTEED one. + * @return true if a container is killed, false otherwise + */ + protected boolean killContainer() { + ArrayList containers = new ArrayList<>(0); + containers.addAll(context.getContainers().values()); + containers.sort(CONTAINER_LAUNCH_TIME_COMPARATOR); + + boolean containerKilled = false; + for (Container container : containers) { + boolean isOpportunistic = isOpportunistic(container); + if (isOpportunistic) { + sigKill(container); + String message = String.format( + "Newest OPPORTUNISTIC container %s killed by elastic cgroups OOM" + + " handler using", container.getContainerId().toString()); + LOG.warn(message); + containerKilled = true; + break; + } else { + if (!enforceVirtualMemory) { + if (killGuaranteedContainerIfOOM(container, + CGROUP_PARAM_MEMORY_USAGE_BYTES)) { + containerKilled = true; + break; + } + } else { + if (killGuaranteedContainerIfOOM(container, + CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) { + containerKilled = true; + break; + } + } + } + } + + if (!containerKilled) { + // This can happen if there is no opportunistic container found and + // no guaranteed container goes over its limit. We just kill the most + // recently launched guaranteed one. + if (containers.size() > 0) { + Container container = containers.get(0); + sigKill(container); + String message = String.format( + "Newest GUARANTEED container %s killed by elastic cgroups OOM" + + " handler using", container.getContainerId()); + LOG.warn(message); + containerKilled = true; + } + } + + return containerKilled; + } + + private static boolean isOpportunistic(Container container) { + return container.getContainerTokenIdentifier() != null && + ExecutionType.OPPORTUNISTIC.equals( + container.getContainerTokenIdentifier().getExecutionType()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java index 60c38fe..86bad20 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -29,9 +30,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.util.LinkedHashMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; @@ -39,6 +42,7 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -51,13 +55,13 @@ public class TestDefaultOOMHandler { /** - * Test an OOM situation where no containers are running. + * Test an OOM situation where there is no containers that can be killed. */ @Test(expected = YarnRuntimeException.class) - public void testNoContainers() throws Exception { + public void testExceptionThrownWithNoContainersToKill() throws Exception { Context context = mock(Context.class); - when(context.getContainers()).thenReturn(new ConcurrentHashMap<>()); + when(context.getContainers()).thenReturn(new ConcurrentHashMap<>(0)); CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); when(cGroupsHandler.getCGroupParam( @@ -66,59 +70,69 @@ public void testNoContainers() throws Exception { CGROUP_PARAM_MEMORY_OOM_CONTROL)) .thenReturn("under_oom 1").thenReturn("under_oom 0"); - DefaultOOMHandler handler = new DefaultOOMHandler(context, false); - handler.setCGroupsHandler(cGroupsHandler); + DefaultOOMHandler handler = new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; handler.run(); } /** - * We have two containers, both out of limit. We should kill the later one. - * - * @throws Exception exception + * We have two guaranteed containers, both of which are out of limit. + * We should kill the later one. */ @Test - public void testBothContainersOOM() throws Exception { + public void testBothGuaranteedContainersOOM() throws Exception { ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 2L); + containers.put(c2.getContainerId(), c2); - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 1); - containers.put(createContainerId(1), c1); + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 2); - containers.put(cid2, c2); CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1234").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(11)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(11)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1235").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(11)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(11)); - ContainerExecutor ex = mock(ContainerExecutor.class); + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); - runOOMHandler(containers, cGroupsHandler, ex); verify(ex, times(1)).signalContainer( new ContainerSignalContext.Builder() @@ -131,52 +145,57 @@ public void testBothContainersOOM() throws Exception { } /** - * We have two containers, one out of limit. We should kill that one. - * This should happen even, if it was started earlier - * - * @throws Exception exception + * We have two GUARANTEED containers, one of which is out of limit. + * We should kill the one that's out of its limit. This should + * happen even, if it was launched earlier than the other one. */ @Test - public void testOneContainerOOM() throws Exception { + public void testOneGuaranteedContainerOOM() throws Exception { ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); - - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 2); - containers.put(createContainerId(1), c1); + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 2L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 1L); + containers.put(c2.getContainerId(), c2); - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 1); - containers.put(cid2, c2); + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1234").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(9)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(9)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1235").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(11)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(11)); - ContainerExecutor ex = mock(ContainerExecutor.class); - runOOMHandler(containers, cGroupsHandler, ex); + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); verify(ex, times(1)).signalContainer( new ContainerSignalContext.Builder() @@ -189,51 +208,56 @@ public void testOneContainerOOM() throws Exception { } /** - * We have two containers, neither out of limit. We should kill the later one. - * - * @throws Exception exception + * We have two GUARANTEE containers, neither of which is out of limit. + * We should kill the later launched one. */ @Test - public void testNoContainerOOM() throws Exception { + public void testNoGuaranteedContainerOOM() throws Exception { ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 2L); + containers.put(c2.getContainerId(), c2); - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 1); - containers.put(createContainerId(1), c1); - - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 2); - containers.put(cid2, c2); + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1234").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(9)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(9)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) .thenReturn("1235").thenReturn(""); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) .thenReturn(getMB(9)); when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) .thenReturn(getMB(9)); - ContainerExecutor ex = mock(ContainerExecutor.class); - runOOMHandler(containers, cGroupsHandler, ex); + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); verify(ex, times(1)).signalContainer( new ContainerSignalContext.Builder() @@ -245,43 +269,259 @@ public void testNoContainerOOM() throws Exception { verify(ex, times(1)).signalContainer(any()); } - private void runOOMHandler( - ConcurrentHashMap containers, - CGroupsHandler cGroupsHandler, ContainerExecutor ex) - throws IOException, ResourceHandlerException { + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * OOM is resolved after killing the most recently launched OPPORTUNISTIC + * container. + */ + @Test + public void testKillOneOpportunisticContainerUponOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + int currentContainerId = 0; + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 3); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); Context context = mock(Context.class); when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); - when(ex.signalContainer(any())) - .thenAnswer(invocation -> { - assertEquals("Wrong pid killed", "1235", - ((ContainerSignalContext) invocation.getArguments()[0]).getPid()); - return true; - }); - + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); when(cGroupsHandler.getCGroupParam( CGroupsHandler.CGroupController.MEMORY, "", CGROUP_PARAM_MEMORY_OOM_CONTROL)) - .thenReturn("under_oom 1").thenReturn("under_oom 0"); + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * OOM is resolved after killing both OPPORTUNISTIC containers. + */ + @Test + public void testKillBothOpportunisticContainerUponOOM() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 3); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); when(context.getContainerExecutor()).thenReturn(ex); - DefaultOOMHandler handler = new DefaultOOMHandler(context, false); - handler.setCGroupsHandler(cGroupsHandler); + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1234") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(2)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * OOM is resolved after killing all running containers. + */ + @Test + public void testKillAllContainersUponOOM() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 3); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1234") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1236") + .setContainer(c3) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(3)).signalContainer(any()); } - private class AppId extends ApplicationIdPBImpl { - AppId(long clusterTs, int appId) { - this.setClusterTimestamp(clusterTs); - this.setId(appId); - } + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * OOM is not resolved even after killing all running containers. + * A YarnRuntimeException is excepted to be thrown. + */ + @Test(expected = YarnRuntimeException.class) + public void testOOMUnresolvedAfterKillingAllContainers() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 3); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); } - private ContainerId createContainerId(int id) { - ApplicationId applicationId = new AppId(1, 1); + private static ContainerId createContainerId(int id) { + ApplicationId applicationId = ApplicationId.newInstance(1, 1); ApplicationAttemptId applicationAttemptId = mock(ApplicationAttemptId.class); @@ -295,13 +535,41 @@ private ContainerId createContainerId(int id) { return containerId; } - ContainerTokenIdentifier getToken() { - ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class); - when(id.getVersion()).thenReturn(1); - return id; + private static Container createContainer(int containerId, + boolean guaranteed, long launchTime) { + Container c1 = mock(Container.class); + ContainerId cid1 = createContainerId(containerId); + when(c1.getContainerId()).thenReturn(cid1); + + ContainerTokenIdentifier token = mock(ContainerTokenIdentifier.class); + ExecutionType type = + guaranteed ? ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC; + when(token.getExecutionType()).thenReturn(type); + when(c1.getContainerTokenIdentifier()).thenReturn(token); + + when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c1.getContainerLaunchTime()).thenReturn(launchTime); + + return c1; } String getMB(long mb) { return Long.toString(mb * 1024 * 1024); } + + private static ContainerExecutor createContainerExecutor( + ConcurrentHashMap containers) + throws IOException { + ContainerExecutor ex = mock(ContainerExecutor.class); + when(ex.signalContainer(any())).thenAnswer( + invocation -> { + Object[] arguments = invocation.getArguments(); + Container container = ((ContainerSignalContext) + arguments[0]).getContainer(); + // remove container from NM context immediately + containers.remove(container.getContainerId()); + return true; + }); + return ex; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 77ebd34..325709b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -242,6 +242,11 @@ public long getContainerStartTime() { } @Override + public long getContainerLaunchTime() { + return 0; + } + + @Override public ResourceMappings getResourceMappings() { return null; }