diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 273a3b9..9b403e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; import org.junit.Assert; @@ -186,6 +190,30 @@ public void testAutomaticFailover() failover(); verifyConnections(); + + // create a fake RMFatalEvent. + // The current Active RM transit to Standby after + // handles this RMFatalEvent. + ResourceManager rm = cluster.getResourceManager( + cluster.getActiveRMIndex()); + RMFatalEvent event = + new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED, + "fake RMFatalEvent"); + rm.getRMContext().getDispatcher().getEventHandler().handle(event); + int maxWaittingAttempt = 20; + while (maxWaittingAttempt -- > 0) { + if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { + break; + } + try { + Thread.sleep(10); + } catch (Exception e) { + // Do nothing + } + } + Assert.assertEquals(rm.getRMContext().getHAServiceState(), + HAServiceState.STANDBY); + verifyConnections(); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c5b2651..7a245b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.HAServiceProtocol; @@ -86,6 +87,7 @@ private String rmId; private boolean autoFailoverEnabled; + private EmbeddedElectorService embeddedElector; private Server server; private InetSocketAddress masterServiceAddress; @@ -106,7 +108,8 @@ public synchronized void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); if (autoFailoverEnabled) { if (HAUtil.isAutomaticFailoverEmbedded(conf)) { - addIfService(createEmbeddedElectorService()); + embeddedElector = createEmbeddedElectorService(); + addIfService(embeddedElector); } } } @@ -181,6 +184,13 @@ protected EmbeddedElectorService createEmbeddedElectorService() { return new EmbeddedElectorService(rmContext); } + @InterfaceAudience.Private + synchronized void resetLeaderElection() { + if (embeddedElector != null) { + embeddedElector.resetLeaderElection(); + } + } + private UserGroupInformation checkAccess(String method) throws IOException { return RMServerUtils.verifyAccess(adminAcl, method, LOG); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 3cd986c..321c520 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -194,4 +194,9 @@ private synchronized boolean isParentZnodeSafe(String clusterId) } return true; } + + public synchronized void resetLeaderElection() { + elector.quitElection(false); + elector.joinElection(localActiveNodeInfo); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 054ec04..b914b1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -664,6 +664,7 @@ public void handle(RMFatalEvent event) { // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); rm.transitionToStandby(true); + rm.adminService.resetLeaderElection(); return; } catch (Exception e) { LOG.fatal("Failed to transition RM to Standby mode.");