diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index fde11e7..42085ac 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -141,15 +141,73 @@ public synchronized void addKey(DelegationKey key) throws IOException { public synchronized DelegationKey[] getAllKeys() { return allKeys.values().toArray(new DelegationKey[0]); } - + + // HDFS protected void logUpdateMasterKey(DelegationKey key) throws IOException { return; } - + + // HDFS protected void logExpireToken(TokenIdent ident) throws IOException { return; } + // RM + protected void storeNewMasterKey(DelegationKey key) throws IOException { + return; + } + + // RM + protected void removeStoredMasterKey(DelegationKey key) { + return; + } + + // RM + protected void storeNewToken(TokenIdent ident, long renewDate) { + return; + } + // RM + protected void removeStoredToken(TokenIdent ident) throws IOException { + + } + // RM + protected void updateStoredToken(TokenIdent ident, long renewDate) { + return; + } + + /** + * This method is intended to be used for recovering persisted delegation + * tokens + * @param identifier identifier read from persistent storage + * @param renewDate token renew time + * @throws IOException + */ + public synchronized void addPersistedDelegationToken( + TokenIdent identifier, long renewDate) throws IOException { + if (running) { + // a safety check + throw new IOException( + "Can't add persisted delegation token to a running SecretManager."); + } + int keyId = identifier.getMasterKeyId(); + DelegationKey dKey = allKeys.get(keyId); + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier " + identifier.toString()); + return; + } + byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); + if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { + this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); + } + if (currentTokens.get(identifier) == null) { + currentTokens.put(identifier, new DelegationTokenInformation(renewDate, + password)); + } else { + throw new IOException( + "Same delegation token being added twice."); + } + } + /** * Update the current master key * This is called once by startThreads before tokenRemoverThread is created, @@ -167,6 +225,7 @@ private void updateCurrentKey() throws IOException { + keyUpdateInterval + tokenMaxLifetime, generateSecret()); //Log must be invoked outside the lock on 'this' logUpdateMasterKey(newKey); + storeNewMasterKey(newKey); synchronized (this) { currentId = newKey.getKeyId(); currentKey = newKey; @@ -200,6 +259,10 @@ private synchronized void removeExpiredKeys() { Map.Entry e = it.next(); if (e.getValue().getExpiryDate() < now) { it.remove(); + // ensure the tokens generated by this current key can be recovered + // with this current key after this current key is rolled + if(!e.getValue().equals(currentKey)) + removeStoredMasterKey(e.getValue()); } } } @@ -215,6 +278,7 @@ private synchronized void removeExpiredKeys() { identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + identifier); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); + storeNewToken(identifier, now + tokenRenewInterval); currentTokens.put(identifier, new DelegationTokenInformation(now + tokenRenewInterval, password)); return password; @@ -302,6 +366,7 @@ public synchronized long renewToken(Token token, throw new InvalidToken("Renewal request for unknown token"); } currentTokens.put(id, info); + updateStoredToken(id, renewTime); return renewTime; } @@ -337,6 +402,7 @@ public synchronized TokenIdent cancelToken(Token token, if (info == null) { throw new InvalidToken("Token not found"); } + removeStoredToken(id); return id; } @@ -387,6 +453,7 @@ private void removeExpiredToken() throws IOException { // don't hold lock on 'this' to avoid edit log updates blocking token ops for (TokenIdent ident : expiredTokens) { logExpireToken(ident); + removeStoredToken(ident); } } diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java index 3458b2d..c3764b7 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java @@ -18,18 +18,19 @@ package org.apache.hadoop.security.token.delegation; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import javax.crypto.SecretKey; +import org.apache.avro.reflect.Nullable; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.avro.reflect.Nullable; /** * Key used for generating and verifying delegation tokens @@ -117,4 +118,25 @@ public void readFields(DataInput in) throws IOException { in.readFully(keyBytes); } } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(keyId).append(expiryDate) + .append(keyBytes).toHashCode(); + } + + @Override + public boolean equals(Object right) { + if (this == right) { + return true; + } else if (right == null || getClass() != right.getClass()) { + return false; + } else { + DelegationKey r = (DelegationKey) right; + return keyId == r.keyId && + expiryDate == r.expiryDate && + Arrays.equals(keyBytes, r.keyBytes); + } + } + } diff --git hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java index 85e2279..c40f4d4 100644 --- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java +++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java @@ -101,7 +101,12 @@ public TestDelegationTokenIdentifier createIdentifier() { protected byte[] createPassword(TestDelegationTokenIdentifier t) { return super.createPassword(t); } - + + @Override + protected void removeStoredMasterKey(DelegationKey key) { + Assert.assertFalse(key.equals(allKeys.get(currentId))); + } + public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) { return SecretManager.createPassword(t.getBytes(), key.getKey()); } @@ -316,7 +321,7 @@ public void testRollMasterKey() throws Exception { int prevNumKeys = dtSecretManager.getAllKeys().length; dtSecretManager.rollMasterKey(); - + //after rolling, the length of the keys list must increase int currNumKeys = dtSecretManager.getAllKeys().length; Assert.assertEquals((currNumKeys - prevNumKeys) >= 1, true); diff --git hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java index ebb3f5d..8955537 100644 --- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java +++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java @@ -138,43 +138,6 @@ public synchronized void saveSecretManagerState(DataOutputStream out) } /** - * This method is intended to be used only while reading edit logs. - * - * @param identifier DelegationTokenIdentifier read from the edit logs or - * fsimage - * - * @param expiryTime token expiry time - * @throws IOException - */ - public synchronized void addPersistedDelegationToken( - DelegationTokenIdentifier identifier, long expiryTime) throws IOException { - if (running) { - // a safety check - throw new IOException( - "Can't add persisted delegation token to a running SecretManager."); - } - int keyId = identifier.getMasterKeyId(); - DelegationKey dKey = allKeys.get(keyId); - if (dKey == null) { - LOG - .warn("No KEY found for persisted identifier " - + identifier.toString()); - return; - } - byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); - if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { - this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); - } - if (currentTokens.get(identifier) == null) { - currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, - password)); - } else { - throw new IOException( - "Same delegation token being added twice; invalid entry in fsimage or editlogs"); - } - } - - /** * Add a MasterKey to the list of keys. * * @param key DelegationKey diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e923b1e..b8208a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -239,7 +239,7 @@ public synchronized void init(Configuration conf) { // Register event handler for RMAppManagerEvents this.rmDispatcher.register(RMAppManagerEventType.class, this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(); + this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); clientRM = createClientRMService(); addService(clientRM); @@ -666,7 +666,7 @@ protected ResourceTrackerService createResourceTrackerService() { } protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager() { + createRMDelegationTokenSecretManager(RMContext rmContext) { long secretKeyInterval = conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -678,7 +678,7 @@ protected ResourceTrackerService createResourceTrackerService() { YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); } protected ClientRMService createClientRMService() { @@ -745,6 +745,9 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){ @Override public void recover(RMState state) throws Exception { + // recover RMdelegationTokenSecretManager + rmDTSecretManager.recover(state); + // recover applications rmAppManager.recover(state); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 03154b6..414f232 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.util.ArrayList; import java.util.List; @@ -33,11 +37,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -57,11 +63,16 @@ public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - + private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + private static final String RM_APP_ROOT = "RMAppRoot"; + private static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; private FileSystem fs; private Path fsRootDirPath; + private Path fsRMDTSecretManagerRoot; + private Path fsRMAppRoot; @VisibleForTesting Path fsWorkingPath; @@ -71,10 +82,13 @@ public synchronized void initInternal(Configuration conf) fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI)); fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + fsRMDTSecretManagerRoot = new Path(fsRootDirPath, RM_DT_SECRET_MANAGER_ROOT); + fsRMAppRoot = new Path(fsRootDirPath, RM_APP_ROOT); // create filesystem fs = fsWorkingPath.getFileSystem(conf); - fs.mkdirs(fsRootDirPath); + fs.mkdirs(fsRMDTSecretManagerRoot); + fs.mkdirs(fsRMAppRoot); } @Override @@ -84,15 +98,23 @@ protected synchronized void closeInternal() throws Exception { @Override public synchronized RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + return rmState; + } + + private void loadRMAppState(RMState rmState) throws Exception { try { - RMState state = new RMState(); - FileStatus[] childNodes = fs.listStatus(fsRootDirPath); + FileStatus[] childNodes = fs.listStatus(fsRMAppRoot); List attempts = new ArrayList(); for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - Path childNodePath = getNodePath(childNodeName); + Path childNodePath = getNodePath(fsRMAppRoot, childNodeName); byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){ // application @@ -106,7 +128,7 @@ public synchronized RMState loadState() throws Exception { appStateData.getApplicationSubmissionContext()); // assert child node name is same as actual applicationId assert appId.equals(appState.context.getApplicationId()); - state.appState.put(appId, appState); + rmState.appState.put(appId, appState); } else if(childNodeName.startsWith( ApplicationAttemptId.appAttemptIdStrPrefix)) { // attempt @@ -138,7 +160,7 @@ public synchronized RMState loadState() throws Exception { // go through all attempts and add them to their apps for(ApplicationAttemptState attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = state.appState.get(appId); + ApplicationState appState = rmState.appState.get(appId); if(appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); } else { @@ -147,22 +169,46 @@ public synchronized RMState loadState() throws Exception { // application attempt nodes LOG.info("Application node not found for attempt: " + attemptState.getAttemptId()); - deleteFile(getNodePath(attemptState.getAttemptId().toString())); + deleteFile(getNodePath(fsRMAppRoot, attemptState.getAttemptId().toString())); } } - - return state; } catch (Exception e) { LOG.error("Failed to load state.", e); throw e; } } + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + FileStatus[] childNodes = fs.listStatus(fsRMDTSecretManagerRoot); + + for(FileStatus childNodeStatus : childNodes) { + assert childNodeStatus.isFile(); + String childNodeName = childNodeStatus.getPath().getName(); + Path childNodePath = getNodePath(fsRMDTSecretManagerRoot, childNodeName); + byte[] childData = readFile(childNodePath, childNodeStatus.getLen()); + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){ + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); + identifier.readFields(fsIn); + long renewDate = fsIn.readLong(); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, + renewDate); + } else { + LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager"); + } + fsIn.close(); + } + } + @Override public synchronized void storeApplicationState(String appId, - ApplicationStateDataPBImpl appStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(appId); + ApplicationStateDataPBImpl appStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(fsRMAppRoot, appId); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -178,9 +224,8 @@ public synchronized void storeApplicationState(String appId, @Override public synchronized void storeApplicationAttemptState(String attemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) - throws Exception { - Path nodeCreatePath = getNodePath(attemptId); + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { + Path nodeCreatePath = getNodePath(fsRMAppRoot, attemptId); LOG.info("Storing info for attempt: " + attemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -196,9 +241,9 @@ public synchronized void storeApplicationAttemptState(String attemptId, @Override public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + throws Exception { String appId = appState.getAppId().toString(); - Path nodeRemovePath = getNodePath(appId); + Path nodeRemovePath = getNodePath(fsRMAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); for(ApplicationAttemptId attemptId : appState.attempts.keySet()) { @@ -207,13 +252,59 @@ public synchronized void removeApplicationState(ApplicationState appState) } public synchronized void removeApplicationAttemptState(String attemptId) - throws Exception { - Path nodeRemovePath = getNodePath(attemptId); + throws Exception { + Path nodeRemovePath = getNodePath(fsRMAppRoot, attemptId); LOG.info("Removing info for attempt: " + attemptId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); } + @Override + public synchronized void storeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier, Long renewDate) + throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); + identifier.write(fsOut); + fsOut.writeLong(renewDate); + writeFile(nodeCreatePath, os.toByteArray()); + fsOut.close(); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier identifier) throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); + LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); + deleteFile(nodeCreatePath); + } + + @Override + public synchronized void storeMasterKeyState(DelegationKey masterKey) + throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); + masterKey.write(fsOut); + writeFile(nodeCreatePath, os.toByteArray()); + fsOut.close(); + } + + @Override + public synchronized void + removeMasterKeyState(DelegationKey masterKey) throws Exception { + Path nodeCreatePath = getNodePath(fsRMDTSecretManagerRoot, + DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); + deleteFile(nodeCreatePath); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { @@ -227,18 +318,17 @@ private void deleteFile(Path deletePath) throws Exception { // state data will not be that "long" byte[] data = new byte[(int)len]; fsIn.readFully(data); + fsIn.close(); return data; } private void writeFile(Path outputPath, byte[] data) throws Exception { FSDataOutputStream fsOut = fs.create(outputPath, false); fsOut.write(data); - fsOut.flush(); fsOut.close(); } - @VisibleForTesting - Path getNodePath(String nodeName) { - return new Path(fsRootDirPath, nodeName); + private Path getNodePath(Path root, String nodeName) { + return new Path(root, nodeName); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index dd6fab5..f01a5c3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,14 +19,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -49,6 +53,10 @@ public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); returnState.appState.putAll(state.appState); + returnState.rmSecretManagerState.getRMDTMasterKeyState() + .addAll(state.rmSecretManagerState.getRMDTMasterKeyState()); + returnState.rmSecretManagerState.getRMDelegationTokenState().putAll( + state.rmSecretManagerState.getRMDelegationTokenState()); return returnState; } @@ -113,4 +121,52 @@ public synchronized void removeApplicationState(ApplicationState appState) ApplicationState removed = state.appState.remove(appId); assert removed != null; } + + @Override + public synchronized void storeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + Map rmDTState = + state.rmSecretManagerState.getRMDelegationTokenState(); + if(rmDTState.containsKey(rmDTIdentifier)) { + IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier + + "is already stored."); + LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); + throw e; + } + rmDTState.put(rmDTIdentifier, renewDate); + } + + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{ + Map rmDTState = + state.rmSecretManagerState.getRMDelegationTokenState(); + rmDTState.remove(rmDTIdentifier); + } + + @Override + public synchronized void storeMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getRMDTMasterKeyState(); + + if (rmDTMasterKeyState.contains(delegationKey)) { + IOException e = new IOException("RMDTMasterKey with keyID: " + + delegationKey.getKeyId() + " is already stored"); + LOG.info("Error storing info for RMDTMasterKey with keyID: " + + delegationKey.getKeyId(), e); + throw e; + } + state.getRMDTSecretManagerState().getRMDTMasterKeyState().add(delegationKey); + LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size()); + } + + @Override + public synchronized void removeMasterKeyState(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = + state.rmSecretManagerState.getRMDTMasterKeyState(); + rmDTMasterKeyState.remove(delegationKey); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 29bdbb0..93dc764 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; + import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @Unstable public class NullRMStateStore extends RMStateStore { @@ -59,4 +62,26 @@ protected void removeApplicationState(ApplicationState appState) // Do nothing } + @Override + public void storeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + // Do nothing + } + + @Override + public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + // Do nothing + } + + @Override + public void storeMasterKeyState(DelegationKey delegationKey) throws Exception { + // Do nothing + } + + @Override + public void removeMasterKeyState(DelegationKey delegationKey) throws Exception { + // Do nothing + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index e1400f4..da6b7ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -20,7 +20,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -57,7 +61,7 @@ */ public abstract class RMStateStore { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - + /** * State of an application attempt */ @@ -115,17 +119,40 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { return attempts.get(attemptId); } } - + + public static class RMDTSecretManagerState { + // DTIdentifier -> renewDate + Map delegationTokenState = + new HashMap(); + + Set masterKeyState = + new HashSet(); + + public Map getRMDelegationTokenState() { + return delegationTokenState; + } + + public Set getRMDTMasterKeyState() { + return masterKeyState; + } + } + /** * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); - + Map appState = + new HashMap(); + + RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); + public Map getApplicationState() { return appState; } + + public RMDTSecretManagerState getRMDTSecretManagerState() { + return rmSecretManagerState; + } } private Dispatcher rmDispatcher; @@ -229,8 +256,72 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { protected abstract void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - - + + + /** + * RMDTSecretManager call this to store the state of a delegation token + */ + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + storeRMDelegationTokenState(rmDTIdentifier, renewDate); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of RMDelegationToken + */ + protected abstract void storeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a delegation token + */ + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + removeRMDelegationTokenState(rmDTIdentifier); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of RMDelegationToken + */ + protected abstract void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + + /** + * RMDTSecretManager call this to store the state of a master key + */ + public synchronized void storeMasterKey(DelegationKey delegationKey) + throws Exception { + storeMasterKeyState(delegationKey); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of + * DelegationToken Master Key + */ + protected abstract void storeMasterKeyState(DelegationKey delegationKey) + throws Exception; + + /** + * RMDTSecretManager call this to remove the state of a master key + */ + public synchronized void removeMasterKey(DelegationKey delegationKey) + throws Exception { + removeMasterKeyState(delegationKey); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of + * DelegationToken Master Key + */ + protected abstract void removeMasterKeyState(DelegationKey delegationKey) + throws Exception; + /** * Non-blocking API * ResourceManager services call this to remove an application from the state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index f70605c..e117aca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -18,10 +18,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; + +import com.google.common.annotations.VisibleForTesting; /** * A ResourceManager specific delegation token secret manager. @@ -32,6 +47,10 @@ @InterfaceStability.Unstable public class RMDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = LogFactory + .getLog(RMDelegationTokenSecretManager.class); + + protected final RMContext rmContext; /** * Create a secret manager @@ -46,13 +65,119 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.rmContext = rmContext; } @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } + + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + try { + LOG.info("storing master key with keyID " + newKey.getKeyId()); + rmContext.getStateStore().storeMasterKey(newKey); + } catch (Exception e) { + LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removeStoredMasterKey(DelegationKey key) { + try { + LOG.info("removing master key with keyID " + key.getKeyId()); + rmContext.getStateStore().removeMasterKey(key); + } catch (Exception e) { + LOG.error("Error in removing master key with KeyID: " + key.getKeyId()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void storeNewToken(RMDelegationTokenIdentifier identifier, + long renewDate) { + try { + LOG.info("storing RMDelegation token with sequence number: " + + identifier.getSequenceNumber()); + rmContext.getStateStore().storeRMDelegationToken(identifier, renewDate); + } catch (Exception e) { + LOG.error("Error in storing RMDelegationToken with sequence number: " + + identifier.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void updateStoredToken(RMDelegationTokenIdentifier id, + long renewDate) { + try { + LOG.info("updating RMDelegation token with sequence number: " + + id.getSequenceNumber()); + rmContext.getStateStore().removeRMDelegationToken(id); + rmContext.getStateStore().storeRMDelegationToken(id, renewDate); + } catch (Exception e) { + LOG.error("Error in updating persisted RMDelegationToken with sequence number: " + + id.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Override + protected void removeStoredToken(RMDelegationTokenIdentifier ident) + throws IOException { + try { + LOG.info("removing RMDelegation token with sequence number: " + + ident.getSequenceNumber()); + rmContext.getStateStore().removeRMDelegationToken(ident); + } catch (Exception e) { + LOG.error("Error in removing RMDelegationToken with sequence number: " + + ident.getSequenceNumber()); + ExitUtil.terminate(1, e); + } + } + + @Private + @VisibleForTesting + public synchronized Set getAllMasterKeys() { + HashSet keySet = new HashSet(); + keySet.addAll(allKeys.values()); + return keySet; + } + + @Private + @VisibleForTesting + public synchronized Map getAllTokens() { + Map allTokens = + new HashMap(); + + for (Map.Entry entry : currentTokens.entrySet()) { + allTokens.put(entry.getKey(), entry.getValue().getRenewDate()); + } + return allTokens; + } + + public void recover(RMState rmState) throws Exception { + + LOG.info("recovering RMDelegationTokenSecretManager."); + // recover RMDTMasterKeys + for (DelegationKey dtKey : rmState.getRMDTSecretManagerState().getRMDTMasterKeyState()) { + addKey(dtKey); + } + + // recover RMDelegationTokens + Map rmDelegationTokens = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + for (Map.Entry entry : rmDelegationTokens + .entrySet()) { + addPersistedDelegationToken(entry.getKey(), entry.getValue()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5ddd5b4..2b6e015 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -283,7 +283,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), - rmAppManager, applicationACLsManager, null) { + rmAppManager, applicationACLsManager, rmDTSecretManager) { @Override public void start() { // override to not start rpc handler diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 577ea0a..16f1885 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -90,7 +91,9 @@ @BeforeClass public static void setupSecretManager() throws IOException { - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); dtsm.startThreads(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index d334cc5..d0f1c00 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -17,8 +17,18 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; @@ -52,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -438,10 +449,15 @@ private static ResourceScheduler createMockScheduler(Configuration conf) { return mockSched; } - private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( - long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { - RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( - secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + private static RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval) { + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); + + RMDelegationTokenSecretManager rmDtSecretManager = + new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, 3600000, rmContext); return rmDtSecretManager; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b2b5205..99d6d25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,22 +28,29 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -57,25 +67,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestRMRestart { - - @Test - public void testRMRestart() throws Exception { + + private YarnConfiguration conf; + + @Before + public void setup() { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + } + + @Test + public void testRMRestart() throws Exception { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -327,13 +346,6 @@ public void testRMRestart() throws Exception { @Test public void testRMRestartOnMaxAppAttempts() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -406,13 +418,6 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { @Test public void testDelegationTokenRestoredInDelegationTokenRenewer() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -491,13 +496,6 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer() @Test public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { - Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); - ExitUtil.disableSystemExit(); - - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); @@ -572,6 +570,194 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + Map rmDTState = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create a token and add into credential + HashSet tokenIdentSet = + new HashSet(); + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenIdentSet.add(dtId1); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert all master keys are saved + Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // assert all tokens are saved + Map allTokensRM1 = + rm1.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); + Assert.assertEquals(allTokensRM1, rmDTState); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + // assert master keys and tokens are populated back to DTSecretManager + Map allTokensRM2 = + rm2.getRMDTSecretManager().getAllTokens(); + // rm2 has its own master keys when it starts, we use containsAll here + Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() + .containsAll(allKeysRM1)); + Assert.assertEquals(allTokensRM1, allTokensRM2); + + // renewDate before renewing + Long renewDateBeforeRenew = allTokensRM2.get(dtId1); + try{ + // renew recovered token + rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); + } catch(Exception e) { + Assert.fail(); + } + + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Long renewDateAfterRenew = allTokensRM2.get(dtId1); + // assert token is renewed + Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); + + // assert new token is added into state store + Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew)); + // assert old token is removed from state store + Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); + + try{ + rm2.getRMDTSecretManager().cancelToken(token1, "user1"); + } catch(Exception e) { + Assert.fail(); + } + + // assert token is removed from state after its cancelled + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertFalse(allTokensRM2.containsKey(dtId1)); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + @Test + public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmDTState = + rmState.getRMDTSecretManagerState().getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + // on rm start, two master keys are created. + // One is created at RMDTSecretMgr.startThreads.updateCurrentKey(); + // the other is created on the first run of tokenRemoverThread.rollMasterKey() + + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // record the current key + DelegationKey oldCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + + // request to generate a RMDelegationToken + GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); + when(request.getRenewer()).thenReturn("renew1"); + GetDelegationTokenResponse response = + rm1.getClientRMService().getDelegationToken(request); + DelegationToken delegationToken = response.getRMDelegationToken(); + Token token1 = + ProtoUtils.convertFromProtoFormat(delegationToken, null); + DataInputBuffer inBuf = new DataInputBuffer(); + byte[] bytes = token1.getIdentifier(); + inBuf.reset(bytes, bytes.length); + RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(); + dtId1.readFields(inBuf); + + // wait for the first rollMasterKey + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 1); + + // assert old-current-key and new-current-key exist + Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey)); + DelegationKey newCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey)); + + // wait for token to expire + // rollMasterKey is called every 1 second. + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 6); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + rm1.stop(); + } + + @Test(timeout = 15000) + public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Set rmDTMasterKeyState = + rmState.getRMDTSecretManagerState().getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // wait for expiringKeys to expire + while(true) { + boolean expired = true; + for(DelegationKey key : expiringKeys) { + if(rmDTMasterKeyState.contains(key)){ + expired = false; + } + } + if(expired) + break; + } + } + class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { @@ -600,4 +786,46 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token) }; } } + + class MyMockRM extends TestSecurityMockRM { + + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(RMContext rmContext) { + // KeyUpdateInterval-> 1 seconds + // TokenMaxLifetime-> 2 seconds. + return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, + 1000, rmContext); + } + } + public class TestRMDelegationTokenSecretManager extends + RMDelegationTokenSecretManager { + public AtomicInteger numUpdatedKeys = new AtomicInteger(0); + public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, RMContext rmContext) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); + } + + @Override + protected void storeNewMasterKey(DelegationKey newKey) { + super.storeNewMasterKey(newKey); + numUpdatedKeys.incrementAndGet(); + } + public DelegationKey getCurrentKey() { + for (int keyId : allKeys.keySet()) { + if (keyId == currentId) { + return allKeys.get(keyId); + } + } + return null; + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index a245e54..cf8d35f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; +import junit.framework.Assert; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -111,7 +116,8 @@ public void testFSRMStateStore() throws Exception { MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); try { TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster); - testRMStateStore(fsTester); + testRMAppStateStore(fsTester); + testRMDTSecretManagerStateStore(fsTester); } finally { cluster.shutdown(); } @@ -217,7 +223,7 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, } @SuppressWarnings("unchecked") - void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { + void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); @@ -333,6 +339,34 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + public void testRMDTSecretManagerStateStore( + RMStateStoreHelper stateStoreHelper) throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setDispatcher(dispatcher); + + // store RM delegation token; + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(new Text("owner1"), + new Text("renewer1"), new Text("realuser1")); + Long renewDate1 = new Long(System.currentTimeMillis()); + store.storeRMDelegationToken(dtId1, renewDate1); + Map token1 = + new HashMap(); + token1.put(dtId1, renewDate1); + + // store delegation key; + DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes()); + HashSet keySet = new HashSet(); + keySet.add(key); + store.storeMasterKey(key); + + RMDTSecretManagerState secretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, secretManagerState.getRMDelegationTokenState()); + Assert.assertEquals(keySet, secretManagerState.getRMDTMasterKeyState()); + } + private List> generateTokens(ApplicationAttemptId attemptId, ApplicationTokenSecretManager appTokenMgr, ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {