diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 82ac2c1..b49c167 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -31,6 +31,8 @@ import java.util.List; import com.google.common.base.Preconditions; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -112,6 +115,20 @@ private List zkAcl; private List zkAuths; + class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { + public final CountDownLatch latch = new CountDownLatch(1); + @Override + public void processResult(int rc, String path, Object ctx){ + if (rc == 0) { + LOG.info("ZooKeeper sync operation succeeded. path: " + path); + latch.countDown(); + } else { + LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + + "timeout. 
path: " + path); + } + } + } + /** * * ROOT_DIR_PATH @@ -225,6 +242,7 @@ public synchronized void initInternal(Configuration conf) throws Exception { znodeWorkingPath = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); + zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); @@ -298,6 +316,8 @@ public synchronized void startInternal() throws Exception { createRootDir(delegationTokensRootPath); createRootDir(dtSequenceNumberPath); createRootDir(amrmTokenSecretManagerRoot); + + syncInternal(zkRootNodePath); } private void createRootDir(final String rootPath) throws Exception { @@ -888,6 +908,7 @@ public synchronized void processWatchEvent(ZooKeeper zk, // call listener to reconnect LOG.info("ZKRMStateStore Session expired"); createConnection(); + syncInternal(event.getPath()); break; default: LOG.error("Unexpected Zookeeper" + @@ -905,6 +926,27 @@ String getNodePath(String root, String nodeName) { } /** + * Helper method to call ZK's sync() after calling createConnection(). + * Note that sync path is meaningless for now: + * http://mail-archives.apache.org/mod_mbox/zookeeper-user/201102.mbox/browser + * @param path path to sync, nullable value. If the path is null, + * zkRootNodePath is used to sync. + * @return true if ZK.sync() succeeds, false if ZK.sync() fails. + * @throws InterruptedException if interrupted while waiting for sync + */ + private boolean syncInternal(String path) throws InterruptedException { + ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + if (path != null) { + zkClient.sync(path, cb, null); + } else { + zkClient.sync(zkRootNodePath, cb, null); + } + boolean succeededToSync = cb.latch.await( + zkSessionTimeout, TimeUnit.MILLISECONDS); + return succeededToSync; + } + + /** * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. 
*/ @@ -1100,6 +1142,18 @@ private boolean shouldRetry(Code code) { switch (code) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private boolean shouldRetryWithNewConnection(Code code) { + // For fast recovery, we choose to close current connection after + // SESSIONMOVED occurs. Latest state of a path is assured by a following + // zk.sync(path) operation. + switch (code) { case SESSIONEXPIRED: case SESSIONMOVED: return true; @@ -1132,12 +1186,35 @@ T runWithRetries() throws Exception { } LOG.info("Exception while executing a ZK operation.", ke); - if (shouldRetry(ke.code()) && ++retry < numRetries) { + retry++; + if (shouldRetry(ke.code()) && retry < numRetries) { LOG.info("Retrying operation on ZK. Retry no. " + retry); Thread.sleep(zkRetryInterval); - createConnection(); continue; } + if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK with new Connection. " + + "Retry no. " + retry); + Thread.sleep(zkRetryInterval); + createConnection(); + boolean succeededToSync = false; + try { + succeededToSync = syncInternal(ke.getPath()); + } catch (InterruptedException ie) { + // Restore the interrupt status so callers can observe it. + Thread.currentThread().interrupt(); + LOG.info("Interrupted sync operation. Giving up!"); + throw ke; + } + if (succeededToSync) { + // continue the operation. + continue; + } else { + // Giving up since new connection without sync can cause an + // unexpected view from the client like YARN-3798. + LOG.info("Failed to sync with ZK new connection."); + } + } LOG.info("Maxed out ZK retries. Giving up!"); throw ke; }