diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 15ac971..94a20b7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -110,6 +111,23 @@
   private List zkAcl;
   private List zkAuths;
 
+  /**
+   * Callback handed to the asynchronous ZooKeeper sync that is issued after
+   * a session-expiry reconnect. It only logs the outcome of the sync.
+   */
+  private final AsyncCallback.VoidCallback syncVoidCallback =
+      new AsyncCallback.VoidCallback() {
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+          if (rc == Code.OK.intValue()) {
+            LOG.info("ZooKeeper sync operation was done. path: " + path);
+          } else {
+            LOG.warn("ZooKeeper sync operation failed. rc: " + rc
+                + ", path: " + path);
+          }
+        }
+      };
+
   /**
    *
    * ROOT_DIR_PATH
@@ -1106,7 +1124,6 @@ private boolean shouldRetry(Code code) {
       switch (code) {
         case CONNECTIONLOSS:
         case OPERATIONTIMEOUT:
-        case SESSIONEXPIRED:
         case SESSIONMOVED:
           return true;
         default:
@@ -1115,6 +1132,23 @@ private boolean shouldRetry(Code code) {
       return false;
     }
 
+    /**
+     * Tells whether a failed operation should be retried only after a new
+     * ZooKeeper connection has been created.
+     *
+     * @param code result code carried by the KeeperException
+     * @return true for SESSIONEXPIRED, false for every other code
+     */
+    private boolean shouldRetryWithNewConnection(Code code) {
+      switch (code) {
+        case SESSIONEXPIRED:
+          return true;
+        default:
+          break;
+      }
+      return false;
+    }
+
     T runWithRetries() throws Exception {
       int retry = 0;
       while (true) {
@@ -1141,7 +1175,18 @@ T runWithRetries() throws Exception {
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
             LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
+            continue;
+          }
+          if (shouldRetryWithNewConnection(ke.code()) && ++retry < numRetries) {
+            LOG.info("Retrying operation on ZK with new Connection. " +
+                "Retry no. " + retry);
+            Thread.sleep(zkRetryInterval);
             createConnection();
+            // KeeperException#getPath() is null when the failed request
+            // carried no path, and ZooKeeper#sync rejects a null path, so
+            // fall back to the root znode in that case.
+            String syncPath = ke.getPath() != null ? ke.getPath() : "/";
+            zkClient.sync(syncPath, syncVoidCallback, null);
             continue;
           }
           LOG.info("Maxed out ZK retries. Giving up!");