diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 82ac2c1..d7394b2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -31,6 +31,8 @@
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,6 +65,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -112,6 +115,29 @@
   private List zkAcl;
   private List zkAuths;
 
+  /**
+   * Callback for the asynchronous ZooKeeper sync issued after reconnecting.
+   * Records the result code and releases {@link #latch} when invoked.
+   */
+  class ZKSyncOperationCallback implements AsyncCallback.VoidCallback {
+    public final CountDownLatch latch = new CountDownLatch(1);
+    // Written before countDown() so a thread returning from await() sees it.
+    private volatile int resultCode = Code.SYSTEMERROR.intValue();
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      resultCode = rc;
+      LOG.info("ZooKeeper sync operation was done. path: " + path
+          + ", rc: " + rc);
+      latch.countDown();
+    }
+
+    /** @return true only if the sync completed with Code.OK. */
+    boolean isSuccessful() {
+      return resultCode == Code.OK.intValue();
+    }
+  }
+
   /**
    *
    * ROOT_DIR_PATH
@@ -1100,6 +1126,18 @@ private boolean shouldRetry(Code code) {
       switch (code) {
         case CONNECTIONLOSS:
         case OPERATIONTIMEOUT:
+          return true;
+        default:
+          break;
+      }
+      return false;
+    }
+
+    private boolean shouldRetryWithNewConnection(Code code) {
+      // For fast recovery, retry on a new connection after SESSIONEXPIRED or
+      // SESSIONMOVED occurs. The latest state of a path is assured by a
+      // following zk.sync(path) operation.
+      switch (code) {
         case SESSIONEXPIRED:
         case SESSIONMOVED:
           return true;
@@ -1135,9 +1173,39 @@ T runWithRetries() throws Exception {
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
             LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
-            createConnection();
             continue;
           }
+          if (shouldRetryWithNewConnection(ke.code()) && ++retry < numRetries) {
+            LOG.info("Retrying operation on ZK with new Connection. " +
+                "Retry no. " + retry);
+            Thread.sleep(zkRetryInterval);
+            createConnection();
+            // KeeperException#getPath() is null when no path is attached to
+            // the error, and ZooKeeper#sync rejects a null path, so fall back
+            // to the root znode in that case.
+            String syncPath = ke.getPath() != null ? ke.getPath() : "/";
+            ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
+            zkClient.sync(syncPath, cb, null);
+            boolean succeededToSync = false;
+            try {
+              succeededToSync = cb.latch.await(
+                  zkRetryInterval * (numRetries - retry),
+                  TimeUnit.MILLISECONDS) && cb.isSuccessful();
+            } catch (InterruptedException ie) {
+              LOG.info("Interrupted. Giving up!");
+              // Restore the interrupt status for callers up the stack.
+              Thread.currentThread().interrupt();
+              throw ke;
+            }
+            if (succeededToSync) {
+              // continue to operation.
+              continue;
+            } else {
+              // Give up: a new connection that has not been synced may
+              // expose a stale view to the client, as seen in YARN-3798.
+              LOG.info("Failed to sync with ZK new connection.");
+            }
+          }
           LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;
         }