diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 57d1bad..895a6bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Map.Entry; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,6 +33,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceChangedEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; @@ -58,6 +61,8 @@ final List containersToBeRemoved; final Map containersToBeAdded; + final List containersToBeChanged; + Map trackingContainers = new HashMap(); @@ -80,6 +85,10 @@ private int nodeCpuPercentageForYARN; private ResourceUtilization containersUtilization; + private final Object containersResizeMonitor = new Object(); + private volatile boolean containersResized = false; + + private volatile boolean stopped = false; public ContainersMonitorImpl(ContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { @@ -91,6 +100,7 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.containersToBeAdded = new HashMap(); this.containersToBeRemoved = new ArrayList(); + this.containersToBeChanged = new ArrayList<>(); this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); @@ -210,6 +220,7 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { if (this.isEnabled()) { + stopped = true; this.monitoringThread.interrupt(); try { this.monitoringThread.join(); @@ -220,7 +231,8 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - private static class ProcessTreeInfo { + @VisibleForTesting + static class ProcessTreeInfo { private ContainerId containerId; private String pid; private ResourceCalculatorProcessTree pTree; @@ -279,6 +291,32 @@ public int getCpuVcores() { } } + private static class ContainerResourceToChange { + private final ContainerId containerId; + private final long vmemLimit; + private final long pmemLimit; + private final int cpuVcores; + + public ContainerResourceToChange(ContainerId containerId, + long vmemLimit, long pmemLimit, int cpuVcores) { + this.containerId = containerId; + this.vmemLimit = vmemLimit; + this.pmemLimit = pmemLimit; + this.cpuVcores = cpuVcores; + } + public ContainerId getContainerId() { + return this.containerId; + } + public long getVmemLimit() { + return this.vmemLimit; + } + public long getPmemLimit() { + return this.pmemLimit; + } + public int getCpuVcores() { + return this.cpuVcores; + } + } /** * Check whether a container's process tree's current memory usage is over @@ -349,9 +387,10 @@ public MonitoringThread() { } @Override + @SuppressWarnings("unchecked") public void run() { - while (true) { + while (!stopped && !Thread.currentThread().isInterrupted()) { // Print the processTrees for debugging. if (LOG.isDebugEnabled()) { @@ -389,6 +428,30 @@ public void run() { containersToBeRemoved.clear(); } + // Handle resized containers + synchronized (containersToBeChanged) { + for (ContainerResourceToChange c : containersToBeChanged) { + ContainerId containerId = c.getContainerId(); + ProcessTreeInfo info = trackingContainers.get(containerId); + if (info == null) { + LOG.warn("Failed to track container " + + containerId.toString() + + ". It may have already completed."); + continue; + } + LOG.info("Changing resource-monitoring for " + containerId); + info.pmemLimit = c.getPmemLimit(); + info.vmemLimit = c.getVmemLimit(); + info.cpuVcores = c.getCpuVcores(); + eventDispatcher.getEventHandler().handle( + new ContainerResourceChangedEvent(containerId, + Resource.newInstance((int) (info.pmemLimit >> 20), + info.cpuVcores))); + } + containersToBeChanged.clear(); + containersResized = false; + } + // Temporary structure to calculate the total resource utilization of // the containers ResourceUtilization trackedContainersUtilization = @@ -562,7 +625,11 @@ public void run() { setContainersUtilization(trackedContainersUtilization); try { - Thread.sleep(monitoringInterval); + synchronized (containersResizeMonitor) { + if (!containersResized) { + containersResizeMonitor.wait(monitoringInterval); + } + } } catch (InterruptedException e) { LOG.warn(ContainersMonitorImpl.class.getName() + " is interrupted. Exiting."); @@ -642,9 +709,21 @@ public void setContainersUtilization(ResourceUtilization utilization) { } @Override + @SuppressWarnings("unchecked") public void handle(ContainersMonitorEvent monitoringEvent) { - if (!isEnabled()) { + if (monitoringEvent.getType() == ContainersMonitorEventType + .CHANGE_MONITORING_CONTAINER_RESOURCE) { + // Nothing to enforce. Send resource changed event immediately. + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + eventDispatcher.getEventHandler().handle( + new ContainerResourceChangedEvent( + changeEvent.getContainerId(), + Resource.newInstance( + (int)(changeEvent.getPmemLimit() >> 20), + changeEvent.getCpuVcores()))); + } return; } @@ -675,6 +754,19 @@ public void handle(ContainersMonitorEvent monitoringEvent) { this.containersToBeRemoved.add(containerId); } break; + case CHANGE_MONITORING_CONTAINER_RESOURCE: + ChangeMonitoringContainerResourceEvent changeEvent = + (ChangeMonitoringContainerResourceEvent) monitoringEvent; + synchronized (this.containersToBeChanged) { + this.containersToBeChanged.add(new ContainerResourceToChange( + containerId, changeEvent.getVmemLimit(), + changeEvent.getPmemLimit(), changeEvent.getCpuVcores())); + containersResized = true; + } + synchronized (containersResizeMonitor) { + containersResizeMonitor.notify(); + } + break; default: // TODO: Wrong event. } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java new file mode 100644 index 0000000..4a18a8c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; + +public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { + + @Override + public long getVirtualMemorySize() { + return 0; + } + + @Override + public long getPhysicalMemorySize() { + return 0; + } + + @Override + public long getAvailableVirtualMemorySize() { + return 0; + } + + @Override + public long getAvailablePhysicalMemorySize() { + return 0; + } + + @Override + public int getNumProcessors() { + return 0; + } + + @Override + public int getNumCores() { + return 0; + } + + @Override + public long getCpuFrequency() { + return 0; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public float getCpuUsage() { + return 0; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java new file mode 100644 index 0000000..c5aaa77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorProcessTree.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; + +public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree { + + private long rssMemorySize = 0; + + public MockResourceCalculatorProcessTree(String root) { + super(root); + } + + @Override + public void updateProcessTree() { + } + + @Override + public String getProcessTreeDump() { + return ""; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return true; + } + + public void setRssMemorySize(long rssMemorySize) { + this.rssMemorySize = rssMemorySize; + } + + public long getRssMemorySize() { + return this.rssMemorySize; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java new file mode 100644 index 0000000..ea141cc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class TestContainersMonitorResourceChange { + + private ContainersMonitorImpl containersMonitor; + private MockExecutor executor; + private Configuration conf; + private AsyncDispatcher dispatcher; + private MockContainerEventHandler containerEventHandler; + + private static class MockExecutor extends ContainerExecutor { + @Override + public void init() throws IOException { + } + @Override + public void startLocalizer(LocalizerStartContext ctx) + throws IOException, InterruptedException { + } + @Override + public int launchContainer(ContainerStartContext ctx) throws + IOException { + return 0; + } + @Override + public boolean signalContainer(ContainerSignalContext ctx) + throws IOException { + return true; + } + @Override + public void deleteAsUser(DeletionAsUserContext ctx) + throws IOException, InterruptedException { + } + @Override + public String getProcessId(ContainerId containerId) { + return String.valueOf(containerId.getContainerId()); + } + @Override + public boolean isContainerProcessAlive(ContainerLivenessContext ctx) + throws IOException { + return true; + } + } + + private static class MockContainerEventHandler implements + EventHandler { + final private Set killedContainer + = new HashSet<>(); + @Override + public void handle(ContainerEvent event) { + if (event.getType() == ContainerEventType.KILL_CONTAINER) { + synchronized (killedContainer) { + killedContainer.add(event.getContainerID()); + } + } + } + public boolean isContainerKilled(ContainerId containerId) { + synchronized (killedContainer) { + return killedContainer.contains(containerId); + } + } + } + + @Before + public void setup() { + executor = new MockExecutor(); + dispatcher = new AsyncDispatcher(); + conf = new Configuration(); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, + MockResourceCalculatorPlugin.class.getCanonicalName()); + conf.set( + YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, + MockResourceCalculatorProcessTree.class.getCanonicalName()); + dispatcher.init(conf); + dispatcher.start(); + containerEventHandler = new MockContainerEventHandler(); + dispatcher.register(ContainerEventType.class, containerEventHandler); + } + + @After + public void tearDown() throws Exception { + if (containersMonitor != null) { + containersMonitor.stop(); + } + if (dispatcher != null) { + dispatcher.stop(); + } + } + + @Test + public void testContainersResourceChange() throws Exception { + // set container monitor interval to be 20ms + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L); + containersMonitor = createContainersMonitor(executor, dispatcher); + containersMonitor.init(conf); + containersMonitor.start(); + // create container 1 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(1), 2100L, 1000L, 1, 0, 0)); + // verify that this container is properly tracked + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(1))); + assertEquals(1000L, getProcessTreeInfo(getContainerId(1)) + .getPmemLimit()); + assertEquals(2100L, getProcessTreeInfo(getContainerId(1)) + .getVmemLimit()); + // increase pmem usage, the container should be killed + MockResourceCalculatorProcessTree mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(1)).getProcessTree(); + mockTree.setRssMemorySize(2500L); + // verify that this container is killed + Thread.sleep(200); + assertTrue(containerEventHandler + .isContainerKilled(getContainerId(1))); + // create container 2 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(2), 2100L, 1000L, 1, 0, 0)); + // verify that this container is properly tracked + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(2))); + assertEquals(1000L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(2100L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(2), 4200L, 2000L, 1)); + Thread.sleep(200); + assertNotNull(getProcessTreeInfo(getContainerId(2))); + assertEquals(2000L, getProcessTreeInfo(getContainerId(2)) + .getPmemLimit()); + assertEquals(4200L, getProcessTreeInfo(getContainerId(2)) + .getVmemLimit()); + // increase pmem usage, the container should NOT be killed + mockTree = + (MockResourceCalculatorProcessTree) getProcessTreeInfo( + getContainerId(2)).getProcessTree(); + mockTree.setRssMemorySize(2500L); + // verify that this container is not killed + Thread.sleep(200); + assertFalse(containerEventHandler + .isContainerKilled(getContainerId(2))); + containersMonitor.stop(); + } + + @Test + public void testContainersResourceChangeIsTriggeredImmediately() + throws Exception { + // set container monitor interval to be 20s + conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20000L); + containersMonitor = createContainersMonitor(executor, dispatcher); + containersMonitor.init(conf); + containersMonitor.start(); + // sleep 1 second to make sure the container monitor thread is + // now waiting for the next monitor cycle + Thread.sleep(1000); + // create a container with id 3 + containersMonitor.handle(new ContainerStartMonitoringEvent( + getContainerId(3), 2100L, 1000L, 1, 0, 0)); + // sleep another second, and verify that this container still has not + // been tracked because the container monitor thread is still waiting + Thread.sleep(1000); + assertNull(getProcessTreeInfo(getContainerId(3))); + // trigger a change resource event, check limit after change + containersMonitor.handle(new ChangeMonitoringContainerResourceEvent( + getContainerId(3), 4200L, 2000L, 1)); + Thread.sleep(200); + // verify that this container has been properly tracked with the + // correct size + assertNotNull(getProcessTreeInfo(getContainerId(3))); + assertEquals(2000L, getProcessTreeInfo(getContainerId(3)) + .getPmemLimit()); + assertEquals(4200L, getProcessTreeInfo(getContainerId(3)) + .getVmemLimit()); + containersMonitor.stop(); + } + + private ContainersMonitorImpl createContainersMonitor( + ContainerExecutor containerExecutor, AsyncDispatcher dispatcher) { + return new ContainersMonitorImpl(containerExecutor, dispatcher, null); + } + + private ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456L, 1), 1), id); + } + + private ProcessTreeInfo getProcessTreeInfo(ContainerId id) { + return containersMonitor.trackingContainers.get(id); + } +}