diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java index 4d8246e..652c05f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java @@ -24,5 +24,5 @@ */ public enum NodeAction { - NORMAL, REBOOT, SHUTDOWN + NORMAL, RESYNC, SHUTDOWN } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 89ec81c..7fa1fb7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -25,7 +25,7 @@ import "yarn_protos.proto"; enum NodeActionProto { NORMAL = 0; - REBOOT = 1; + RESYNC = 1; SHUTDOWN = 2; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 867a02d..a5d16c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -81,6 +81,7 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; private long waitForContainersOnShutdownMillis; @@ -163,7 +164,7 @@ public void init(Configuration conf) { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - NodeStatusUpdater nodeStatusUpdater = + nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); @@ -214,35 +215,67 @@ public void stop() { if (isStopping.getAndSet(true)) { return; } - - cleanupContainers(); + + cleanupContainers(NodeManagerEventType.SHUTDOWN); super.stop(); DefaultMetricsSystem.shutdown(); } - + + protected void cleanupContainersOnResync() { + //we do not want to block dispatcher thread here + new Thread() { + @Override + public void run() { + cleanupContainers(NodeManagerEventType.RESYNC); + ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + } + }.start(); + } + @SuppressWarnings("unchecked") - protected void cleanupContainers() { + protected void cleanupContainers(NodeManagerEventType eventType) { Map containers = context.getContainers(); if (containers.isEmpty()) { return; } - LOG.info("Containers still running on shutdown: " + containers.keySet()); + LOG.info("Containers still running on " + eventType + " : " + + containers.keySet()); - List containerIds = new ArrayList(containers.keySet()); + List containerIds = + new ArrayList(containers.keySet()); dispatcher.getEventHandler().handle( new CMgrCompletedContainersEvent(containerIds, CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN)); LOG.info("Waiting for containers to be killed"); - long waitStartTime = System.currentTimeMillis(); - while (!containers.isEmpty() && - System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill", ex); + switch (eventType) { + case SHUTDOWN: + long waitStartTime = System.currentTimeMillis(); + while (!containers.isEmpty() + && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on shutdown", + ex); + } } + break; + case RESYNC: + while (!containers.isEmpty()) { + try { + Thread.sleep(1000); + //to remove done containers from the map + nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", + ex); + } + } + break; + default: + LOG.warn("Invalid eventType: " + eventType); } // All containers killed @@ -342,9 +375,8 @@ public void handle(NodeManagerEvent event) { case SHUTDOWN: stop(); break; - case REBOOT: - stop(); - reboot(); + case RESYNC: + cleanupContainersOnResync(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); @@ -361,6 +393,11 @@ ContainerManagerImpl getContainerManager() { return containerManager; } + //For testing + Dispatcher getNMDispatcher(){ + return dispatcher; + } + @VisibleForTesting Context getNMContext() { return this.context; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java index d18cec6..f4d1caa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java @@ -18,5 +18,5 @@ package org.apache.hadoop.yarn.server.nodemanager; public enum NodeManagerEventType { - SHUTDOWN, REBOOT + SHUTDOWN, RESYNC } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index f1e6ac3..41949e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.service.Service; public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); + NodeStatus getNodeStatusAndUpdateContainersInContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index cca296c..e9583c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -91,6 +93,9 @@ private long rmConnectionRetryIntervalMS; private boolean waitForEver; + private Runnable statusUpdaterRunnable; + private Thread statusUpdater; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -169,6 +174,22 @@ public synchronized void stop() { this.isStopped = true; super.stop(); } + + protected void rebootNodeStatusUpdater() { + // Interrupt the updater. + this.isStopped = true; + + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + this.isStopped = false; + statusUpdater.start(); + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + throw new AvroRuntimeException(e); + } + } private boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); @@ -188,7 +209,8 @@ protected ResourceTracker getRMClient() { conf); } - private void registerWithRM() throws YarnRemoteException { + @VisibleForTesting + protected void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); rmConnectWaitMS = conf.getInt( @@ -312,7 +334,7 @@ private void registerWithRM() throws YarnRemoteException { return appList; } - private NodeStatus getNodeStatus() { + public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); @@ -387,7 +409,7 @@ public void sendOutofBandHeartBeat() { protected void startStatusUpdater() { - new Thread("Node Status Updater") { + statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { @@ -398,7 +420,7 @@ public void run() { NodeHeartbeatResponse response = null; int rmRetryCount = 0; long waitStartTime = System.currentTimeMillis(); - NodeStatus nodeStatus = getNodeStatus(); + NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); NodeHeartbeatRequest request = recordFactory @@ -453,11 +475,11 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; } - if (response.getNodeAction() == NodeAction.REBOOT) { + if (response.getNodeAction() == NodeAction.RESYNC) { LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.REBOOT)); + new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } @@ -500,6 +522,9 @@ public void run() { } } } - }.start(); + }; + statusUpdater = + new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index 10a85c7..9ac237b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -160,7 +160,10 @@ public void testClearLocalDirWhenNodeReboot() throws IOException { "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0); - nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT)); + // restart the NodeManager + nm.stop(); + nm = new MyNodeManager(); + nm.start(); numTries = 0; while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer @@ -250,26 +253,6 @@ protected DeletionService createDeletionService(ContainerExecutor exec) { return delService; } - // mimic part of reboot process - @Override - public void handle(NodeManagerEvent event) { - switch (event.getType()) { - case SHUTDOWN: - this.stop(); - break; - case REBOOT: - this.stop(); - this.createNewMyNodeManager().start(); - break; - default: - LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); - } - } - - private MyNodeManager createNewMyNodeManager() { - return new MyNodeManager(); - } - private YarnConfiguration createNMConfig() { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index f422617..72f3433 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; @@ -71,6 +77,7 @@ .getRecordFactory(null); static final String user = "nobody"; private FileContext localFS; + private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { @@ -91,11 +98,64 @@ public void testKillContainersOnShutdown() throws IOException { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); + startContainers(nm); + final int MAX_TRIES=20; + int numTries = 0; + while (!processStartFile.exists() && numTries < MAX_TRIES) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) {ex.printStackTrace();} + numTries++; + } + + nm.stop(); + + // Now verify the contents of the file + // Script generates a message when it receives a sigterm + // so we look for that + BufferedReader reader = + new BufferedReader(new FileReader(processStartFile)); + + boolean foundSigTermMessage = false; + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + if (line.contains("SIGTERM")) { + foundSigTermMessage = true; + break; + } + } + Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); + reader.close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + startContainers(nm); + + assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; + nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; + } + + private void startContainers(NodeManager nm) throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); File scriptFile = createUnhaltingScriptFile(); - ContainerLaunchContext containerLaunchContext = + ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); // Construct the Container-id @@ -127,7 +187,8 @@ public void testKillContainersOnShutdown() throws IOException { containerLaunchContext.setResource(recordFactory .newRecordInstance(Resource.class)); containerLaunchContext.getResource().setMemory(1024); - StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); containerManager.startContainer(startRequest); @@ -137,37 +198,6 @@ public void testKillContainersOnShutdown() throws IOException { ContainerStatus containerStatus = containerManager.getContainerStatus(request).getStatus(); Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); - - final int MAX_TRIES=20; - int numTries = 0; - while (!processStartFile.exists() && numTries < MAX_TRIES) { - try { - Thread.sleep(500); - } catch (InterruptedException ex) {ex.printStackTrace();} - numTries++; - } - - nm.stop(); - - // Now verify the contents of the file - // Script generates a message when it receives a sigterm - // so we look for that - BufferedReader reader = - new BufferedReader(new FileReader(processStartFile)); - - boolean foundSigTermMessage = false; - while (true) { - String line = reader.readLine(); - if (line == null) { - break; - } - if (line.contains("SIGTERM")) { - foundSigTermMessage = true; - break; - } - } - Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); - reader.close(); } private ContainerId createContainerId() { @@ -226,4 +256,48 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } + + class TestNodeManager extends NodeManager { + + private int registrationCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl(context, dispatcher, + healthChecker, metrics); + } + + public int getNMRegistrationCount() { + return registrationCount; + } + + class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + registrationCount++; + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + try { + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index b0f3093..c06a54d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -99,7 +99,6 @@ private final List registeredNodes = new ArrayList(); private final Configuration conf = createNMConfig(); private NodeManager nm; - protected NodeManager rebootedNodeManager; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); @@ -663,8 +662,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } @Override - protected void cleanupContainers() { - super.cleanupContainers(); + protected void cleanupContainers(NodeManagerEventType eventType) { + super.cleanupContainers(NodeManagerEventType.SHUTDOWN); numCleanups.incrementAndGet(); } }; @@ -718,50 +717,6 @@ public void testNodeDecommision() throws Exception { } @Test - public void testNodeReboot() throws Exception { - nm = getNodeManager(NodeAction.REBOOT); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - Assert.assertEquals(STATE.INITED, nm.getServiceState()); - nm.start(); - - int waitCount = 0; - while (heartBeatID < 1 && waitCount++ != 20) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID < 1); - - // NM takes a while to reach the STOPPED state. - waitCount = 0; - while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); - - waitCount = 0; - while (null == rebootedNodeManager && waitCount++ != 20) { - LOG.info("Waiting for NM to reinitialize.."); - Thread.sleep(1000); - } - - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) { - LOG.info("Waiting for NM to start.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState()); - - rebootedNodeManager.stop(); - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState()); - } - - @Test public void testNMShutdownForRegistrationFailure() { nm = new NodeManager() { @@ -1108,12 +1063,6 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - - @Override - NodeManager createNewNodeManager() { - rebootedNodeManager = getNodeManager(NodeAction.NORMAL); - return rebootedNodeManager; - } }; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 27f74d4..258c7dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -73,13 +73,13 @@ private Server server; private InetSocketAddress resourceTrackerAddress; - private static final NodeHeartbeatResponse reboot = recordFactory + private static final NodeHeartbeatResponse resync = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); private static final NodeHeartbeatResponse shutDown = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); static { - reboot.setNodeAction(NodeAction.REBOOT); + resync.setNodeAction(NodeAction.RESYNC); shutDown.setNodeAction(NodeAction.SHUTDOWN); } @@ -220,7 +220,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (rmNode == null) { /* node does not exist */ LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); - return reboot; + return resync; } // Send ping @@ -250,7 +250,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // TODO: Just sending reboot is not enough. Think more. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); - return reboot; + return resync; } // Heartbeat response diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 2057d8a..1b8550c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -220,9 +220,9 @@ public void testRMRestart() throws Exception { // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); - Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); hbResponse = nm2.nodeHeartbeat(true); - Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register nm1 = rm2.registerNode("h1:1234", 15120); @@ -230,9 +230,9 @@ public void testRMRestart() throws Exception { // verify no more reboot response sent hbResponse = nm1.nodeHeartbeat(true); - Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction()); hbResponse = nm2.nodeHeartbeat(true); - Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction()); // assert app1 attempt is saved attempt1 = loadedApp1.getCurrentAppAttempt(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 61ed065..af9d5d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -282,7 +282,7 @@ public void testReboot() throws Exception { nodeHeartbeat = nm2.nodeHeartbeat( new HashMap>(), true, -100); - Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); + Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction())); checkRebootedNMCount(rm, ++initialMetricCount); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 984d7cd..1fd1b2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -130,6 +130,6 @@ public void testRPCResponseId() throws IOException { nodeStatus.setResponseId(0); response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest); - Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction())); + Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction())); } } \ No newline at end of file