diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index fde11e7..6ef5d6c 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -142,14 +142,70 @@ public synchronized void addKey(DelegationKey key) throws IOException { return allKeys.values().toArray(new DelegationKey[0]); } - protected void logUpdateMasterKey(DelegationKey key) throws IOException { + protected void storeNewMasterKey(DelegationKey key) throws IOException { return; } - - protected void logExpireToken(TokenIdent ident) throws IOException { + + protected void removePersistedMasterKey(DelegationKey key) { + return; + } + + protected void storeNewToken(TokenIdent ident, long renewDate) { + return; + } + + protected void removePersistedToken(TokenIdent ident) throws IOException { + return; + } + + protected void updatePersistedToken(TokenIdent ident, long renewDate) { return; } + /** + * This method is intended to be used for recovering persisted delegation + * tokens + * @param identifier identifier read from persistent storage + * @param renewDate token renew time + * @throws IOException + */ + public synchronized void addPersistedDelegationToken( + TokenIdent identifier, long renewDate) throws IOException { + if (running) { + // a safety check + throw new IOException( + "Can't add persisted delegation token to a running SecretManager."); + } + int keyId = identifier.getMasterKeyId(); + DelegationKey dKey = allKeys.get(keyId); + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier " + identifier.toString()); + return; + } + byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); + if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { + this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); + } + if (currentTokens.get(identifier) == null) { + currentTokens.put(identifier, new DelegationTokenInformation(renewDate, + password)); + } else { + throw new IOException( + "Same delegation token being added twice."); + } + } + + /** + * This method is intended to be used for recovering persisted master keys. + * + * @param key DelegationKey read from persistent storage + * @throws IOException + */ + public synchronized void addPersistedMasterKey(DelegationKey key) + throws IOException { + addKey(key); + } + /** * Update the current master key * This is called once by startThreads before tokenRemoverThread is created, @@ -166,7 +222,7 @@ private void updateCurrentKey() throws IOException { .currentTimeMillis() + keyUpdateInterval + tokenMaxLifetime, generateSecret()); //Log must be invoked outside the lock on 'this' - logUpdateMasterKey(newKey); + storeNewMasterKey(newKey); synchronized (this) { currentId = newKey.getKeyId(); currentKey = newKey; @@ -200,6 +256,10 @@ private synchronized void removeExpiredKeys() { Map.Entry e = it.next(); if (e.getValue().getExpiryDate() < now) { it.remove(); + // ensure the tokens generated by this current key can be recovered + // with this current key after this current key is rolled + if(!e.getValue().equals(currentKey)) + removePersistedMasterKey(e.getValue()); } } } @@ -215,6 +275,7 @@ private synchronized void removeExpiredKeys() { identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + identifier); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); + storeNewToken(identifier, now + tokenRenewInterval); currentTokens.put(identifier, new DelegationTokenInformation(now + tokenRenewInterval, password)); return password; @@ -302,6 +363,7 @@ public synchronized long renewToken(Token token, throw new InvalidToken("Renewal request for unknown token"); } currentTokens.put(id, info); + updatePersistedToken(id, renewTime); return renewTime; } @@ -337,6 +399,7 @@ public synchronized TokenIdent cancelToken(Token token, if (info == null) { throw new InvalidToken("Token not found"); } + removePersistedToken(id); return id; } @@ -386,7 +449,7 @@ private void removeExpiredToken() throws IOException { } // don't hold lock on 'this' to avoid edit log updates blocking token ops for (TokenIdent ident : expiredTokens) { - logExpireToken(ident); + removePersistedToken(ident); } } diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java index 3458b2d..c3764b7 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java @@ -18,18 +18,19 @@ package org.apache.hadoop.security.token.delegation; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import javax.crypto.SecretKey; +import org.apache.avro.reflect.Nullable; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.avro.reflect.Nullable; /** * Key used for generating and verifying delegation tokens @@ -117,4 +118,25 @@ public void readFields(DataInput in) throws IOException { in.readFully(keyBytes); } } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(keyId).append(expiryDate) + .append(keyBytes).toHashCode(); + } + + @Override + public boolean equals(Object right) { + if (this == right) { + return true; + } else if (right == null || getClass() != right.getClass()) { + return false; + } else { + DelegationKey r = (DelegationKey) right; + return keyId == r.keyId && + expiryDate == r.expiryDate && + Arrays.equals(keyBytes, r.keyBytes); + } + } + } diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java index 85e2279..5229fae 100644 --- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java @@ -101,7 +101,12 @@ public TestDelegationTokenIdentifier createIdentifier() { protected byte[] createPassword(TestDelegationTokenIdentifier t) { return super.createPassword(t); } - + + @Override + protected void removePersistedMasterKey(DelegationKey key) { + Assert.assertFalse(key.equals(allKeys.get(currentId))); + } + public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) { return SecretManager.createPassword(t.getBytes(), key.getKey()); } @@ -316,7 +321,7 @@ public void testRollMasterKey() throws Exception { int prevNumKeys = dtSecretManager.getAllKeys().length; dtSecretManager.rollMasterKey(); - + //after rolling, the length of the keys list must increase int currNumKeys = dtSecretManager.getAllKeys().length; Assert.assertEquals((currNumKeys - prevNumKeys) >= 1, true); diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java index 896c20c..d1337d5 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java @@ -136,56 +136,8 @@ public synchronized void saveSecretManagerState(DataOutputStream out) out.writeInt(delegationTokenSequenceNumber); saveCurrentTokens(out); } - - /** - * This method is intended to be used only while reading edit logs. - * - * @param identifier DelegationTokenIdentifier read from the edit logs or - * fsimage - * - * @param expiryTime token expiry time - * @throws IOException - */ - public synchronized void addPersistedDelegationToken( - DelegationTokenIdentifier identifier, long expiryTime) throws IOException { - if (running) { - // a safety check - throw new IOException( - "Can't add persisted delegation token to a running SecretManager."); - } - int keyId = identifier.getMasterKeyId(); - DelegationKey dKey = allKeys.get(keyId); - if (dKey == null) { - LOG - .warn("No KEY found for persisted identifier " - + identifier.toString()); - return; - } - byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); - if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { - this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); - } - if (currentTokens.get(identifier) == null) { - currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, - password)); - } else { - throw new IOException( - "Same delegation token being added twice; invalid entry in fsimage or editlogs"); - } - } /** - * Add a MasterKey to the list of keys. - * - * @param key DelegationKey - * @throws IOException - */ - public synchronized void updatePersistedMasterKey(DelegationKey key) - throws IOException { - addKey(key); - } - - /** * Update the token cache with renewal record in edit logs. * * @param identifier DelegationTokenIdentifier of the renewed token @@ -295,7 +247,7 @@ private synchronized void loadAllKeys(DataInputStream in) throws IOException { * Call namesystem to update editlogs for new master key. */ @Override //AbstractDelegationTokenManager - protected void logUpdateMasterKey(DelegationKey key) + protected void storeNewMasterKey(DelegationKey key) throws IOException { synchronized (noInterruptsLock) { // The edit logging code will fail catastrophically if it @@ -312,7 +264,7 @@ protected void logUpdateMasterKey(DelegationKey key) } @Override //AbstractDelegationTokenManager - protected void logExpireToken(final DelegationTokenIdentifier dtId) + protected void removePersistedToken(final DelegationTokenIdentifier dtId) throws IOException { synchronized (noInterruptsLock) { // The edit logging code will fail catastrophically if it diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 297e7f0..c8efa14 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -490,7 +490,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, case OP_UPDATE_MASTER_KEY: { UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op; fsNamesys.getDelegationTokenSecretManager() - .updatePersistedMasterKey(updateMasterKeyOp.key); + .addPersistedMasterKey(updateMasterKeyOp.key); break; } case OP_REASSIGN_LEASE: { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e923b1e..b8208a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -239,7 +239,7 @@ public synchronized void init(Configuration conf) { // Register event handler for RMAppManagerEvents this.rmDispatcher.register(RMAppManagerEventType.class, this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(); + this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); clientRM = createClientRMService(); addService(clientRM); @@ -666,7 +666,7 @@ protected ResourceTrackerService createResourceTrackerService() { } protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager() { + createRMDelegationTokenSecretManager(RMContext rmContext) { long secretKeyInterval = conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -678,7 +678,7 @@ protected ResourceTrackerService createResourceTrackerService() { YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); } protected ClientRMService createClientRMService() { @@ -745,6 +745,9 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){ @Override public void recover(RMState state) throws Exception { + // recover RMdelegationTokenSecretManager + rmDTSecretManager.recover(state); + // recover applications rmAppManager.recover(state); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 03154b6..4df49cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -33,11 +33,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -57,11 +59,16 @@ public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - + private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + private static final String RM_APP_ROOT = "RMAppRoot"; + private static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; private FileSystem fs; private Path fsRootDirPath; + private Path fsRMDTSecretManagerRoot; + private Path fsRMAppRoot; @VisibleForTesting Path fsWorkingPath; @@ -71,10 +78,13 @@ public synchronized void initInternal(Configuration conf) fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + fsRMDTSecretManagerRoot = new Path(fsRootDirPath, RM_DT_SECRET_MANAGER_ROOT); + fsRMAppRoot = new Path(fsRootDirPath, RM_APP_ROOT); // create filesystem fs = fsWorkingPath.getFileSystem(conf); - fs.mkdirs(fsRootDirPath); + fs.mkdirs(fsRMDTSecretManagerRoot); + fs.mkdirs(fsRMAppRoot); } @Override @@ -84,15 +94,23 @@ protected synchronized void closeInternal() throws Exception { @Override public synchronized RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + return rmState; + } + + private void loadRMAppState(RMState rmState) throws Exception { try { - RMState state = new RMState(); - FileStatus[] childNodes = fs.listStatus(fsRootDirPath); + FileStatus[] childNodes = fs.listStatus(fsRMAppRoot); List attempts = new ArrayList(); for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - Path childNodePath = getNodePath(childNodeName); + Path childNodePath = getNodePath(fsRMAppRoot, childNodeName); byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ // application @@ -106,7 +124,7 @@ public synchronized RMState loadState() throws Exception { appStateData.getApplicationSubmissionContext()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); - state.appState.put(appId, appState); + rmState.appState.put(appId, appState); } else if(childNodeName.startsWith( ApplicationAttemptId.appAttemptIdStrPrefix)) { // attempt @@ -138,7 +156,7 @@ public synchronized RMState loadState() throws Exception { // go through all attempts and add them to their apps for(ApplicationAttemptState attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = state.appState.get(appId); + ApplicationState appState = rmState.appState.get(appId); if(appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); } else { @@ -147,22 +165,44 @@ public synchronized RMState loadState() throws Exception { // application attempt nodes LOG.info("Application node not found for attempt: " + attemptState.getAttemptId()); - deleteFile(getNodePath(attemptState.getAttemptId().toString())); + deleteFile(getNodePath(fsRMAppRoot, attemptState.getAttemptId().toString())); } } - - return state; } catch (Exception e) { LOG.error("Failed to load state.", e); throw e; } } + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + FileStatus[] childNodes = fs.listStatus(fsRMDTSecretManagerRoot); + + for(FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = getNodePath(fsRMDTSecretManagerRoot, childNodeName); + FSDataInputStream fsIn = fs.open(childNodePath); + if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){ + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); + identifier.readFields(fsIn); + long renewDate = fsIn.readLong(); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + } else { + LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); + } + fsIn.close(); + } + } + @Override public synchronized void storeApplicationState(String appId, - ApplicationStateDataPBImpl appStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(appId); + ApplicationStateDataPBImpl appStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(fsRMAppRoot, appId); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -178,9 +218,8 @@ public synchronized void storeApplicationState(String appId, @Override public synchronized void storeApplicationAttemptState(String attemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(attemptId); + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(fsRMAppRoot, attemptId); LOG.info("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -196,9 +235,9 @@ public synchronized void storeApplicationAttemptState(String attemptId, @Override public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + throws Exception { String appId = appState.getAppId().toString(); - Path nodeRemovePath = getNodePath(appId); + Path nodeRemovePath = getNodePath(fsRMAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { @@ -207,13 +246,55 @@ public synchronized void removeApplicationState(ApplicationState appState) } public synchronized void removeApplicationAttemptState(String attemptId) - throws Exception { - Path nodeRemovePath = getNodePath(attemptId); + throws Exception { + Path nodeRemovePath = getNodePath(fsRMAppRoot, attemptId); LOG.info("Removing info for attempt: " + attemptId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); } + @Override + public synchronized void storeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier, Long renewDate) + throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + FSDataOutputStream fsOut = fs.create(nodeCreatePath, false); + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + identifier.write(fsOut); + fsOut.writeLong(renewDate); + fsOut.close(); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + public synchronized void storeMasterKeyState(DelegationKey masterKey) + throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + FSDataOutputStream fsOut = fs.create(nodeCreatePath, false); + LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); + masterKey.write(fsOut); + fsOut.close(); + } + + @Override + public synchronized void + removeMasterKeyState(DelegationKey masterKey) throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); + deleteFile(nodeCreatePath); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { @@ -227,18 +308,17 @@ private void deleteFile(Path deletePath) throws Exception { // state data will not be that "long" byte[] data = new byte[(int)len]; fsIn.readFully(data); + fsIn.close(); return data; } private void writeFile(Path outputPath, byte[] data) throws Exception { FSDataOutputStream fsOut = fs.create(outputPath, false); fsOut.write(data); - fsOut.flush(); fsOut.close(); } - @VisibleForTesting - Path getNodePath(String nodeName) { - return new Path(fsRootDirPath, nodeName); + private Path getNodePath(Path root, String nodeName) { + return new Path(root, nodeName); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index dd6fab5..f01a5c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,14 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -49,6 +53,10 @@ public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); returnState.appState.putAll(state.appState); + returnState.rmSecretManagerState.getRMDTMasterKeyState() + .addAll(state.rmSecretManagerState.getRMDTMasterKeyState()); + returnState.rmSecretManagerState.getRMDelegationTokenState().putAll( + state.rmSecretManagerState.getRMDelegationTokenState()); return returnState; } @@ -113,4 +121,52 @@ public synchronized void removeApplicationState(ApplicationState appState) ApplicationState removed = state.appState.remove(appId); assert removed != null; } + + @Override + public synchronized void storeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + Map rmDTState = + state.rmSecretManagerState.getRMDelegationTokenState(); + if(rmDTState.containsKey(rmDTIdentifier)) { + IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier + + "is already stored."); + LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); + throw e; + } + rmDTState.put(rmDTIdentifier, renewDate); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{ + Map rmDTState = + state.rmSecretManagerState.getRMDelegationTokenState(); + rmDTState.remove(rmDTIdentifier); + } + + @Override + public synchronized void storeMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getRMDTMasterKeyState(); + + if (rmDTMasterKeyState.contains(delegationKey)) { + IOException e = new IOException("RMDTMasterKey with keyID: " + + delegationKey.getKeyId() + " is already stored"); + LOG.info("Error storing info for RMDTMasterKey with keyID: " + + delegationKey.getKeyId(), e); + throw e; + } + state.getRMDTSecretManagerState().getRMDTMasterKeyState().add(delegationKey); + LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size()); + } + + @Override + public synchronized void removeMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getRMDTMasterKeyState(); + rmDTMasterKeyState.remove(delegationKey); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 29bdbb0..1db368c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; + import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @Unstable public class NullRMStateStore extends RMStateStore { @@ -59,4 +62,27 @@ protected void removeApplicationState(ApplicationState appState) // Do nothing } + @Override + public void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + // Do nothing + } + + @Override + public void removeRMDelegationToken(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + // Do nothing + } + + @Override + public void storeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + + @Override + public void removeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index e1400f4..439b97a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -20,7 +20,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -57,7 +61,7 @@ */ public abstract class RMStateStore { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - + /** * State of an application attempt */ @@ -115,17 +119,40 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { return attempts.get(attemptId); } } - + + public static class RMDTSecretManagerState { + // DTIdentifier -> renewDate + Map delegationTokenState = + new HashMap(); + + Set masterKeyState = + new HashSet(); + + public Map getRMDelegationTokenState() { + return delegationTokenState; + } + + public Set getRMDTMasterKeyState() { + return masterKeyState; + } + } + /** * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); - + Map appState = + new HashMap(); + + RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); + public Map getApplicationState() { return appState; } + + public RMDTSecretManagerState getRMDTSecretManagerState() { + return rmSecretManagerState; + } } private Dispatcher rmDispatcher; @@ -229,8 +256,72 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { protected abstract void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - - + + + /** + * RMDTSecretManager call this to store the state of a delegation token + */ + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + storeRMDelegationTokenState(rmDTIdentifier, renewDate); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of RMDelegationToken + */ + protected abstract void storeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a delegation token + */ + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of RMDelegationToken + */ + protected abstract void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + + /** + * RMDTSecretManager call this to store the state of a master key + */ + public synchronized void storeMasterKey(DelegationKey delegationKey) + throws Exception { + storeMasterKeyState(delegationKey); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of + * DelegationToken Master Key + */ + protected abstract void storeMasterKeyState(DelegationKey delegationKey) + throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a master key + */ + public synchronized void removeMasterKey(DelegationKey delegationKey) + throws Exception { + removeMasterKeyState(delegationKey); + } + + /** + * Blocking AP + * Derived classes must implement this method to remove the state of + * DelegationToken Master Key + */ + protected abstract void removeMasterKeyState(DelegationKey delegationKey) + throws Exception; + /** * Non-blocking API * ResourceManager services call this to remove an application from the state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index f70605c..bfc64bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -18,10 +18,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; + +import com.google.common.annotations.VisibleForTesting; /** * A ResourceManager specific delegation token secret manager. @@ -32,6 +47,10 @@ @InterfaceStability.Unstable public class RMDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = LogFactory + .getLog(RMDelegationTokenSecretManager.class); + + protected final RMContext rmContext; /** * Create a secret manager @@ -46,13 +65,114 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.rmContext = rmContext; } @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } + + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + try { + rmContext.getStateStore().storeMasterKey(newKey); + } catch (Exception e) { + LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removePersistedMasterKey(DelegationKey key) { + try { + LOG.info("removing master key with keyID " + key.getKeyId()); + rmContext.getStateStore().removeMasterKey(key); + } catch (Exception e) { + LOG.error("Error in removing master key with KeyID: " + key.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void storeNewToken(RMDelegationTokenIdentifier identifier, + long renewDate) { + try { + LOG.info("storing RMDelegation token with sequence number: " + + identifier.getSequenceNumber()); + rmContext.getStateStore().storeRMDelegationToken(identifier, renewDate); + } catch (Exception e) { + LOG.error("Error in storing RMDelegationToken with sequence number: " + + identifier.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void updatePersistedToken(RMDelegationTokenIdentifier id, + long renewDate) { + try { + rmContext.getStateStore().removeRMDelegationToken(id); + rmContext.getStateStore().storeRMDelegationToken(id, renewDate); + } catch (Exception e) { + LOG.error("Error in updating persisted RMDelegationToken with sequence number: " + + id.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removePersistedToken(RMDelegationTokenIdentifier ident) + throws IOException { + try { + rmContext.getStateStore().removeRMDelegationToken(ident); + } catch (Exception e) { + LOG.error("Error in removing RMDelegationToken with sequence number: " + + ident.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Private + @VisibleForTesting + public synchronized Set getAllMasterKeys() { + HashSet keySet = new HashSet(); + keySet.addAll(allKeys.values()); + return keySet; + } + + @Private + @VisibleForTesting + public synchronized Map getAllTokens() { + Map allTokens = + new HashMap(); + + for (Map.Entry entry : currentTokens.entrySet()) { + allTokens.put(entry.getKey(), entry.getValue().getRenewDate()); + } + return allTokens; + } + + public void recover(RMState rmState) throws Exception { + + LOG.info("recovering RMDelegationTokenSecretManager."); + // recover RMDTMasterKeys + for (DelegationKey dtKey : rmState.getRMDTSecretManagerState().getRMDTMasterKeyState()) { + addPersistedMasterKey(dtKey); + } + + // recover RMDelegationTokens + Map rmDelegationTokens = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + for (Map.Entry entry : rmDelegationTokens + .entrySet()) { + addPersistedDelegationToken(entry.getKey(), entry.getValue()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index d25f0f9..d9c050f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -90,7 +91,9 @@ @BeforeClass public static void setupSecretManager() throws IOException { - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); dtsm.startThreads(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index d334cc5..d0f1c00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -17,8 +17,18 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; @@ -52,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -438,10 +449,15 @@ private static ResourceScheduler createMockScheduler(Configuration conf) { return mockSched; } - private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( - long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { - RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( - secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + private static RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval) { + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + + RMDelegationTokenSecretManager rmDtSecretManager = + new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, 3600000, rmContext); return rmDtSecretManager; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 30ae6ce..ef2ad84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,6 +27,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -33,6 +37,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -57,25 +62,32 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestRMRestart { - - @Test - public void testRMRestart() throws Exception { + + private YarnConfiguration conf; + + @Before + public void setup() { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + } + + @Test + public void testRMRestart() throws Exception { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -327,13 +339,6 @@ public void testRMRestart() throws Exception { @Test public void testRMRestartOnMaxAppAttempts() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -406,13 +411,6 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { @Test public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -491,13 +489,6 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -572,6 +563,203 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + Map rmDTState = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create a token and add into credential + HashSet tokenIdentSet = + new HashSet(); + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenIdentSet.add(dtId1); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert all master keys are saved + Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // assert all tokens are saved + Map allTokensRM1 = + rm1.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); + Assert.assertEquals(allTokensRM1, rmDTState); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + // assert master keys and tokens are populated back to DTSecretManager + Map allTokensRM2 = + rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() + .containsAll(allKeysRM1)); + Assert.assertEquals(allTokensRM1, allTokensRM2); + + // renewDate before renewing + Long renewDateBeforeRenew = allTokensRM2.get(dtId1); + try{ + // renew recovered token + rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); + } catch(Exception e) { + Assert.fail(); + } + + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Long renewDateAfterRenew = allTokensRM2.get(dtId1); + // assert token is renewed + Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); + + // assert new token is added into state store + Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew)); + + try{ + rm2.getRMDTSecretManager().cancelToken(token1, "user1"); + } catch(Exception e) { + Assert.fail(); + } + + // assert old token is removed from state store + Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); + + // assert token is removed from state after its cancelled + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertFalse(allTokensRM2.containsKey(dtId1)); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + @Test + public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmDTState = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + // on rm start, two master keys are created. + // One is created at RMDTSecretMgr.startThreads.updateCurrentKey(); + // the other is created on the first run of tokenRemoverThread.rollMasterKey() + + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // record the current key + DelegationKey oldCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + + // generate a RMDelegationToken + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, dtSecretManager); + byte[] oldPasswd = token1.getPassword(); + + // wait for the first rollMasterKey + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 1); + + // assert old-current-key and new-current-key exist + Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey)); + DelegationKey newCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey)); + + //after rolling, the token that was generated earlier must still be valid + ByteArrayInputStream bi = new ByteArrayInputStream(token1.getIdentifier()); + RMDelegationTokenIdentifier identifier = dtSecretManager.createIdentifier(); + identifier.readFields(new DataInputStream(bi)); + byte[] newPasswd = dtSecretManager.retrievePassword(identifier); + // compare the passwords + Assert.assertEquals(oldPasswd, newPasswd); + + // wait for token to expire + // rollMasterKey is called every 2 seconds. + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 5); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + rm1.stop(); + } + + @Test(timeout = 15000) + public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // wait for expiringKeys to expire + while(true) { + boolean expired = true; + for(DelegationKey key : expiringKeys) { + if(rmDTMasterKeyState.contains(key)){ + expired = false; + } + } + if(expired) + break; + Thread.sleep(500); + } + + // assert initial 2 keys expired + for (DelegationKey key : expiringKeys) { + Assert.assertFalse(rmDTMasterKeyState.contains(key)); + } + } + class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { @@ -600,4 +788,46 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token) }; } } + + class MyMockRM extends TestSecurityMockRM { + + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(RMContext rmContext) { + // KeyUpdateInterval-> 2 seconds + // TokenMaxLifetime-> 3 seconds. + return new TestRMDelegationTokenSecretManager(2000, 3000, 2000, + 1000, rmContext); + } + } + public class TestRMDelegationTokenSecretManager extends + RMDelegationTokenSecretManager { + public AtomicInteger numUpdatedKeys = new AtomicInteger(0); + public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, RMContext rmContext) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); + } + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + super.storeNewMasterKey(newKey); + numUpdatedKeys.incrementAndGet(); + } + public DelegationKey getCurrentKey() { + for (int keyId : allKeys.keySet()) { + if (keyId == currentId) { + return allKeys.get(keyId); + } + } + return null; + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index a245e54..d8e9418 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; +import junit.framework.Assert; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -111,7 +116,8 @@ public void testFSRMStateStore() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); - testRMStateStore(fsTester); + testRMAppStateStore(fsTester); + testRMDTSecretManagerStateStore(fsTester); } finally { cluster.shutdown(); } @@ -217,7 +223,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, } @SuppressWarnings("unchecked") - void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -333,6 +339,33 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + public void testRMDTSecretManagerStateStore( + RMStateStoreHelper stateStoreHelper) throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setDispatcher(dispatcher); + + // store RM delegation token; + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(new Text("owner1"), + new Text("renewer1"), new Text("realuser1")); + Long renewDate1 = new Long(System.currentTimeMillis()); + store.storeRMDelegationToken(dtId1, renewDate1); + Map token1 = + new HashMap(); + token1.put(dtId1, renewDate1); + + // store delegation key; + DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes()); + store.storeMasterKey(key); + + RMDTSecretManagerState secretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, secretManagerState.getRMDelegationTokenState()); + Assert.assertEquals(key, secretManagerState.getRMDTMasterKeyState() + .iterator().next()); + } + private List> generateTokens(ApplicationAttemptId attemptId, ApplicationTokenSecretManager appTokenMgr, ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index c596253..30f965c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -160,7 +160,7 @@ public MyDelegationTokenSecretManager(long delegationKeyUpdateInterval, } @Override //DelegationTokenSecretManager - public void logUpdateMasterKey(DelegationKey key) throws IOException { + public void storeNewMasterKey(DelegationKey key) throws IOException { return; } }