diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 273a3b9..0440f1d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; import org.junit.Assert; @@ -169,6 +173,7 @@ public void testExplicitFailover() verifyConnections(); } + @SuppressWarnings("unchecked") @Test public void testAutomaticFailover() throws YarnException, InterruptedException, IOException { @@ -186,6 +191,25 @@ public void testAutomaticFailover() failover(); verifyConnections(); + + // Make the current Active handle an RMFatalEvent, + // so it transitions to standby. + ResourceManager rm = cluster.getResourceManager( + cluster.getActiveRMIndex()); + RMFatalEvent event = + new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED, + "Fake RMFatalEvent"); + rm.getRMContext().getDispatcher().getEventHandler().handle(event); + int maxWaitingAttempts = 2000; + while (maxWaitingAttempts-- > 0 ) { + if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { + break; + } + Thread.sleep(1); + } + Assert.assertFalse("RM didn't transition to Standby ", + maxWaitingAttempts == 0); + verifyConnections(); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 6d521d4..1d2f376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.HAServiceProtocol; @@ -86,6 +87,7 @@ private String rmId; private boolean autoFailoverEnabled; + private EmbeddedElectorService embeddedElector; private Server server; private InetSocketAddress masterServiceAddress; @@ -106,7 +108,8 @@ public void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); if (autoFailoverEnabled) { if (HAUtil.isAutomaticFailoverEmbedded(conf)) { - addIfService(createEmbeddedElectorService()); + embeddedElector = createEmbeddedElectorService(); + addIfService(embeddedElector); } } } @@ -181,6 +184,13 @@ protected EmbeddedElectorService createEmbeddedElectorService() { return new EmbeddedElectorService(rmContext); } + @InterfaceAudience.Private + void resetLeaderElection() { + if (embeddedElector != null) { + embeddedElector.resetLeaderElection(); + } + } + private UserGroupInformation checkAccess(String method) throws IOException { return RMServerUtils.verifyAccess(adminAcl, method, LOG); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 618f83d..0aa292f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -194,4 +194,9 @@ private boolean isParentZnodeSafe(String clusterId) } return true; } + + public void resetLeaderElection() { + elector.quitElection(false); + elector.joinElection(localActiveNodeInfo); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 054ec04..b914b1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -664,6 +664,7 @@ public void handle(RMFatalEvent event) { // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); rm.transitionToStandby(true); + rm.adminService.resetLeaderElection(); return; } catch (Exception e) { LOG.fatal("Failed to transition RM to Standby mode.");