diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 867a02d..1fced58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -81,6 +81,7 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; private long waitForContainersOnShutdownMillis; @@ -163,7 +164,7 @@ public void init(Configuration conf) { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - NodeStatusUpdater nodeStatusUpdater = + nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); @@ -214,14 +215,25 @@ public void stop() { if (isStopping.getAndSet(true)) { return; } - - cleanupContainers(); + + cleanupContainers(NodeManagerEventType.SHUTDOWN); super.stop(); DefaultMetricsSystem.shutdown(); } - + + protected void cleanupContainersOnResync() { + //we do not want to block dispatcher thread here + new Thread() { + @Override + public void run() { + cleanupContainers(NodeManagerEventType.RESYNC); + ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + } + }.start(); + } + @SuppressWarnings("unchecked") - protected void cleanupContainers() { + protected void cleanupContainers(NodeManagerEventType eventType) { Map containers = context.getContainers(); if 
(containers.isEmpty()) { return; @@ -235,13 +247,29 @@ protected void cleanupContainers() { LOG.info("Waiting for containers to be killed"); - long waitStartTime = System.currentTimeMillis(); - while (!containers.isEmpty() && - System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill", ex); + switch (eventType) { + case SHUTDOWN: + long waitStartTime = System.currentTimeMillis(); + while (!containers.isEmpty() + && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on shutdown", ex); + } + } + //the shutdown wait is bounded and ends here; never fall through into the unbounded resync wait + break; + case RESYNC: + while (!containers.isEmpty()) { + try { + Thread.sleep(1000); + //to remove done containers from the map + ((NodeStatusUpdaterImpl) nodeStatusUpdater ).getNodeStatus(); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", ex); + } } + break; } @@ -342,9 +370,8 @@ public void handle(NodeManagerEvent event) { case SHUTDOWN: stop(); break; - case REBOOT: - stop(); - reboot(); + case RESYNC: + cleanupContainersOnResync(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". 
Ignoring."); @@ -361,6 +388,11 @@ ContainerManagerImpl getContainerManager() { return containerManager; } + @VisibleForTesting + Dispatcher getNMDispatcher(){ + return dispatcher; + } + @VisibleForTesting Context getNMContext() { return this.context; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java index d18cec6..f4d1caa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java @@ -18,5 +18,5 @@ package org.apache.hadoop.yarn.server.nodemanager; public enum NodeManagerEventType { - SHUTDOWN, REBOOT + SHUTDOWN, RESYNC } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index cca296c..3528971 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; +import 
com.google.common.annotations.VisibleForTesting; + public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -91,6 +93,9 @@ private long rmConnectionRetryIntervalMS; private boolean waitForEver; + private Runnable statusUpdaterRunnable; + private Thread statusUpdater; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -169,6 +174,22 @@ public synchronized void stop() { this.isStopped = true; super.stop(); } + + public void rebootNodeStatusUpdater() { + // Interrupt the updater. + this.isStopped = true; + + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + this.isStopped = false; + statusUpdater.start(); + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + throw new AvroRuntimeException(e); + } + } private boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); @@ -188,7 +209,8 @@ protected ResourceTracker getRMClient() { conf); } - private void registerWithRM() throws YarnRemoteException { + @VisibleForTesting + protected void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); rmConnectWaitMS = conf.getInt( @@ -311,8 +333,8 @@ private void registerWithRM() throws YarnRemoteException { } return appList; } - - private NodeStatus getNodeStatus() { + //change to protected because its called in cleanupContainers + protected NodeStatus getNodeStatus() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); @@ -387,7 +409,7 @@ public void sendOutofBandHeartBeat() { protected void startStatusUpdater() { - new Thread("Node Status Updater") { + statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { @@ -395,6 +417,9 @@ public void 
run() { while (!isStopped) { // Send heartbeat try { + if(isStopped){ + break; + } NodeHeartbeatResponse response = null; int rmRetryCount = 0; long waitStartTime = System.currentTimeMillis(); @@ -457,7 +482,7 @@ public void run() { LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.REBOOT)); + new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } @@ -500,6 +525,9 @@ public void run() { } } } - }.start(); + }; + statusUpdater = + new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index 10a85c7..801f594 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -160,7 +160,7 @@ public void testClearLocalDirWhenNodeReboot() throws IOException { "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0); - nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT)); + nm.handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); numTries = 0; while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer @@ -257,7 +257,7 @@ public void handle(NodeManagerEvent event) { case SHUTDOWN: this.stop(); break; - case REBOOT: + case RESYNC: this.stop(); this.createNewMyNodeManager().start(); break; diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index f422617..98de564 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; @@ -71,7 +77,7 @@ .getRecordFactory(null); static final String user = "nobody"; private FileContext localFS; - + private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { localFS = FileContext.getLocalFSFileContext(); @@ -91,11 +97,64 @@ public void 
testKillContainersOnShutdown() throws IOException { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); + startContainers(nm); + + final int MAX_TRIES=20; + int numTries = 0; + while (!processStartFile.exists() && numTries < MAX_TRIES) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) {ex.printStackTrace();} + numTries++; + } + + nm.stop(); + // Now verify the contents of the file + // Script generates a message when it receives a sigterm + // so we look for that + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean foundSigTermMessage = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("SIGTERM")) { + foundSigTermMessage = true; + break; + } + } + Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); + reader.close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + startContainers(nm); + + Assert.assertEquals(1, ((TestNodeManager) nm).getRegCount()); + nm.getNMDispatcher().getEventHandler(). 
+ handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertEquals(2, ((TestNodeManager) nm).getRegCount()); + } + + private void startContainers(NodeManager nm) throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); File scriptFile = createUnhaltingScriptFile(); - ContainerLaunchContext containerLaunchContext = + ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); // Construct the Container-id @@ -127,7 +186,8 @@ public void testKillContainersOnShutdown() throws IOException { containerLaunchContext.setResource(recordFactory .newRecordInstance(Resource.class)); containerLaunchContext.getResource().setMemory(1024); - StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); containerManager.startContainer(startRequest); @@ -137,37 +197,6 @@ public void testKillContainersOnShutdown() throws IOException { ContainerStatus containerStatus = containerManager.getContainerStatus(request).getStatus(); Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); - - final int MAX_TRIES=20; - int numTries = 0; - while (!processStartFile.exists() && numTries < MAX_TRIES) { - try { - Thread.sleep(500); - } catch (InterruptedException ex) {ex.printStackTrace();} - numTries++; - } - - nm.stop(); - - // Now verify the contents of the file - // Script generates a message when it receives a sigterm - // so we look for that - BufferedReader reader = - new BufferedReader(new FileReader(processStartFile)); - - boolean foundSigTermMessage = false; - while (true) { - String line = reader.readLine(); - if (line == null) { - break; - } - if (line.contains("SIGTERM")) { - foundSigTermMessage = true; - break; 
- } - } - Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); - reader.close(); } private ContainerId createContainerId() { @@ -226,4 +255,51 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } + + class TestNodeManager extends NodeManager { + + private int regCount = 0; + public TestNodeManager (){ + super(); + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl(context, dispatcher, + healthChecker, metrics); + } + + public int getRegCount() { + return regCount; + } + + class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + public void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + regCount++; + } + + @Override + public void rebootNodeStatusUpdater() { + ConcurrentMap containers + = getNMContext().getContainers(); + //ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + try { + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index b0f3093..8452ef9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -663,8 +663,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } @Override - protected void cleanupContainers() { - super.cleanupContainers(); + protected void cleanupContainers(NodeManagerEventType eventType) { + super.cleanupContainers(NodeManagerEventType.SHUTDOWN); numCleanups.incrementAndGet(); } }; @@ -718,50 +718,6 @@ public void testNodeDecommision() throws Exception { } @Test - public void testNodeReboot() throws Exception { - nm = getNodeManager(NodeAction.REBOOT); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - Assert.assertEquals(STATE.INITED, nm.getServiceState()); - nm.start(); - - int waitCount = 0; - while (heartBeatID < 1 && waitCount++ != 20) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID < 1); - - // NM takes a while to reach the STOPPED state. - waitCount = 0; - while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); - - waitCount = 0; - while (null == rebootedNodeManager && waitCount++ != 20) { - LOG.info("Waiting for NM to reinitialize.."); - Thread.sleep(1000); - } - - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) { - LOG.info("Waiting for NM to start.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState()); - - rebootedNodeManager.stop(); - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState()); - } - - @Test public void testNMShutdownForRegistrationFailure() { nm = new NodeManager() {