# Patch summary (preamble only; everything from the first "diff --git" header
# onward is left byte-for-byte as found, including its collapsed line breaks).
#
# ResourceManager.java
#   - stopActiveServices() no longer clears rmContext.getRMNodes(),
#     rmContext.getInactiveRMNodes() and rmContext.getRMApps(), and no longer
#     calls ClusterMetrics.destroy() / QueueMetrics.clearQueueMetrics().
#     Those five calls move into the new private clearClusterMetrics().
#   - New reinitialize(boolean initialize) always calls clearClusterMetrics()
#     and, only when initialize is true, resetDispatcher() followed by
#     createAndInitActiveServices().
#   - The catch block around startActiveServices() now calls reinitialize(true)
#     before rethrowing, so the clearing also runs when startActiveServices()
#     throws an exception.
#   - transitionToStandby(boolean) calls reinitialize(initialize) right after
#     stopActiveServices(), inside the branch taken when the HA state is ACTIVE.
#   - NOTE(review): the clearing used to sit inside the
#     "if (activeServices != null)" guard of stopActiveServices(); via
#     reinitialize() it now runs regardless of that guard. Looks intentional,
#     confirm.
#
# TestRMHA.java
#   - Adds testFailoverClearsRMContext(): disables AUTO_FAILOVER_ENABLED,
#     enables RECOVERY_ENABLED, starts a MockRM on a MemoryRMStateStore,
#     transitions it to active and checks verifyClusterMetrics(1, 1, 1, 1, 2048, 1)
#     plus one entry each in getRMNodes() and getRMApps(). It then builds a
#     second MockRM on the same store whose ResourceTrackerService.serviceStart()
#     throws, expects transitionToActive to throw, and finally checks
#     verifyClusterMetrics(0, 0, 0, 0, 0, 0) and empty getRMNodes()/getRMApps().
#   - NOTE(review): the catch block asserts
#     "Error when transitioning to Active mode".contains(e.getMessage()), i.e.
#     that the exception message is a substring of the literal. That also holds
#     for an empty message, and String.contains throws NullPointerException for
#     a null argument. Presumably e.getMessage().contains(...) was meant; confirm.
#   - NOTE(review): the first MockRM assigned to "rm" is replaced in step 3 and
#     no rm.stop() is visible in this hunk for either instance; confirm that
#     test teardown (not shown here) stops them.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e0840b6..e8d5579 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1008,13 +1008,24 @@ void stopActiveServices() throws Exception { if (activeServices != null) { activeServices.stop(); activeServices = null; - rmContext.getRMNodes().clear(); - rmContext.getInactiveRMNodes().clear(); - rmContext.getRMApps().clear(); - ClusterMetrics.destroy(); - QueueMetrics.clearQueueMetrics(); } } + + void reinitialize(boolean initialize) throws Exception { + clearClusterMetrics(); + if (initialize) { + resetDispatcher(); + createAndInitActiveServices(); + } + } + + private void clearClusterMetrics(){ + rmContext.getRMNodes().clear(); + rmContext.getInactiveRMNodes().clear(); + rmContext.getRMApps().clear(); + ClusterMetrics.destroy(); + QueueMetrics.clearQueueMetrics(); + } @VisibleForTesting protected boolean areActiveServicesRunning() { @@ -1036,8 +1047,7 @@ public Void run() throws Exception { startActiveServices(); return null; } catch (Exception e) { - resetDispatcher(); - createAndInitActiveServices(); + reinitialize(true); throw e; } } @@ -1059,10 +1069,8 @@ synchronized void transitionToStandby(boolean initialize) if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { stopActiveServices(); - if (initialize) { - resetDispatcher(); - createAndInitActiveServices(); - } + reinitialize(initialize); + } 
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); LOG.info("Transitioned to standby state"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c6d7d09..122eb60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -513,6 +513,68 @@ public void run() { rm.stop(); } + @Test + public void testFailoverClearsRMContext() throws Exception { + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + Configuration conf = new YarnConfiguration(configuration); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 1. start RM + rm = new MockRM(conf, memStore); + rm.init(conf); + rm.start(); + + StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 2. Transition to active + rm.adminService.transitionToActive(requestInfo); + checkMonitorHealth(); + checkActiveRMFunctionality(); + verifyClusterMetrics(1, 1, 1, 1, 2048, 1); + assertEquals(1, rm.getRMContext().getRMNodes().size()); + assertEquals(1, rm.getRMContext().getRMApps().size()); + + // 3. 
Create new RM + rm = new MockRM(conf, memStore) { + @Override + protected ResourceTrackerService createResourceTrackerService() { + return new ResourceTrackerService(this.rmContext, + this.nodesListManager, this.nmLivelinessMonitor, + this.rmContext.getContainerTokenSecretManager(), + this.rmContext.getNMTokenSecretManager()) { + @Override + protected void serviceStart() throws Exception { + throw new Exception("ResourceTracker service failed"); + } + }; + } + }; + rm.init(conf); + rm.start(); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 4. Try Transition to active, throw exception + try { + rm.adminService.transitionToActive(requestInfo); + Assert.fail("Transitioned to Active should throw exception."); + } catch (Exception e) { + assertTrue("Error when transitioning to Active mode".contains(e + .getMessage())); + } + // 5. Clears the metrics + verifyClusterMetrics(0, 0, 0, 0, 0, 0); + assertEquals(0, rm.getRMContext().getRMNodes().size()); + assertEquals(0, rm.getRMContext().getRMApps().size()); + } + public void innerTestHAWithRMHostName(boolean includeBindHost) { //this is run two times, with and without a bind host configured if (includeBindHost) {