diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java
new file mode 100644
index 0000000..cb7bdb2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerChangeMonitoringEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Monitor event of type {@code CHANGE_MONITORING_CONTAINER} that carries a
+ * vmem limit and a pmem limit for one container.
+ */
+public class ContainerChangeMonitoringEvent extends ContainersMonitorEvent {
+  private final long pmemLimit;
+  private final long vmemLimit;
+
+  public ContainerChangeMonitoringEvent(ContainerId containerId,
+      long vmemLimit, long pmemLimit) {
+    super(containerId, ContainersMonitorEventType.CHANGE_MONITORING_CONTAINER);
+    this.pmemLimit = pmemLimit;
+    this.vmemLimit = vmemLimit;
+  }
+
+  /** Returns the vmem limit given at construction. */
+  public long getVmemLimit() {
+    return vmemLimit;
+  }
+
+  /** Returns the pmem limit given at construction. */
+  public long getPmemLimit() {
+    return pmemLimit;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
index be99651..9d73745 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEventType.java
@@ -20,5 +20,6 @@
 
 public enum ContainersMonitorEventType {
   START_MONITORING_CONTAINER,
-  STOP_MONITORING_CONTAINER
+  STOP_MONITORING_CONTAINER,
+  CHANGE_MONITORING_CONTAINER
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b681b34..8ba371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -55,6 +55,7 @@
   final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
   Map<ContainerId, ProcessTreeInfo> trackingContainers =
       new HashMap<ContainerId, ProcessTreeInfo>();
+  final List<ContainerResourceToBeChanged> containersToBeChanged;
 
   final ContainerExecutor containerExecutor;
   private final Dispatcher eventDispatcher;
@@ -81,6 +82,7 @@ public ContainersMonitorImpl(ContainerExecutor exec,
 
     this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
     this.containersToBeRemoved = new ArrayList<ContainerId>();
+    this.containersToBeChanged = new ArrayList<ContainerResourceToBeChanged>();
     this.monitoringThread = new MonitoringThread();
   }
 
@@ -195,7 +197,7 @@ protected void serviceStop() throws Exception {
     super.serviceStop();
   }
 
-  private static class ProcessTreeInfo {
+  static class ProcessTreeInfo {
     private ContainerId containerId;
     private String pid;
     private ResourceCalculatorProcessTree pTree;
@@ -306,6 +308,24 @@ boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
     return isProcessTreeOverLimit(containerId, currentMemUsage,
                                   curMemUsageOfAgedProcesses, limit);
   }
+
+  /**
+   * Limit change queued for one container: created by {@code handle()} for a
+   * CHANGE_MONITORING_CONTAINER event and applied to the tracked
+   * ProcessTreeInfo by the monitoring thread.
+   */
+  static class ContainerResourceToBeChanged {
+    final ContainerId containerId;
+    final long vmemLimit;
+    final long pmemLimit;
+
+    public ContainerResourceToBeChanged(ContainerId containerId,
+        long vmemLimit, long pmemLimit) {
+      this.containerId = containerId;
+      this.vmemLimit = vmemLimit;
+      this.pmemLimit = pmemLimit;
+    }
+  }
 
   private class MonitoringThread extends Thread {
     public MonitoringThread() {
@@ -348,6 +368,25 @@ public void run() {
           }
           containersToBeRemoved.clear();
         }
+
+        // handle containers to be changed
+        synchronized (containersToBeChanged) {
+          for (ContainerResourceToBeChanged c : containersToBeChanged) {
+            // Look the container up once; a missing entry means it was
+            // already removed (finished or killed), so the change is skipped.
+            ProcessTreeInfo info = trackingContainers.get(c.containerId);
+            if (info == null) {
+              LOG.warn("note container not found in trackingContainers, "
+                  + "it maybe already completed, please check, Container="
+                  + c.containerId.toString()
+                  + " that was to be resized is no longer running");
+              continue;
+            }
+            info.vmemLimit = c.vmemLimit;
+            info.pmemLimit = c.pmemLimit;
+          }
+          containersToBeChanged.clear();
+        }
 
         // Now do the monitoring for the trackingContainers
         // Check memory usage and kill any overflowing containers
@@ -547,6 +586,15 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
         this.containersToBeRemoved.add(containerId);
       }
       break;
+    case CHANGE_MONITORING_CONTAINER:
+      // Only queue the new limits here; the monitoring thread applies them.
+      ContainerChangeMonitoringEvent event =
+          (ContainerChangeMonitoringEvent) monitoringEvent;
+      synchronized (this.containersToBeChanged) {
+        this.containersToBeChanged.add(new ContainerResourceToBeChanged(
+            containerId, event.getVmemLimit(), event.getPmemLimit()));
+      }
+      break;
     default:
       // TODO: Wrong event.
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
new file mode 100644
index 0000000..b560f40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+  @Override
+  public long getVirtualMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public long getPhysicalMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public long getAvailableVirtualMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public long getAvailablePhysicalMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public int getNumProcessors() {
+    return 0;
+  }
+
+  @Override
+  public long getCpuFrequency() {
+    return 0;
+  }
+
+  @Override
+  public long getCumulativeCpuTime() {
+    return 0;
+  }
+
+  @Override
+  public float getCpuUsage() {
+    return 0;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
new file mode 100644
index 0000000..ace74a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+
+/**
+ * Process-tree stub for tests: it reports whatever memory usage was last set
+ * through {@link #setPmem(long)} and {@link #setVmem(long)}.
+ */
+public class MockResourceCalculatorProcessTree
+    extends ResourceCalculatorProcessTree {
+  // volatile so a value set by one thread is visible to another; the test
+  // sets usage from its own thread while the monitor presumably polls it
+  // from the monitoring thread (confirm against ContainersMonitorImpl).
+  volatile long pmem = 0;
+  volatile long vmem = 0;
+
+  public MockResourceCalculatorProcessTree(String root) {
+    super(root);
+  }
+
+  @Override
+  public void updateProcessTree() {
+    // nothing to refresh: usage only changes through the setters
+  }
+
+  @Override
+  public String getProcessTreeDump() {
+    return "";
+  }
+
+  /** Returns the set vmem, or 0 when {@code olderThanAge} is positive. */
+  @Override
+  public long getCumulativeVmem(int olderThanAge) {
+    return olderThanAge > 0 ? 0 : vmem;
+  }
+
+  /** Returns the set pmem, or 0 when {@code olderThanAge} is positive. */
+  @Override
+  public long getCumulativeRssmem(int olderThanAge) {
+    return olderThanAge > 0 ? 0 : pmem;
+  }
+
+  @Override
+  public long getCumulativeCpuTime() {
+    return 0;
+  }
+
+  @Override
+  public boolean checkPidPgrpidForMatch() {
+    return true;
+  }
+
+  public void setPmem(long pmem) {
+    this.pmem = pmem;
+  }
+
+  public void setVmem(long vmem) {
+    this.vmem = vmem;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
new file mode 100644
index 0000000..8f5beeb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Checks that ContainerChangeMonitoringEvent updates the tracked limits. */
+public class TestContainersMonitorResourceChange {
+  ContainersMonitorImpl cm;
+  MockExecutor executor;
+  Configuration conf;
+  AsyncDispatcher dispatcher;
+  MockContainerEventHandler containerEventHandler;
+
+  static class MockExecutor extends ContainerExecutor {
+    ConcurrentMap<ContainerId, String> containerIdToPid =
+        new ConcurrentHashMap<ContainerId, String>();
+
+    @Override
+    public void init() throws IOException {
+    }
+
+    @Override
+    public void startLocalizer(Path nmPrivateContainerTokens,
+        InetSocketAddress nmAddr, String user, String appId, String locId,
+        List<String> localDirs, List<String> logDirs) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public int launchContainer(Container container,
+        Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+        String user, String appId, Path containerWorkDir,
+        List<String> localDirs, List<String> logDirs) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean signalContainer(String user, String pid, Signal signal)
+        throws IOException {
+      return true;
+    }
+
+    @Override
+    public void deleteAsUser(String user, Path subDir, Path... basedirs)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public String getProcessId(ContainerId containerID) {
+      return String.valueOf(containerID.getId());
+    }
+
+    public void setContainerPid(ContainerId containerId, String pid) {
+      containerIdToPid.put(containerId, pid);
+    }
+  }
+
+  static class MockContainerEventHandler implements
+      EventHandler<ContainerEvent> {
+    Set<ContainerId> killedContainer = new HashSet<ContainerId>();
+
+    @Override
+    public void handle(ContainerEvent event) {
+      if (event.getType() == ContainerEventType.KILL_CONTAINER) {
+        synchronized (killedContainer) {
+          killedContainer.add(event.getContainerID());
+        }
+      }
+    }
+
+    public boolean isContainerKilled(ContainerId containerId) {
+      synchronized (killedContainer) {
+        return killedContainer.contains(containerId);
+      }
+    }
+  }
+
+  @Before
+  public void setup() {
+    executor = new MockExecutor();
+    dispatcher = new AsyncDispatcher();
+    conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        MockResourceCalculatorPlugin.class.getCanonicalName());
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+        MockResourceCalculatorProcessTree.class.getCanonicalName());
+    conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
+    dispatcher.init(conf);
+    dispatcher.start();
+    containerEventHandler = new MockContainerEventHandler();
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    cm = new ContainersMonitorImpl(executor, dispatcher, null);
+    cm.init(conf);
+    cm.start();
+  }
+
+  // Not named finalize(): that would override Object.finalize().
+  @After
+  public void tearDown() {
+    try {
+      cm.stop();
+    } catch (Exception ignored) {
+      // best-effort cleanup; a failed stop must not fail the test
+    }
+    try {
+      dispatcher.stop();
+    } catch (Exception ignored) {
+      // best-effort cleanup; a failed stop must not fail the test
+    }
+  }
+
+  private ContainerId getContainerId(int id) {
+    return ContainerId.newInstance(ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(123456L, 1), 1), id);
+  }
+
+  private ProcessTreeInfo getProcessTreeInfo(ContainerId id) {
+    return cm.trackingContainers.get(id);
+  }
+
+  @Test
+  public void testResourceChange() throws Exception {
+    // start monitoring container-1 (vmem limit 2100, pmem limit 1000)
+    cm.handle(new ContainerStartMonitoringEvent(
+        getContainerId(1), 2100L, 1000L));
+
+    // give the monitor time to pick it up, then check the tracked limits
+    Thread.sleep(200);
+    Assert.assertNotNull(getProcessTreeInfo(getContainerId(1)));
+    Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
+        .getPmemLimit());
+    Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(1))
+        .getVmemLimit());
+
+    // raise pmem usage to 2500 (2.5x the limit); the monitor should kill it
+    MockResourceCalculatorProcessTree mockTree =
+        (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+            getContainerId(1)).getProcessTree();
+    mockTree.setPmem(2500L);
+
+    // check that a kill was requested for this container
+    Thread.sleep(200);
+    Assert.assertTrue(containerEventHandler
+        .isContainerKilled(getContainerId(1)));
+
+    // start monitoring container-2 with the same limits as container-1
+    cm.handle(new ContainerStartMonitoringEvent(
+        getContainerId(2), 2100L, 1000L));
+
+    // give the monitor time to pick it up, then check the tracked limits
+    Thread.sleep(200);
+    Assert.assertNotNull(getProcessTreeInfo(getContainerId(2)));
+    Assert.assertEquals(1000L, getProcessTreeInfo(getContainerId(2))
+        .getPmemLimit());
+    Assert.assertEquals(2100L, getProcessTreeInfo(getContainerId(2))
+        .getVmemLimit());
+
+    // trigger a change resource event, check limit after changed
+    cm.handle(new ContainerChangeMonitoringEvent(getContainerId(2), 4200L,
+        2000L));
+    Thread.sleep(200);
+    Assert.assertNotNull(getProcessTreeInfo(getContainerId(2)));
+    Assert.assertEquals(2000L, getProcessTreeInfo(getContainerId(2))
+        .getPmemLimit());
+    Assert.assertEquals(4200L, getProcessTreeInfo(getContainerId(2))
+        .getVmemLimit());
+
+    // 2500 is above the new pmem limit (2000) yet no kill is expected:
+    // presumably only usage above 2x the limit kills at once -- confirm
+    mockTree =
+        (MockResourceCalculatorProcessTree) getProcessTreeInfo(
+            getContainerId(2)).getProcessTree();
+    mockTree.setPmem(2500L);
+
+    // check that no kill was requested for this container
+    Thread.sleep(200);
+    Assert.assertFalse(containerEventHandler
+        .isContainerKilled(getContainerId(2)));
+  }
+}