diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index d60e8ad..cc25be7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -73,7 +74,9 @@ protected FileSystem fs; private Path rootDirPath; - private Path rmDTSecretManagerRoot; + @Private + @VisibleForTesting + Path rmDTSecretManagerRoot; private Path rmAppRoot; private Path dtSequenceNumberPath = null; @@ -157,6 +160,7 @@ private void loadRMAppState(RMState rmState) throws Exception { new ArrayList(); for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + checkAndResumeUpdateOperation(appDir.getPath()); for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); @@ -250,7 +254,29 @@ private boolean checkAndRemovePartialRecord(Path record) throws IOException { return false; } + private void checkAndResumeUpdateOperation(Path path) throws Exception { + // Before loading the state information, check whether .new file exists. + // If it does, the prior updateFile is failed on half way. We need to + // complete replacing the old file first. + FileStatus[] newChildNodes = + fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(".new"); + } + }); + for(FileStatus newChildNodeStatus : newChildNodes) { + assert newChildNodeStatus.isFile(); + String newChildNodeName = newChildNodeStatus.getPath().getName(); + String childNodeName = newChildNodeName.substring( + 0, newChildNodeName.length() - ".new".length()); + Path childNodePath = + new Path(newChildNodeStatus.getPath().getParent(), childNodeName); + replaceFile(newChildNodeStatus.getPath(), childNodePath); + } + } private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + checkAndResumeUpdateOperation(rmDTSecretManagerRoot); FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); for(FileStatus childNodeStatus : childNodes) { @@ -380,15 +406,44 @@ public synchronized void removeApplicationStateInternal(ApplicationState appStat public synchronized void storeRMDelegationTokenAndSequenceNumberState( RMDelegationTokenIdentifier identifier, Long renewDate, int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + identifier, renewDate,latestSequenceNumber, false); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate,latestSequenceNumber, true); + } + + private void storeOrUpdateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier identifier, Long renewDate, + int latestSequenceNumber, boolean isUpdate) throws Exception { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); - LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); identifier.write(fsOut); fsOut.writeLong(renewDate); - writeFile(nodeCreatePath, os.toByteArray()); + if (isUpdate) { + LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber()); + updateFile(nodeCreatePath, os.toByteArray()); + } else { + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + writeFile(nodeCreatePath, os.toByteArray()); + } fsOut.close(); // store sequence number @@ -409,15 +464,6 @@ public synchronized void storeRMDelegationTokenAndSequenceNumberState( } @Override - public synchronized void removeRMDelegationTokenState( - RMDelegationTokenIdentifier identifier) throws Exception { - Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); - LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); - deleteFile(nodeCreatePath); - } - - @Override public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) throws Exception { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, @@ -477,14 +523,28 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { fs.rename(tempPath, outputPath); } + /* + * In order to make this update atomic as a part of write we will first write + * data to .new file and then rename it. Here we are assuming that rename is + * atomic for underlying file system. + */ protected void updateFile(Path outputPath, byte[] data) throws Exception { - if (fs.exists(outputPath)) { - deleteFile(outputPath); + Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); + // use writeFile to make sure .new file is created atomically + writeFile(newPath, data); + replaceFile(newPath, outputPath); + } + + protected void replaceFile(Path srcPath, Path dstPath) throws Exception { + if (fs.exists(dstPath)) { + deleteFile(dstPath); } - writeFile(outputPath, data); + fs.rename(srcPath, dstPath); } - private boolean renameFile(Path src, Path dst) throws Exception { + @Private + @VisibleForTesting + boolean renameFile(Path src, Path dst) throws Exception { return fs.rename(src, dst); } @@ -492,7 +552,10 @@ private boolean createFile(Path newFile) throws Exception { return fs.createNewFile(newFile); } - private Path getNodePath(Path root, String nodeName) { + @Private + @VisibleForTesting + Path getNodePath(Path root, String nodeName) { return new Path(root, nodeName); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5a20ff2..0eb5a3d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -202,6 +201,15 @@ public synchronized void removeRMDelegationTokenState( } @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + storeRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate, latestSequenceNumber); + } + + @Override public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { Set rmDTMasterKeyState = @@ -239,4 +247,5 @@ protected void storeVersion() throws Exception { protected RMStateVersion getCurrentVersion() { return null; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index af28a01..a12099f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -84,6 +84,13 @@ public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentif } @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + // Do nothing + } + + @Override public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { // Do nothing } @@ -125,4 +132,5 @@ protected RMStateVersion getCurrentVersion() { // Do nothing return null; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 55f0b65..32a06c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -478,6 +478,30 @@ protected abstract void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; /** + * RMDTSecretManager call this to update the state of a delegation token + * and sequence number + */ + public synchronized void updateRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) { + try { + updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate, + latestSequenceNumber); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } + } + + /** + * Blocking API + * Derived classes must implement this method to update the state of + * RMDelegationToken and sequence number + */ + protected abstract void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception; + + /** * RMDTSecretManager call this to store the state of a master key */ public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index abc1e4c..7cfa770 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -629,6 +629,39 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception { ArrayList opList = new ArrayList(); + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); + doMultiWithRetries(opList); + } + + @Override + protected synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + ArrayList opList = new ArrayList(); + addRemoveOp(opList, rmDTIdentifier); + doMultiWithRetries(opList); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + ArrayList opList = new ArrayList(); + addRemoveOp(opList, rmDTIdentifier); + if (opList.size() == 0) { // in case znode doesn't exist + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); + } else { // in case znode exists, remove the del op, use setData instead + opList.remove(0); + addStoreOrUpdateOps( + opList, rmDTIdentifier, renewDate, latestSequenceNumber, true); + } + doMultiWithRetries(opList); + } + + private void addStoreOrUpdateOps(ArrayList opList, + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber, boolean isUpdate) throws Exception { // store RM delegation token String nodeCreatePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX @@ -642,17 +675,21 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( rmDTIdentifier.write(tokenOut); tokenOut.writeLong(renewDate); if (LOG.isDebugEnabled()) { - LOG.debug("Storing RMDelegationToken_" + + LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, - CreateMode.PERSISTENT)); + if (isUpdate) { + opList.add(Op.setData(nodeCreatePath, tokenOs.toByteArray(), -1)); + } else { + opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, + CreateMode.PERSISTENT)); + } seqOut.writeInt(latestSequenceNumber); if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + dtSequenceNumberPath + + LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath + ". SequenceNumber: " + latestSequenceNumber); } @@ -661,12 +698,9 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( tokenOs.close(); seqOs.close(); } - - doMultiWithRetries(opList); } - @Override - protected synchronized void removeRMDelegationTokenState( + private void addRemoveOp(ArrayList opList, RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX @@ -675,7 +709,13 @@ protected synchronized void removeRMDelegationTokenState( LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - deleteWithRetries(nodeRemovePath, -1); + if (zkClient.exists(nodeRemovePath, true) != null) { + opList.add(Op.delete(nodeRemovePath, -1)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); + } + } } @Override @@ -707,7 +747,13 @@ protected synchronized void removeRMDTMasterKeyState( if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - deleteWithRetries(nodeRemovePath, -1); + if (zkClient.exists(nodeRemovePath, true) != null) { + doMultiWithRetries(Op.delete(nodeRemovePath, -1)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); + } + } } // ZK related code @@ -813,18 +859,6 @@ public void createWithRetries( doMultiWithRetries(Op.create(path, data, acl, mode)); } - private void deleteWithRetries(final String path, final int version) - throws Exception { - try { - doMultiWithRetries(Op.delete(path, version)); - } catch (KeeperException.NoNodeException nne) { - // We tried to delete a node that doesn't exist - if (LOG.isDebugEnabled()) { - LOG.debug("Attempted to delete a non-existing znode " + path); - } - } - } - @VisibleForTesting @Private @Unstable diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939de..ae786d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -122,9 +122,7 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id, try { LOG.info("updating RMDelegation token with sequence number: " + id.getSequenceNumber()); - rmContext.getStateStore().removeRMDelegationToken(id, - delegationTokenSequenceNumber); - rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id, + rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id, renewDate, id.getSequenceNumber()); } catch (Exception e) { LOG.error("Error in updating persisted RMDelegationToken with sequence number: " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index dc8b043..009e96e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -244,6 +244,9 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) Thread.sleep(1000); store.close(); + // give tester a chance to modify app state in the store + modifyAppState(); + // load state store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); @@ -363,6 +366,7 @@ public void testRMDTSecretManagerStateStore( int sequenceNumber = 1111; store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1, sequenceNumber); + modifyRMDelegationTokenState(); Map token1 = new HashMap(); token1.put(dtId1, renewDate1); @@ -380,6 +384,20 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + // update RM delegation token; + renewDate1 = new Long(System.currentTimeMillis()); + ++sequenceNumber; + store.updateRMDelegationTokenAndSequenceNumber( + dtId1, renewDate1, sequenceNumber); + token1.put(dtId1, renewDate1); + + RMDTSecretManagerState updateSecretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, updateSecretManagerState.getTokenState()); + Assert.assertEquals(keySet, updateSecretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, + updateSecretManagerState.getDTSequenceNumber()); + // check to delete delegationKey store.removeRMDTMasterKey(key); keySet.clear(); @@ -487,4 +505,13 @@ public void testAppDeletion(RMStateStoreHelper stateStoreHelper) } } } + + protected void modifyAppState() throws Exception { + + } + + protected void modifyRMDelegationTokenState() throws Exception { + + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 0c372ac..792b73e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -49,6 +49,8 @@ public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class); + private TestFSRMStateStoreTester fsTester; + class TestFSRMStateStoreTester implements RMStateStoreHelper { Path workingDirPathURI; @@ -134,7 +136,7 @@ public void testFSRMStateStore() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { - TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); + fsTester = new TestFSRMStateStoreTester(cluster); // If the state store is FileSystemRMStateStore then add corrupted entry. // It should discard the entry and remove it from file system. FSDataOutputStream fsOut = null; @@ -162,6 +164,36 @@ public void testFSRMStateStore() throws Exception { } } + @Override + protected void modifyAppState() throws Exception { + // imitate appAttemptFile1 is still .new, but old one is deleted + String appAttemptIdStr1 = "appattempt_1352994193343_0001_000001"; + ApplicationAttemptId attemptId1 = + ConverterUtils.toApplicationAttemptId(appAttemptIdStr1); + Path appDir = + fsTester.store.getAppDir(attemptId1.getApplicationId().toString()); + Path appAttemptFile1 = + new Path(appDir, attemptId1.toString() + ".new"); + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + fileSystemRMStateStore.renameFile(appAttemptFile1, + new Path(appAttemptFile1.getParent(), + appAttemptFile1.getName() + ".new")); + } + + @Override + protected void modifyRMDelegationTokenState() throws Exception { + // imitate dt file is still .new, but old one is deleted + Path nodeCreatePath = + fsTester.store.getNodePath(fsTester.store.rmDTSecretManagerRoot, + FileSystemRMStateStore.DELEGATION_TOKEN_PREFIX + 0); + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + fileSystemRMStateStore.renameFile(nodeCreatePath, + new Path(nodeCreatePath.getParent(), + nodeCreatePath.getName() + ".new")); + } + @Test (timeout = 30000) public void testFSRMStateStoreClientRetry() throws Exception { HdfsConfiguration conf = new HdfsConfiguration();