ZKRMStateStore: route znode existence checks through the retry logic and
create missing parent znodes on demand.

 * storeVersion() checks for the version znode via existsWithRetries().
 * loadVersion() reads the znode directly and treats NoNodeException as
   "no version stored", returning null.
 * createWithRetries() creates the parent znode (recursively) when it is
   missing. The parent path is everything before the last '/'; a znode
   directly under the root has no parent to create.
 * setDataWithRetries() creates the znode first when it does not exist.
 * The test additionally writes and reads back a nested path.

diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 579fa77..3e6b887 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -364,7 +364,7 @@ protected synchronized void storeVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
         ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
-    if (zkClient.exists(versionNodePath, true) != null) {
+    if (existsWithRetries(versionNodePath, true)) {
       setDataWithRetries(versionNodePath, data, -1);
     } else {
       createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@@ -375,13 +375,16 @@ protected synchronized void storeVersion() throws Exception {
   protected synchronized RMStateVersion loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
 
-    if (zkClient.exists(versionNodePath, true) != null) {
+    try {
       byte[] data = getDataWithRetries(versionNodePath, true);
       RMStateVersion version =
           new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
       return version;
+    } catch (KeeperException.NoNodeException nne) {
+      // Version node doesn't exist
+      LOG.info(versionNodePath + " znode doesn't exist");
+      return null;
     }
-    return null;
   }
 
   @Override
@@ -811,6 +814,16 @@ private void doMultiWithRetries(final Op op) throws Exception {
   public void createWithRetries(
       final String path, final byte[] data, final List<ACL> acl,
       final CreateMode mode) throws Exception {
+    // Create parent znode if it doesn't exist (recursively). A last slash at
+    // index 0 means the parent is the root, so there is nothing to create.
+    int lastSlash = path.lastIndexOf('/');
+    if (lastSlash > 0) {
+      String parentZnode = path.substring(0, lastSlash);
+      if (!existsWithRetries(parentZnode, true)) {
+        // Ephemeral znodes cannot have children, so parents are persistent.
+        createWithRetries(parentZnode, null, acl, CreateMode.PERSISTENT);
+      }
+    }
     doMultiWithRetries(Op.create(path, data, acl, mode));
   }
 
@@ -831,9 +844,27 @@ private void deleteWithRetries(final String path, final int version)
   @Unstable
   public void setDataWithRetries(final String path, final byte[] data,
                                  final int version) throws Exception {
+    if (!existsWithRetries(path, false)) {
+      createWithRetries(path, data, zkAcl, CreateMode.PERSISTENT);
+    }
+    // Now set the data to make sure the version is set to the expected value
     doMultiWithRetries(Op.setData(path, data, version));
   }
 
+  /**
+   * Checks whether the znode at {@code path} exists, running the check
+   * through {@code ZKAction#runWithRetries()} rather than calling the client once.
+   */
+  private boolean existsWithRetries(final String path, final boolean watch)
+      throws Exception {
+    return new ZKAction<Boolean>() {
+      @Override
+      public Boolean run() throws KeeperException, InterruptedException {
+        return (zkClient.exists(path, watch) != null);
+      }
+    }.runWithRetries();
+  }
+
   @VisibleForTesting
   @Private
   @Unstable
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 7e6e602..ad5313f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -152,6 +152,7 @@ public void testZKClientDisconnectAndReconnect()
 
     TestZKClient zkClientTester = new TestZKClient();
     String path = "/test";
+    String path2 = "/test2/test3";
     YarnConfiguration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 100);
     ZKRMStateStore store =
@@ -164,6 +165,7 @@ public void testZKClientDisconnectAndReconnect()
         CreateMode.PERSISTENT);
     store.getDataWithRetries(path, true);
     store.setDataWithRetries(path, "newBytes".getBytes(), 0);
+    store.setDataWithRetries(path2, "newBytes".getBytes(), 0);
 
     stopServer();
     zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
@@ -179,14 +181,17 @@ public void testZKClientDisconnectAndReconnect()
     startServer();
     zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
     byte[] ret = null;
+    byte[] ret2 = null;
     try {
       ret = store.getDataWithRetries(path, true);
+      ret2 = store.getDataWithRetries(path2, true);
     } catch (Exception e) {
       String error = "ZKRMStateStore Session restore failed";
       LOG.error(error, e);
       fail(error);
     }
     Assert.assertEquals("newBytes", new String(ret));
+    Assert.assertEquals("newBytes", new String(ret2));
   }
 
   @Test(timeout = 20000)