diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index d60e8ad..a153390 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -157,6 +158,7 @@ private void loadRMAppState(RMState rmState) throws Exception { new ArrayList(); for (FileStatus appDir : fs.listStatus(rmAppRoot)) { + completeRenewRecords(appDir.getPath()); for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); @@ -250,7 +252,29 @@ private boolean checkAndRemovePartialRecord(Path record) throws IOException { return false; } + private void completeRenewRecords(Path path) throws Exception { + // Before loading the state information, check whether .new file exists. + // If it does, the prior updateFile is failed on half way. We need to + // complete replacing the old file first. + FileStatus[] newChildNodes = + fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(".new"); + } + }); + for(FileStatus newChildNodeStatus : newChildNodes) { + assert newChildNodeStatus.isFile(); + String newChildNodeName = newChildNodeStatus.getPath().getName(); + String childNodeName = newChildNodeName.substring( + 0, newChildNodeName.length() - ".new".length()); + Path childNodePath = + new Path(newChildNodeStatus.getPath().getParent(), childNodeName); + replaceFile(newChildNodeStatus.getPath(), childNodePath); + } + } private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + completeRenewRecords(rmDTSecretManagerRoot); FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot); for(FileStatus childNodeStatus : childNodes) { @@ -380,15 +404,44 @@ public synchronized void removeApplicationStateInternal(ApplicationState appStat public synchronized void storeRMDelegationTokenAndSequenceNumberState( RMDelegationTokenIdentifier identifier, Long renewDate, int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + identifier, renewDate,latestSequenceNumber, false); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + storeOrUpdateRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate,latestSequenceNumber, true); + } + + private void storeOrUpdateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier identifier, Long renewDate, + int latestSequenceNumber, boolean isUpdate) throws Exception { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); - LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); identifier.write(fsOut); fsOut.writeLong(renewDate); - writeFile(nodeCreatePath, os.toByteArray()); + if (isUpdate) { + LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber()); + updateFile(nodeCreatePath, os.toByteArray()); + } else { + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + writeFile(nodeCreatePath, os.toByteArray()); + } fsOut.close(); // store sequence number @@ -409,15 +462,6 @@ public synchronized void storeRMDelegationTokenAndSequenceNumberState( } @Override - public synchronized void removeRMDelegationTokenState( - RMDelegationTokenIdentifier identifier) throws Exception { - Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); - LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); - deleteFile(nodeCreatePath); - } - - @Override public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) throws Exception { Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, @@ -477,11 +521,23 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { fs.rename(tempPath, outputPath); } + /* + * In order to make this update atomic as a part of write we will first write + * data to .new file and then rename it. Here we are assuming that rename is + * atomic for underlying file system. + */ protected void updateFile(Path outputPath, byte[] data) throws Exception { - if (fs.exists(outputPath)) { - deleteFile(outputPath); + Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); + // use writeFile to make sure .new file is created atomically + writeFile(newPath, data); + replaceFile(newPath, outputPath); + } + + protected void replaceFile(Path srcPath, Path dstPath) throws Exception { + if (fs.exists(dstPath)) { + deleteFile(dstPath); } - writeFile(outputPath, data); + fs.rename(srcPath, dstPath); } private boolean renameFile(Path src, Path dst) throws Exception { @@ -495,4 +551,5 @@ private boolean createFile(Path newFile) throws Exception { private Path getNodePath(Path root, String nodeName) { return new Path(root, nodeName); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5a20ff2..dcaed3c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -202,6 +201,15 @@ public synchronized void removeRMDelegationTokenState( } @Override + protected void updateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + storeRMDelegationTokenAndSequenceNumberState( + rmDTIdentifier, renewDate, latestSequenceNumber); + } + + @Override public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { Set rmDTMasterKeyState = @@ -239,4 +247,5 @@ protected void storeVersion() throws Exception { protected RMStateVersion getCurrentVersion() { return null; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index af28a01..138bf3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -84,6 +84,13 @@ public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentif } @Override + protected void updateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + // Do nothing + } + + @Override public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { // Do nothing } @@ -125,4 +132,5 @@ protected RMStateVersion getCurrentVersion() { // Do nothing return null; } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 55f0b65..22d9b6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -478,6 +478,30 @@ protected abstract void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; /** + * RMDTSecretManager call this to update the state of a delegation token + * and sequence number + */ + public synchronized void updateRMDelegationTokenAndSequenceNumber( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) { + try { + updateRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, + latestSequenceNumber); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } + } + + /** + * Blocking API + * Derived classes must implement this method to update the state of + * RMDelegationToken and sequence number + */ + protected abstract void updateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception; + + /** * RMDTSecretManager call this to store the state of a master key */ public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index abc1e4c..5dae7e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -629,6 +629,31 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception { ArrayList opList = new ArrayList(); + addStoreOps(opList, rmDTIdentifier, renewDate, latestSequenceNumber); + doMultiWithRetries(opList); + } + + @Override + protected synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + ArrayList opList = new ArrayList(); + addRemoveOp(opList, rmDTIdentifier); + doMultiWithRetries(opList); + } + + @Override + protected void updateRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { + ArrayList opList = new ArrayList(); + addRemoveOp(opList, rmDTIdentifier); + addStoreOps(opList, rmDTIdentifier, renewDate, latestSequenceNumber); + doMultiWithRetries(opList); + } + + private void addStoreOps(ArrayList opList, + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, + int latestSequenceNumber) throws Exception { // store RM delegation token String nodeCreatePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX @@ -661,12 +686,9 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( tokenOs.close(); seqOs.close(); } - - doMultiWithRetries(opList); } - @Override - protected synchronized void removeRMDelegationTokenState( + private void addRemoveOp(ArrayList opList, RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { String nodeRemovePath = getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX @@ -675,7 +697,13 @@ protected synchronized void removeRMDelegationTokenState( LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - deleteWithRetries(nodeRemovePath, -1); + if (zkClient.exists(nodeRemovePath, true) != null) { + opList.add(Op.delete(nodeRemovePath, -1)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); + } + } } @Override @@ -707,7 +735,13 @@ protected synchronized void removeRMDTMasterKeyState( if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - deleteWithRetries(nodeRemovePath, -1); + if (zkClient.exists(nodeRemovePath, true) != null) { + doMultiWithRetries(Op.delete(nodeRemovePath, -1)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); + } + } } // ZK related code @@ -813,18 +847,6 @@ public void createWithRetries( doMultiWithRetries(Op.create(path, data, acl, mode)); } - private void deleteWithRetries(final String path, final int version) - throws Exception { - try { - doMultiWithRetries(Op.delete(path, version)); - } catch (KeeperException.NoNodeException nne) { - // We tried to delete a node that doesn't exist - if (LOG.isDebugEnabled()) { - LOG.debug("Attempted to delete a non-existing znode " + path); - } - } - } - @VisibleForTesting @Private @Unstable diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index 23939de..ae786d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -122,9 +122,7 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id, try { LOG.info("updating RMDelegation token with sequence number: " + id.getSequenceNumber()); - rmContext.getStateStore().removeRMDelegationToken(id, - delegationTokenSequenceNumber); - rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id, + rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id, renewDate, id.getSequenceNumber()); } catch (Exception e) { LOG.error("Error in updating persisted RMDelegationToken with sequence number: " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index dc8b043..8431601 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -380,6 +380,20 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + // update RM delegation token; + renewDate1 = new Long(System.currentTimeMillis()); + ++sequenceNumber; + store.updateRMDelegationTokenAndSequenceNumber( + dtId1, renewDate1, sequenceNumber); + token1.put(dtId1, renewDate1); + + RMDTSecretManagerState updateSecretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, updateSecretManagerState.getTokenState()); + Assert.assertEquals(keySet, updateSecretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, + updateSecretManagerState.getDTSequenceNumber()); + // check to delete delegationKey store.removeRMDTMasterKey(key); keySet.clear();