diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 7a53eb9..da0ec9d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -81,6 +84,7 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; private long waitForContainersOnShutdownMillis; @@ -159,7 +163,7 @@ public void init(Configuration conf) { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - NodeStatusUpdater nodeStatusUpdater = + nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); @@ -216,6 +220,17 @@ public void stop() { DefaultMetricsSystem.shutdown(); } + protected void cleanupContainersOnReboot(){ + //we do not want to block dispatcher thread here + new Thread() { + @Override + public void run() { + cleanupContainers(); + ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + } + }.start(); + } + @SuppressWarnings("unchecked") protected void cleanupContainers() { Map containers = context.getContainers(); @@ -241,6 +256,16 @@ protected void cleanupContainers() { } } + for (Iterator> i = + containers.entrySet().iterator(); i.hasNext();) { + Entry e = i.next(); + org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = + e.getValue().cloneAndGetContainerStatus(); + if( containerStatus.getState() == ContainerState.COMPLETE){ + i.remove(); + } + } + // All containers killed if (containers.isEmpty()) { LOG.info("All containers in DONE state"); @@ -339,8 +364,7 @@ public void handle(NodeManagerEvent event) { stop(); break; case REBOOT: - stop(); - reboot(); + cleanupContainersOnReboot(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); @@ -357,6 +381,11 @@ ContainerManagerImpl getContainerManager() { return containerManager; } + //For testing + Dispatcher getNMDispatcher(){ + return dispatcher; + } + @VisibleForTesting Context getNMContext() { return this.context; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3df4311..6003755 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -87,7 +87,9 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - + private Runnable statusUpdaterRunnable; + private Thread statusUpdater; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -168,6 +170,22 @@ public synchronized void stop() { this.isStopped = true; super.stop(); } + + public void rebootNodeStatusUpdater() { + // Interrupt the updater. + this.isStopped = true; + + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + this.isStopped = false; + statusUpdater.start(); + LOG.info("NodeStatusUpdater thread is restarted"); + } catch (Exception e) { + throw new AvroRuntimeException(e); + } + } private boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); @@ -387,7 +405,7 @@ public void sendOutofBandHeartBeat() { protected void startStatusUpdater() { - new Thread("Node Status Updater") { + statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { @@ -398,6 +416,9 @@ public void run() { synchronized (heartbeatMonitor) { heartbeatMonitor.wait(heartBeatInterval); } + if(isStopped){ + break; + } NodeStatus nodeStatus = getNodeStatus(); nodeStatus.setResponseId(lastHeartBeatID); @@ -434,7 +455,7 @@ public void run() { + " hence rebooting."); dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.REBOOT)); - break; + continue; } lastHeartBeatID = response.getResponseId(); @@ -460,6 +481,9 @@ public void run() { } } } - }.start(); + }; + statusUpdater = + new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index f422617..ad458c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -28,8 +28,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import junit.framework.Assert; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -52,6 +56,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; @@ -91,7 +96,75 @@ public void testKillContainersOnShutdown() throws IOException { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); + startContainers(nm); + final int MAX_TRIES=20; + int numTries = 0; + while (!processStartFile.exists() && numTries < MAX_TRIES) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) {ex.printStackTrace();} + numTries++; + } + + nm.stop(); + + // Now verify the contents of the file + // Script generates a message when it receives a sigterm + // so we look for that + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean foundSigTermMessage = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("SIGTERM")) { + foundSigTermMessage = true; + break; + } + } + Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); + reader.close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnReboot() throws IOException { + NodeManager nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + long waitForContainersOnShutdownMillis = + conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, + YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + + conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS, + YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) + 1000; + nm.init(conf); + nm.start(); + startContainers(nm); + + nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.REBOOT)); + //wait 2x time for cleanupContainers to finish + long waitStartTime = System.currentTimeMillis(); + ConcurrentMap containers + = nm.getNMContext().getContainers(); + while (!containers.isEmpty() && + System.currentTimeMillis() - waitStartTime < + 2 * waitForContainersOnShutdownMillis) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + } + } + //ensure that containers are all cleaned before nodeStatusUpdater restarted + Assert.assertTrue(containers.isEmpty()); + verify(((TestNodeManager)nm).getNodeStatusUpdater(), + times(1)).rebootNodeStatusUpdater(); + } + + private void startContainers(NodeManager nm) throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); File scriptFile = createUnhaltingScriptFile(); @@ -127,7 +200,8 @@ public void testKillContainersOnShutdown() throws IOException { containerLaunchContext.setResource(recordFactory .newRecordInstance(Resource.class)); containerLaunchContext.getResource().setMemory(1024); - StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); containerManager.startContainer(startRequest); @@ -137,37 +211,6 @@ public void testKillContainersOnShutdown() throws IOException { ContainerStatus containerStatus = containerManager.getContainerStatus(request).getStatus(); Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); - - final int MAX_TRIES=20; - int numTries = 0; - while (!processStartFile.exists() && numTries < MAX_TRIES) { - try { - Thread.sleep(500); - } catch (InterruptedException ex) {ex.printStackTrace();} - numTries++; - } - - nm.stop(); - - // Now verify the contents of the file - // Script generates a message when it receives a sigterm - // so we look for that - BufferedReader reader = - new BufferedReader(new FileReader(processStartFile)); - - boolean foundSigTermMessage = false; - while (true) { - String line = reader.readLine(); - if (line == null) { - break; - } - if (line.contains("SIGTERM")) { - foundSigTermMessage = true; - break; - } - } - Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); - reader.close(); } private ContainerId createContainerId() { @@ -226,4 +269,22 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } + + class TestNodeManager extends NodeManager { + NodeStatusUpdaterImpl nodeStatusUpdater; + public TestNodeManager (){ + super(); + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + nodeStatusUpdater = mock(NodeStatusUpdaterImpl.class); + return nodeStatusUpdater; + } + + public NodeStatusUpdaterImpl getNodeStatusUpdater(){ + return nodeStatusUpdater; + } + } }