diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 8ec3a5a..d4776bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -229,6 +229,15 @@ public String getName() { return "NodeManager"; } + protected void shutDown() { + new Thread() { + @Override + public void run() { + NodeManager.this.stop(); + } + }.start(); + } + protected void resyncWithRM() { //we do not want to block dispatcher thread here new Thread() { @@ -265,6 +274,8 @@ protected void cleanupContainers(NodeManagerEventType eventType) { while (!containers.isEmpty() && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { try { + //To remove done containers in NM context + nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); Thread.sleep(1000); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on shutdown", @@ -276,7 +287,6 @@ protected void cleanupContainers(NodeManagerEventType eventType) { while (!containers.isEmpty()) { try { Thread.sleep(1000); - //to remove done containers from the map nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on resync", @@ -409,7 +419,7 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { public void handle(NodeManagerEvent event) { switch (event.getType()) { case SHUTDOWN: - stop(); + shutDown(); break; case RESYNC: resyncWithRM(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b0e71e9..8169677 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -385,7 +385,7 @@ public void run() { } if (response.getNodeAction() == NodeAction.RESYNC) { LOG.warn("Node is out of sync with ResourceManager," - + " hence rebooting."); + + " hence resyncing."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); // Invalidate the RMIdentifier while resync @@ -418,6 +418,7 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); throw new YarnRuntimeException(e); } catch (Throwable e) { + // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 73bea03..97c81bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -110,6 +112,7 @@ private NodeManager nm; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); + private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); @Before public void setUp() { @@ -121,6 +124,7 @@ public void tearDown() { this.registeredNodes.clear(); heartBeatID = 0; ServiceOperations.stop(nm); + assertionFailedInThread.set(false); DefaultMetricsSystem.shutdown(); } @@ -442,6 +446,13 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected void serviceStop() throws Exception { super.serviceStop(); isStopped = true; + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty + if(!containers.isEmpty()) { + assertionFailedInThread.set(true); + } syncBarrier.await(10000, TimeUnit.MILLISECONDS); } } @@ -1095,7 +1106,7 @@ protected NMContext createNMContext( @Test(timeout = 200000) public void testNodeStatusUpdaterRetryAndNMShutdown() - throws InterruptedException { + throws Exception { final long connectionWaitSecs = 1; final long connectionRetryIntervalSecs = 1; YarnConfiguration conf = createNMConfig(); @@ -1104,14 +1115,29 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() conf.setLong(YarnConfiguration .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, connectionRetryIntervalSecs); + conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); CyclicBarrier syncBarrier = new CyclicBarrier(2); nm = new MyNodeManager2(syncBarrier, conf); nm.init(conf); nm.start(); + // start a container + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + File basedirFile = + new File("target", TestNodeManagerResync.class.getName()); + File tmpDir = new File(basedirFile, "tmpDir"); + File processStartFile = new File(tmpDir, "start_file.txt") + .getAbsoluteFile(); + FileContext localFS = FileContext.getLocalFSFileContext(); + tmpDir.mkdirs(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + try { syncBarrier.await(10000, TimeUnit.MILLISECONDS); } catch (Exception e) { } + Assert.assertFalse("Containers not cleaned up when NM stopped", + assertionFailedInThread.get()); Assert.assertTrue(((MyNodeManager2) nm).isStopped); Assert.assertTrue("calculate heartBeatCount based on" + " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index adc5984..aa8f120 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -253,7 +253,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ - String message = "Node not found rebooting " + remoteNodeStatus.getNodeId(); + String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); LOG.info(message); resync.setDiagnosticsMessage(message); return resync;