diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 72327e8..e47a263 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -39,6 +39,8 @@ import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -54,6 +56,9 @@ private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; + private long zkSessionTimeout; + private Timer zkDisconnectTimer; + private final Object zkDisconnectLock = new Object(); EmbeddedElectorService(RMContext rmContext) { super(EmbeddedElectorService.class.getName()); @@ -80,7 +85,7 @@ protected void serviceInit(Configuration conf) YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); String electionZNode = zkBasePath + "/" + clusterId; - long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, + zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); List zkAcls = RMZKUtils.getZKAcls(conf); @@ -123,6 +128,8 @@ protected void serviceStop() throws Exception { @Override public void becomeActive() throws ServiceFailedException { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -132,6 +139,8 @@ public void becomeActive() throws ServiceFailedException { @Override public void becomeStandby() { + cancelDisconnectTimer(); + try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -139,13 +148,44 @@ public void becomeStandby() { } } + /** + * Stop the disconnect timer. Any running tasks will be allowed to complete. + */ + private void cancelDisconnectTimer() { + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer != null) { + zkDisconnectTimer.cancel(); + zkDisconnectTimer = null; + } + } + } + + /** + * When the ZK client loses contact with ZK, this method will be called to + * allow the RM to react. Because the loss of connection can be noticed + * before the session timeout happens, it is undesirable to transition + * immediately. Instead the method starts a timer that will wait + * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before + * initiating the transition into standby state. + */ @Override public void enterNeutralMode() { - /** - * Possibly due to transient connection issues. Do nothing. - * TODO: Might want to keep track of how long in this state and transition - * to standby. - */ + LOG.warn("Lost contact with Zookeeper. Transitioning to standby in " + + zkSessionTimeout + " ms if connection is not reestablished."); + + // If we've just become disconnected, start a timer. When the time's up, + // we'll transition to standby. + synchronized (zkDisconnectLock) { + if (zkDisconnectTimer == null) { + zkDisconnectTimer = new Timer("Zookeeper disconnect timer"); + zkDisconnectTimer.schedule(new TimerTask() { + @Override + public void run() { + becomeStandby(); + } + }, zkSessionTimeout); + } + } } @SuppressWarnings(value = "unchecked")