diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMStateVersionIncompatibleException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMStateVersionIncompatibleException.java new file mode 100644 index 0000000..0e7acbe --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMStateVersionIncompatibleException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +/** + * This exception is thrown by ResourceManager if it's loading an incompatible + * version of state from state store on recovery. + */ +public class RMStateVersionIncompatibleException extends YarnException { + + private static final long serialVersionUID = 1364408L; + + public RMStateVersionIncompatibleException(Throwable cause) { + super(cause); + } + + public RMStateVersionIncompatibleException(String message) { + super(message); + } + + public RMStateVersionIncompatibleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 82a1f64..3c187f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -457,6 +457,7 @@ protected void serviceStart() throws Exception { if(recoveryEnabled) { try { + rmStore.checkVersion(); RMState state = rmStore.loadState(); recover(state); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 46a58fc..b9a9ae3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -63,7 +63,7 @@ public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); - private static final String ROOT_DIR_NAME = "FSRMStateRoot"; + protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected FileSystem fs; @@ -101,6 +101,35 @@ protected synchronized void closeInternal() throws Exception { } @Override + protected synchronized long loadVersionInfo() throws Exception { + Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); + + if (fs.exists(versionNodePath)) { + FileStatus status = fs.getFileStatus(versionNodePath); + byte[] data = readFile(versionNodePath, status.getLen()); + ByteArrayInputStream byteIn = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(byteIn); + long version = VERSION_INFO; + try { + version = dataIn.readLong(); + } finally { + dataIn.close(); + } + return version; + } else { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(os); + try { + dataOut.writeLong(VERSION_INFO); + updateFile(versionNodePath, os.toByteArray()); + } finally { + dataOut.close(); + } + return VERSION_INFO; + } + } + + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); // recover DelegationTokenSecretManager @@ -430,7 +459,8 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { fs.rename(tempPath, outputPath); } - private void updateFile(Path outputPath, byte[] data) throws Exception { + @VisibleForTesting + public void updateFile(Path outputPath, byte[] data) throws Exception { if (fs.exists(outputPath)) { deleteFile(outputPath); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 495c292..935d818 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -43,12 +43,25 @@ public class MemoryRMStateStore extends RMStateStore { RMState state = new RMState(); - + private final static long DUMMY_VERSION_INFO = -1L; + private long versionInfo = DUMMY_VERSION_INFO; + @VisibleForTesting public RMState getState() { return state; } - + + @Override + protected long loadVersionInfo() throws Exception { + if (versionInfo == DUMMY_VERSION_INFO) { + // no version info exists. + versionInfo = VERSION_INFO; + return versionInfo; + } else { + return versionInfo; + } + } + @Override public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state @@ -224,4 +237,13 @@ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey) state.rmSecretManagerState.getMasterKeyState(); rmDTMasterKeyState.remove(delegationKey); } + + public void setVersionInfo(long versionInfo) { + this.versionInfo = versionInfo; + } + + + public long getCurrentVersion() { + return VERSION_INFO; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c8ad1c4..cf16376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -101,4 +101,9 @@ protected void updateApplicationAttemptStateInternal(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // Do nothing } + + @Override + protected long loadVersionInfo() throws Exception { + return VERSION_INFO; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b9724d2..0a94714 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -78,6 +79,8 @@ protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; + protected static final String VERSION_NODE = "RMVersionNode"; + protected final long VERSION_INFO = 1L; public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -304,7 +307,24 @@ protected void serviceStop() throws Exception { * after this */ protected abstract void closeInternal() throws Exception; - + + /** + * Derived classes load state's version using this method. + */ + protected abstract long loadVersionInfo() throws Exception; + + /** + * Check if the state's version is compatible with current version. + */ + public synchronized void checkVersion() throws Exception { + long version = loadVersionInfo(); + LOG.info("Loaded RM state version info: " + version); + if (version != VERSION_INFO) { + throw new RMStateVersionIncompatibleException( + "Loading an incompatible version of Resource Manager state."); + } + } + /** * Blocking API * The derived class must recover state from the store and return a new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 45afb4e..14f558b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -41,7 +40,6 @@ import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -64,9 +62,9 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import com.google.common.annotations.VisibleForTesting; -import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; @Private @Unstable @@ -74,7 +72,7 @@ public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class); - private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; private int numRetries; private String zkHostPort = null; @@ -302,6 +300,34 @@ protected synchronized void closeInternal() throws Exception { } @Override + protected synchronized long loadVersionInfo() throws Exception { + String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); + if (zkClient.exists(versionNodePath, true) != null) { + byte[] data = getDataWithRetries(versionNodePath, true); + ByteArrayInputStream byteIn = new ByteArrayInputStream(data); + DataInputStream dataIn = new DataInputStream(byteIn); + long version = VERSION_INFO; + try { + version = dataIn.readLong(); + } finally { + dataIn.close(); + } + return version; + } else { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(os); + try { + dataOut.writeLong(VERSION_INFO); + createWithRetries(versionNodePath, os.toByteArray(), zkAcl, + CreateMode.PERSISTENT); + } finally { + dataOut.close(); + } + return VERSION_INFO; + } + } + + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); // recover DelegationTokenSecretManager diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index f87f689..9bd1f05 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1122,6 +1122,43 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + @Test + public void testRMStateStoreVersion() { + // Default version. + MemoryRMStateStore memStore0 = new MemoryRMStateStore(); + memStore0.init(conf); + MockRM rm0 = new MockRM(conf, memStore0); + try { + rm0.start(); + } catch (Throwable t) { + Assert.fail("Using default version, should not fail."); + } + + // Use a compatible version. + MemoryRMStateStore memStore1 = new MemoryRMStateStore(); + memStore1.init(conf); + memStore1.setVersionInfo(memStore1.getCurrentVersion()); + MockRM rm1 = new MockRM(conf, memStore1); + try { + rm1.start(); + } catch (Throwable t) { + Assert.fail("Using compatible version, should not fail."); + } + + // Use an incompatible version. + MemoryRMStateStore memStore2 = new MemoryRMStateStore(); + memStore2.init(conf); + memStore2.setVersionInfo(12345); + MockRM rm2 = new MockRM(conf, memStore2); + try { + rm2.start(); + Assert.fail("Invalid version, should fail."); + } catch (Throwable t) { + Assert.assertTrue(t.getMessage().contains( + "Loading an incompatible version of Resource Manager state")); + } + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 95c14bf..047e701 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.RMStateVersionIncompatibleException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; @@ -106,6 +107,8 @@ public EventHandler getEventHandler() { interface RMStateStoreHelper { RMStateStore getRMStateStore() throws Exception; boolean isFinalStateValid() throws Exception; + void writeVersion(long data) throws Exception; + long readVersion() throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -379,4 +382,36 @@ public void testRMDTSecretManagerStateStore( appToken.setService(new Text("appToken service")); return appToken; } + + public void testCheckVersion(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + try { + store.checkVersion(); + } catch (Throwable t) { + t.printStackTrace(); + Assert.fail("Using default version, should not fail."); + } + + long wrongVersion = 1234; + stateStoreHelper.writeVersion(wrongVersion); + Assert.assertEquals(wrongVersion, stateStoreHelper.readVersion()); + try { + store.checkVersion(); + Assert.fail("Invalid version, should fail."); + } catch (Throwable t) { + Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); + } + + long rightVersion = 1; + stateStoreHelper.writeVersion(rightVersion); + Assert.assertEquals(rightVersion, stateStoreHelper.readVersion()); + try { + store.checkVersion(); + } catch (Throwable t) { + Assert.fail("Using correct version, should not fail."); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index a1a6eab..ebf5f67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + import junit.framework.Assert; import org.apache.commons.logging.Log; @@ -42,7 +46,7 @@ class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; - FileSystemRMStateStore store; + TestFileSystemRMStore store; MiniDFSCluster cluster; class TestFileSystemRMStore extends FileSystemRMStateStore { @@ -54,6 +58,15 @@ start(); Assert.assertNotNull(fs); } + + public Path getVersionNode() { + return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE); + } + + @Override + public long loadVersionInfo() throws Exception { + return super.loadVersionInfo(); + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -81,6 +94,20 @@ public boolean isFinalStateValid() throws Exception { FileStatus[] files = fs.listStatus(workingDirPathURI); return files.length == 1; } + + @Override + public void writeVersion(long data) throws Exception { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(os); + dataOut.writeLong(data); + store.updateFile(store.getVersionNode(), os.toByteArray()); + dataOut.close(); + } + + @Override + public long readVersion() throws Exception { + return store.loadVersionInfo(); + } } @Test @@ -113,6 +140,7 @@ public void testFSRMStateStore() throws Exception { Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); + testCheckVersion(fsTester); } finally { cluster.shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 7f07ddb..94e5697 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -54,7 +56,7 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper { ZooKeeper client; - ZKRMStateStore store; + TestZKRMStateStoreInternal store; class TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -69,6 +71,20 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) public ZooKeeper getNewZooKeeper() throws IOException { return client; } + + public String getAppNode(String appId) { + return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" + + appId; + } + + public String getVersionNode() { + return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE; + } + + @Override + public long loadVersionInfo() throws Exception { + return super.loadVersionInfo(); + } } public RMStateStore getRMStateStore() throws Exception { @@ -86,6 +102,19 @@ public boolean isFinalStateValid() throws Exception { List nodes = client.getChildren(store.znodeWorkingPath, false); return nodes.size() == 1; } + + @Override + public void writeVersion(long data) throws Exception { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + fsOut.writeLong(data); + client.setData(store.getVersionNode(), os.toByteArray(), -1); + } + + @Override + public long readVersion() throws Exception { + return store.loadVersionInfo(); + } } @Test @@ -93,6 +122,7 @@ public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); + testCheckVersion(zkTester); } private Configuration createHARMConf(