diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java index 652c05f..785cbea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java @@ -24,5 +24,16 @@ */ public enum NodeAction { - NORMAL, RESYNC, SHUTDOWN + NORMAL, + + // Resync is used when RM has rebooted and the NM is not recognized + // Legacy resync command that is not work preserving, ie containers are killed + RESYNC, + + // Returned when the node manager is not allowed + SHUTDOWN, + + // Resync is used when RM has rebooted and the NM is not recognized + // Work preserving resync option that preserves containers + RESYNC_KEEPING_CONTAINERS, } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 4f5d168..884d811 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -28,6 +28,7 @@ enum NodeActionProto { NORMAL = 0; RESYNC = 1; SHUTDOWN = 2; + RESYNC_KEEPING_CONTAINERS = 3; } message NodeStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 2292a0d..7c872b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -84,7 +84,7 @@ private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); - + public NodeManager() { super(NodeManager.class.getName()); } @@ -268,7 +268,7 @@ public void run() { }.start(); } - protected void resyncWithRM() { + protected void resyncWithRM(final boolean keepContainers) { //we do not want to block dispatcher thread here new Thread() { @Override @@ -276,8 +276,13 @@ public void run() { try { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); - LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); + if (!keepContainers) + { + LOG.info("Cleaning up running containers on resync"); + containerManager.cleanupContainersOnNMResync(); + } else { + LOG.info("Preserving containers on resync"); + } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { @@ -436,8 +441,11 @@ public void handle(NodeManagerEvent event) { case SHUTDOWN: shutDown(); break; + case RESYNC_KEEPING_CONTAINERS: + resyncWithRM(true); + break; case RESYNC: - resyncWithRM(); + resyncWithRM(false); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java index f4d1caa..2fdb267 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java @@ -18,5 +18,5 @@ package org.apache.hadoop.yarn.server.nodemanager; public enum NodeManagerEventType { - SHUTDOWN, RESYNC + SHUTDOWN, RESYNC, RESYNC_KEEPING_CONTAINERS, } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0b8f5b4..ad023a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -525,16 +525,27 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; } - if (response.getNodeAction() == NodeAction.RESYNC) { + if (response.getNodeAction() == NodeAction.RESYNC || response + .getNodeAction() == NodeAction.RESYNC_KEEPING_CONTAINERS) { + boolean keepContainers = response.getNodeAction() == NodeAction + .RESYNC_KEEPING_CONTAINERS; LOG.warn("Node is out of sync with ResourceManager," - + " hence resyncing."); + + " hence resyncing. ShouldKeepContainers set to " + + keepContainers); LOG.warn("Message from ResourceManager: " + response.getDiagnosticsMessage()); + // Invalidate the RMIdentifier while resync NodeStatusUpdaterImpl.this.rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.RESYNC)); + if (keepContainers) { + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType + .RESYNC_KEEPING_CONTAINERS)); + } else { + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.RESYNC)); + } break; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 3cbf3d3..98a677b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; @@ -105,30 +104,53 @@ public void tearDown() throws IOException, InterruptedException { @Test public void testKillContainersOnResync() throws IOException, InterruptedException, YarnException { - NodeManager nm = new TestNodeManager1(); + NodeManager nm = new TestNodeManager1(false); + NodeManagerEvent resyncEvent = new NodeManagerEvent(NodeManagerEventType.RESYNC); + + testContainerPreservationOnResyncImpl(nm, resyncEvent); + } + + @SuppressWarnings("unchecked") + @Test + public void testPreserveContainersOnResyncKeepingContainers() throws + IOException, + InterruptedException, YarnException { + NodeManager nm = new TestNodeManager1(true); + NodeManagerEvent resyncKeepingContainersEvent = + new NodeManagerEvent(NodeManagerEventType.RESYNC_KEEPING_CONTAINERS); + + testContainerPreservationOnResyncImpl(nm, resyncKeepingContainersEvent); + } + + protected void testContainerPreservationOnResyncImpl(NodeManager nm, + NodeManagerEvent resyncEvent) + throws IOException, YarnException, InterruptedException { YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - ContainerId cId = TestNodeManagerShutdown.createContainerId(); - TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, - processStartFile); - - Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); - nm.getNMDispatcher().getEventHandler(). - handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { + nm.init(conf); + nm.start(); + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount()); + nm.getNMDispatcher().getEventHandler().handle(resyncEvent); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); + // Only containers should be killed on resync, apps should lie around. That + + // way local resources for apps can be used beyond resync without + // relocalization + Assert.assertTrue(nm.getNMContext().getApplications() + .containsKey(cId.getApplicationAttemptId().getApplicationId())); + Assert.assertFalse(assertionFailedInThread.get()); + } + finally { + nm.stop(); } - Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount()); - // Only containers should be killed on resync, apps should lie around. That - // way local resources for apps can be used beyond resync without - // relocalization - Assert.assertTrue(nm.getNMContext().getApplications() - .containsKey(cId.getApplicationAttemptId().getApplicationId())); - Assert.assertFalse(assertionFailedInThread.get()); - - nm.stop(); } // This test tests new container requests are blocked when NM starts from @@ -137,49 +159,96 @@ public void testKillContainersOnResync() throws IOException, @Test public void testBlockNewContainerRequestsOnStartAndResync() throws IOException, InterruptedException, YarnException { - NodeManager nm = new TestNodeManager2(); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); + NodeManager nm = new TestNodeManager2(false); + NodeManagerEvent resyncEvent = new NodeManagerEvent(NodeManagerEventType + .RESYNC); + + testBlockNewContainerRequestsOnStartAndResyncImpl(nm, resyncEvent); + } - // Start the container in running state - ContainerId cId = TestNodeManagerShutdown.createContainerId(); - TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, - processStartFile); + @SuppressWarnings("unchecked") + @Test + public void testBlockNewContainerRequestsOnStartAndResyncKeepingContainers() + throws IOException, InterruptedException, YarnException { + NodeManager nm = new TestNodeManager2(true); + NodeManagerEvent resyncKeepingContainersEvent = + new NodeManagerEvent(NodeManagerEventType.RESYNC_KEEPING_CONTAINERS); - nm.getNMDispatcher().getEventHandler() - .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); + testBlockNewContainerRequestsOnStartAndResyncImpl + (nm, resyncKeepingContainersEvent); + } + + protected void testBlockNewContainerRequestsOnStartAndResyncImpl( + NodeManager nm, NodeManagerEvent resyncEvent) + throws IOException, YarnException, InterruptedException { try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + // Start the container in running state + ContainerId cId = TestNodeManagerShutdown.createContainerId(); + TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir, + processStartFile); + + nm.getNMDispatcher().getEventHandler().handle(resyncEvent); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertFalse(assertionFailedInThread.get()); + } + finally { + nm.stop(); } - Assert.assertFalse(assertionFailedInThread.get()); - nm.stop(); } - + @SuppressWarnings("unchecked") @Test(timeout=10000) public void testNMshutdownWhenResyncThrowException() throws IOException, InterruptedException, YarnException { + NodeManagerEvent resyncEvent = new NodeManagerEvent(NodeManagerEventType + .RESYNC); + + testNMshutdownWhenResyncThrowExceptionImpl(resyncEvent); + } + + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testNMshutdownWhenResyncKeepingContainersThrowException() + throws IOException, InterruptedException, YarnException { + NodeManagerEvent resyncKeepingContainersEvent = + new NodeManagerEvent(NodeManagerEventType.RESYNC_KEEPING_CONTAINERS); + + testNMshutdownWhenResyncThrowExceptionImpl(resyncKeepingContainersEvent); + } + + protected void testNMshutdownWhenResyncThrowExceptionImpl( + NodeManagerEvent resyncEvent) + throws IOException, InterruptedException, YarnException { NodeManager nm = new TestNodeManager3(); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount()); - nm.getNMDispatcher().getEventHandler() - .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); - - synchronized (isNMShutdownCalled) { - while (isNMShutdownCalled.get() == false) { - try { - isNMShutdownCalled.wait(); - } catch (InterruptedException e) { + try { + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount()); + nm.getNMDispatcher().getEventHandler(). + handle(resyncEvent); + + synchronized (isNMShutdownCalled) { + while (isNMShutdownCalled.get() == false) { + try { + isNMShutdownCalled.wait(); + } catch (InterruptedException e) { + } } } + + Assert.assertTrue("NM shutdown not called.", isNMShutdownCalled.get()); + } + finally { + nm.stop(); } - - Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get()); - nm.stop(); } @@ -188,6 +257,18 @@ public void testNMshutdownWhenResyncThrowException() throws IOException, // statuses again when it re-register with RM. @Test public void testNMSentContainerStatusOnResync() throws Exception { + testNMSentContainerStatusOnResyncImpl(NodeAction.RESYNC); + } + + @Test + public void testNMSentContainerStatusOnResyncKeepingContainers() + throws Exception { + testNMSentContainerStatusOnResyncImpl(NodeAction.RESYNC_KEEPING_CONTAINERS); + } + + protected void testNMSentContainerStatusOnResyncImpl( + final NodeAction resyncAction) throws Exception { + final ContainerStatus testCompleteContainer = TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE); final Container container = @@ -256,7 +337,7 @@ public NodeHeartbeatResponse nodeHeartbeat( // notify RESYNC on first heartbeat. return YarnServerBuilderUtils.newNodeHeartbeatResponse(1, - NodeAction.RESYNC, null, null, null, null, 1000L); + resyncAction, null, null, null, null, 1000L); } }; } @@ -264,15 +345,19 @@ public NodeHeartbeatResponse nodeHeartbeat( } }; YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { + nm.init(conf); + nm.start(); + + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertFalse(assertionFailedInThread.get()); + } + finally { + nm.stop(); } - Assert.assertFalse(assertionFailedInThread.get()); - nm.stop(); } // This can be used as a common base class for testing NM resync behavior. @@ -312,6 +397,11 @@ private YarnConfiguration createNMConfig() { class TestNodeManager1 extends NodeManager { private int registrationCount = 0; + private boolean containersShouldBePreserved; + + public TestNodeManager1(boolean containersShouldBePreserved) { + this.containersShouldBePreserved = containersShouldBePreserved; + } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, @@ -343,21 +433,35 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { .containermanager.container.Container> containers = getNMContext().getContainers(); try { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdaterAndRegisterWithRM(); - syncBarrier.await(); + try { + // ensure that containers are empty before restart nodeStatusUpdater + if (containersShouldBePreserved) { + Assert.assertFalse(containers.isEmpty()); + } else { + Assert.assertTrue(containers.isEmpty()); + } + super.rebootNodeStatusUpdaterAndRegisterWithRM(); + } + catch (AssertionError ae) { + ae.printStackTrace(); + assertionFailedInThread.set(true); + } + finally { + syncBarrier.await(); + } } catch (InterruptedException e) { } catch (BrokenBarrierException e) { - } catch (AssertionError ae) { - ae.printStackTrace(); - assertionFailedInThread.set(true); } } } } class TestNodeManager2 extends NodeManager { + private boolean containersShouldBePreserved; + + public TestNodeManager2(boolean containersShouldBePreserved) { + this.containersShouldBePreserved = containersShouldBePreserved; + } Thread launchContainersThread = null; @Override @@ -416,18 +520,25 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { getNMContext().getContainers(); try { - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdaterAndRegisterWithRM(); - // After this point new containers are free to be launched, except - // containers from previous RM - // Wait here so as to sync with the main test thread. - syncBarrier.await(); + try { + if (!containersShouldBePreserved) { + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + } + super.rebootNodeStatusUpdaterAndRegisterWithRM(); + } + catch (AssertionError ae) { + ae.printStackTrace(); + assertionFailedInThread.set(true); + } + finally { + // After this point new containers are free to be launched, except + // containers from previous RM + // Wait here so as to sync with the main test thread. + syncBarrier.await(); + } } catch (InterruptedException e) { } catch (BrokenBarrierException e) { - } catch (AssertionError ae) { - ae.printStackTrace(); - assertionFailedInThread.set(true); } } }