diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b5abecd..5276d44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -224,6 +224,15 @@ public void stop() { DefaultMetricsSystem.shutdown(); } + protected void shutDown() { /* Starts a new thread whose run() calls NodeManager.this.stop() and returns without waiting for it; handle() invokes this for SHUTDOWN events. NOTE(review): presumably done, as in resyncWithRM(), so the dispatcher thread is not blocked - confirm. */ + new Thread() { + @Override + public void run() { + NodeManager.this.stop(); + } + }.start(); + } + protected void resyncWithRM() { //we do not want to block dispatcher thread here new Thread() { @@ -260,6 +269,7 @@ protected void cleanupContainers(NodeManagerEventType eventType) { while (!containers.isEmpty() && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { try { + nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); Thread.sleep(1000); } catch (InterruptedException ex) { LOG.warn("Interrupted while sleeping on container kill on shutdown", @@ -387,7 +397,7 @@ private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { public void handle(NodeManagerEvent event) { switch (event.getType()) { case SHUTDOWN: - stop(); + shutDown(); break; case RESYNC: resyncWithRM(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8b50fd8..058d792 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -496,7 +496,7 @@ public void run() { } if (response.getNodeAction() == NodeAction.RESYNC) { LOG.warn("Node is out of sync with ResourceManager," - + " hence rebooting."); + + " hence resyncing."); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); // Invalidate the RMIdentifier while resync @@ -529,6 +529,7 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); throw e; } catch (Throwable e) { + // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f33d6f0..460cf01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -42,7 +43,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import 
org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -378,6 +378,11 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, public void stop() { super.stop(); isStopped = true; + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty + Assert.assertTrue(containers.isEmpty()); try { syncBarrier.await(); } catch (Exception e) { @@ -926,8 +931,7 @@ protected NMContext createNMContext( } @Test(timeout = 20000) - public void testNodeStatusUpdaterRetryAndNMShutdown() - throws InterruptedException { + public void testNodeStatusUpdaterRetryAndNMShutdown() throws Exception { final long connectionWaitSecs = 1; final long connectionRetryIntervalSecs = 1; YarnConfiguration conf = createNMConfig(); @@ -936,10 +940,23 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() conf.setLong(YarnConfiguration .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, connectionRetryIntervalSecs); + conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); CyclicBarrier syncBarrier = new CyclicBarrier(2); nm = new MyNodeManager2(syncBarrier); nm.init(conf); nm.start(); + // start a container + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + File basedirFile = + new File("target", TestNodeManagerResync.class.getName()); + File tmpDir = new File(basedirFile, "tmpDir"); + File processStartFile = new File(tmpDir, "start_file.txt") + .getAbsoluteFile(); + FileContext localFS = FileContext.getLocalFSFileContext(); + tmpDir.mkdirs(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + try { syncBarrier.await(); } catch (Exception e) {