diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index f12ada7..70ccd8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -116,12 +116,10 @@ private List zkAuths; class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { - public final CountDownLatch latch = new CountDownLatch(1); @Override public void processResult(int rc, String path, Object ctx){ if (rc == Code.OK.intValue()) { LOG.info("ZooKeeper sync operation succeeded. path: " + path); - latch.countDown(); } else { LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + "timeout. path: " + path); @@ -945,16 +943,20 @@ String getNodePath(String root, String nodeName) { * @return true if ZK.sync() succeededs, false if ZK.sync() fails. * @throws InterruptedException */ - private boolean syncInternal(String path) throws InterruptedException { - ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); - if (path != null) { - zkClient.sync(path, cb, null); - } else { - zkClient.sync(zkRootNodePath, cb, null); + private void syncInternal(final String path) throws InterruptedException { + final ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + final String pathForSync = (path != null) ? path : zkRootNodePath; + try { + new ZKAction() { + @Override + Void run() throws KeeperException, InterruptedException { + zkClient.sync(pathForSync, cb, null); + return null; + } + }.runWithRetries(); + } catch (Exception e) { + LOG.fatal("sync failed."); } - boolean succeededToSync = cb.latch.await( - zkSessionTimeout, TimeUnit.MILLISECONDS); - return succeededToSync; } /** @@ -1211,22 +1213,8 @@ T runWithRetries() throws Exception { "Retry no. " + retry); Thread.sleep(zkRetryInterval); createConnection(); - boolean succeededToSync = false; - try { - succeededToSync = syncInternal(ke.getPath()); - } catch (InterruptedException ie) { - LOG.info("Interrupted sync operation. Giving up!"); - Thread.currentThread().interrupt(); - throw ke; - } - if (succeededToSync) { - // continue the operation. - continue; - } else { - // Giving up since new connection without sync can occur an - // unexpected view from the client like YARN-3798. - LOG.info("Failed to sync with ZK new connection."); - } + syncInternal(ke.getPath()); + continue; } LOG.info("Maxed out ZK retries. Giving up!"); throw ke;