diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index fde11e7..d009984 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -150,6 +150,22 @@ protected void logExpireToken(TokenIdent ident) throws IOException { return; } + protected void storeToken(TokenIdent identifier, long renewDate) { + return; + } + + protected void removeToken(TokenIdent identifier) { + return; + } + + protected void storeMasterKey(DelegationKey key) { + return; + } + + protected void removeMasterKey(DelegationKey key) { + return; + } + /** * Update the current master key * This is called once by startThreads before tokenRemoverThread is created, @@ -167,6 +183,7 @@ private void updateCurrentKey() throws IOException { + keyUpdateInterval + tokenMaxLifetime, generateSecret()); //Log must be invoked outside the lock on 'this' logUpdateMasterKey(newKey); + storeMasterKey(newKey); synchronized (this) { currentId = newKey.getKeyId(); currentKey = newKey; @@ -178,7 +195,7 @@ private void updateCurrentKey() throws IOException { * Update the current master key for generating delegation tokens * It should be called only by tokenRemoverThread. */ - void rollMasterKey() throws IOException { + protected void rollMasterKey() throws IOException { synchronized (this) { removeExpiredKeys(); /* set final expiry date for retiring currentKey */ @@ -200,6 +217,8 @@ private synchronized void removeExpiredKeys() { Map.Entry e = it.next(); if (e.getValue().getExpiryDate() < now) { it.remove(); + if(!e.getValue().equals(currentKey)) + removeMasterKey(e.getValue()); } } } @@ -215,6 +234,7 @@ private synchronized void removeExpiredKeys() { identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + identifier); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); + storeToken(identifier, now + tokenRenewInterval); currentTokens.put(identifier, new DelegationTokenInformation(now + tokenRenewInterval, password)); return password; @@ -302,6 +322,8 @@ public synchronized long renewToken(Token token, throw new InvalidToken("Renewal request for unknown token"); } currentTokens.put(id, info); + removeToken(id); + storeToken(id, renewTime); return renewTime; } @@ -337,6 +359,7 @@ public synchronized TokenIdent cancelToken(Token token, if (info == null) { throw new InvalidToken("Token not found"); } + removeToken(id); return id; } @@ -387,6 +410,7 @@ private void removeExpiredToken() throws IOException { // don't hold lock on 'this' to avoid edit log updates blocking token ops for (TokenIdent ident : expiredTokens) { logExpireToken(ident); + removeToken(ident); } } diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java index 3458b2d..bdf498d 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java @@ -18,18 +18,19 @@ package org.apache.hadoop.security.token.delegation; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import javax.crypto.SecretKey; +import org.apache.avro.reflect.Nullable; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; -import org.apache.avro.reflect.Nullable; /** * Key used for generating and verifying delegation tokens @@ -117,4 +118,24 @@ public void readFields(DataInput in) throws IOException { in.readFully(keyBytes); } } + + @Override + public int hashCode() { + return WritableComparator.hashBytes(keyBytes, keyBytes.length); + } + + @Override + public boolean equals(Object right) { + if (this == right) { + return true; + } else if (right == null || getClass() != right.getClass()) { + return false; + } else { + DelegationKey r = (DelegationKey) right; + return keyId == r.keyId && + expiryDate == r.expiryDate && + Arrays.equals(keyBytes, r.keyBytes); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e923b1e..9efd392 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -239,7 +239,7 @@ public synchronized void init(Configuration conf) { // Register event handler for RMAppManagerEvents this.rmDispatcher.register(RMAppManagerEventType.class, this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(); + this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); clientRM = createClientRMService(); addService(clientRM); @@ -666,7 +666,7 @@ protected ResourceTrackerService createResourceTrackerService() { } protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager() { + createRMDelegationTokenSecretManager(RMContext rmContext) { long secretKeyInterval = conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -678,7 +678,7 @@ protected ResourceTrackerService createResourceTrackerService() { YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); } protected ClientRMService createClientRMService() { @@ -747,6 +747,9 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){ public void recover(RMState state) throws Exception { // recover applications rmAppManager.recover(state); + + // recover RMdelegationTokenSecretManager + rmDTSecretManager.recover(state); } public static void main(String argv[]) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990da..d795f43 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -31,11 +32,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -56,7 +59,6 @@ private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - private FileSystem fs; private Path fsRootDirPath; @@ -203,6 +205,38 @@ public synchronized void removeApplicationAttemptState(String attemptId) deleteFile(nodeRemovePath); } + @Override + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws IOException { + + } + + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws IOException { + + } + + @Override + public synchronized void storeMasterKey(DelegationKey masterKey) + throws IOException { + Path nodeCreatePath = + getNodePath(DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + FSDataOutputStream fsOut = fs.create(nodeCreatePath, false); + LOG.info("Storing RMDelegationKey-" + masterKey.getKeyId()); + masterKey.write(fsOut); + } + + @Override + public synchronized void + removeMasterKey(DelegationKey masterKey) throws Exception { + Path nodeCreatePath = + getNodePath(DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + LOG.info("Removing RMDelegationKey-"+ masterKey.getKeyId()); + deleteFile(nodeCreatePath); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167..f9d90bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,12 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -47,6 +51,8 @@ public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); returnState.appState.putAll(state.appState); + returnState.rmDTMasterKeyState.addAll(state.rmDTMasterKeyState); + returnState.rmDTState.putAll(state.rmDTState); return returnState; } @@ -103,4 +109,52 @@ public synchronized void removeApplicationState(ApplicationState appState) ApplicationState removed = state.appState.remove(appId); assert removed != null; } + + @Override + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws IOException { + Map rmDTState = + state.getRMDelegationTokenState(); + if(rmDTState.containsKey(rmDTIdentifier)) { + IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier + + "is already stored."); + LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); + throw e; + } + rmDTState.put(rmDTIdentifier, renewDate); + } + + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws IOException{ + Map rmDTState = + state.getRMDelegationTokenState(); + Long removed = rmDTState.remove(rmDTIdentifier); + assert removed != null; + } + + @Override + public synchronized void storeMasterKey(DelegationKey delegationKey) + throws IOException { + Set rmDTMasterKeyState = state.getRMDTMasterKeyState(); + + if (rmDTMasterKeyState.contains(delegationKey)) { + IOException e = new IOException("RMDTMasterKey with keyID: " + + delegationKey.getKeyId() + " is already stored"); + LOG.info("Error storing info for RMDTMasterKey with keyID: " + + delegationKey.getKeyId(), e); + throw e; + } + state.rmDTMasterKeyState.add(delegationKey); + LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size()); + } + + @Override + public synchronized void removeMasterKey(DelegationKey delegationKey) + throws IOException { + Set rmDTMasterKeyState = state.getRMDTMasterKeyState(); + boolean removed = rmDTMasterKeyState.remove(delegationKey); + assert removed; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index db04495..8725f09 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -20,6 +20,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; @@ -59,4 +61,28 @@ protected void removeApplicationState(ApplicationState appState) // Do nothing } + @Override + public void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + // Do nothing + } + + @Override + public void + removeRMDelegationToken(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + // Do nothing + } + + @Override + public void storeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + + @Override + public void removeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015..046b792 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -19,13 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,7 +55,8 @@ */ public abstract class RMStateStore { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - + protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + /** * State of an application attempt */ @@ -103,17 +108,32 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { return attempts.get(attemptId); } } - + /** * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); - + Map appState = + new HashMap(); + + // DTIdentifier -> renewDate + Map rmDTState = + new HashMap(); + + Set rmDTMasterKeyState = + new HashSet(); + public Map getApplicationState() { return appState; } + + public Map getRMDelegationTokenState() { + return rmDTState; + } + + public Set getRMDTMasterKeyState() { + return rmDTMasterKeyState; + } } private Dispatcher rmDispatcher; @@ -214,7 +234,20 @@ protected abstract void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - + + public abstract void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception; + + public abstract void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + + public abstract void storeMasterKey(DelegationKey delegationKey) + throws Exception; + + public abstract void removeMasterKey(DelegationKey delegationKey) + throws Exception; + /** * Non-blocking API * ResourceManager services call this to remove an application from the state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index f70605c..71de955 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -18,10 +18,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; /** * A ResourceManager specific delegation token secret manager. @@ -32,7 +44,11 @@ @InterfaceStability.Unstable public class RMDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = LogFactory + .getLog(RMDelegationTokenSecretManager.class); + protected final RMContext rmContext; + private RMStateStore rmStateStore; /** * Create a secret manager * @param delegationKeyUpdateInterval the number of seconds for rolling new @@ -46,13 +62,127 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.rmContext = rmContext; + this.rmStateStore = rmContext.getStateStore(); } @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } + + @Override + protected void storeToken(RMDelegationTokenIdentifier identifier, + long renewDate) { + try { + LOG.info("storing Delegation Token with SequenceNumber: " + + identifier.getSequenceNumber()); + rmStateStore.storeRMDelegationToken(identifier, renewDate); + } catch (Exception e) { + //TODO + e.printStackTrace(); + } + } + + @Override + protected void removeToken(RMDelegationTokenIdentifier identifier) { + try { + LOG.info("removing Delegation Token with SequenceNumber: " + + identifier.getSequenceNumber()); + rmStateStore.removeRMDelegationToken(identifier); + } catch (Exception e) { + // TODO + e.printStackTrace(); + } + } + + @Override + protected void storeMasterKey(DelegationKey key) { + try { + LOG.info("storing master key with keyID " + key.getKeyId()); + rmStateStore.storeMasterKey(key); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + protected void removeMasterKey(DelegationKey key) { + try { + LOG.info("removing master key with keyID " + key.getKeyId()); + rmStateStore.removeMasterKey(key); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public synchronized Set getAllMasterKeys() { + HashSet keySet = new HashSet(); + keySet.addAll(allKeys.values()); + return keySet; + } + + public synchronized Map getAllTokens() { + Map allTokens = + new HashMap(); + + for (Map.Entry entry : currentTokens.entrySet()) { + allTokens.put(entry.getKey(), entry.getValue().getRenewDate()); + } + return allTokens; + } + + public void recover(RMState rmState) throws Exception { + + LOG.info("recovering RMDelegationTokenSecretManager."); + // recover RMDTMasterKeys + for (DelegationKey dtKey : rmState.getRMDTMasterKeyState()) { + addKey(dtKey); + } + + // recover RMDelegationTokens + Map rmDelegationTokens = + rmState.getRMDelegationTokenState(); + for (Map.Entry entry : rmDelegationTokens + .entrySet()) { + addPersistedRMDelegationToken(entry.getKey(), entry.getValue()); + } + } + + public synchronized void addPersistedRMDelegationToken( + RMDelegationTokenIdentifier identifier, long expiryTime) + throws IOException { + if (running) { + // a safety check + throw new IOException( + "Can't add persisted RMDelegation token to a running SecretManager."); + } + int keyId = identifier.getMasterKeyId(); + DelegationKey dKey = allKeys.get(keyId); + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier "+ identifier.toString()); + return; + } + byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); + if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { + this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); + } + if (currentTokens.get(identifier) == null) { + currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, + password)); + } else { + throw new IOException( + "Same RMDelegation token being added twice."); + } + } + + // FOR TEST + public void setRMStateStore(RMStateStore rmStore) { + this.rmStateStore = rmStore; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 2259970..26e034c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -80,6 +80,7 @@ public MockRM(Configuration conf, RMStateStore store) { init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); if(store != null) { setRMStateStore(store); + super.rmDTSecretManager.setRMStateStore(store); } Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index d25f0f9..89e16b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -90,7 +90,8 @@ @BeforeClass public static void setupSecretManager() throws IOException { - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, + mock(RMContext.class)); dtsm.startThreads(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index 55b8c93..d0b1c16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -438,10 +438,12 @@ private static ResourceScheduler createMockScheduler(Configuration conf) { return mockSched; } - private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( - long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { - RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( - secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + private static RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval) { + RMDelegationTokenSecretManager rmDtSecretManager = + new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, 3600000, mock(RMContext.class)); return rmDtSecretManager; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fb..5767732 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,6 +27,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -33,6 +37,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -57,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -483,6 +489,191 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } + @Test + public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + Map rmDTState = + rmState.getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create a token and add into credential + HashSet tokenIdentSet = + new HashSet(); + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenIdentSet.add(dtId1); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert all master keys are saved + Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // assert all tokens are saved + Map allTokensRM1 = + rm1.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); + Assert.assertEquals(allTokensRM1, rmDTState); + + // start new RM + MockRM rm2 = new MyMockRM(conf, memStore); + rm2.start(); + + // assert master keys and tokens are populated back to DTSecretManager + Map allTokensRM2 = + rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() + .containsAll(allKeysRM1)); + Assert.assertEquals(allTokensRM1, allTokensRM2); + + // renewDate before renewing + Long renewDateBeforeRenew = allTokensRM2.get(dtId1); + try{ + // renew recovered token + rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); + } catch(Exception e) { + Assert.fail(); + } + + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Long renewDateAfterRenew = allTokensRM2.get(dtId1); + // assert token is renewed + Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); + // assert old token is removed from state store + Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); + // assert new token is added into state store + Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew)); + + try{ + rm2.getRMDTSecretManager().cancelToken(token1, "user1"); + } catch(Exception e) { + Assert.fail(); + } + // assert token is removed from state after its cancelled + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertFalse(allTokensRM2.containsKey(dtId1)); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + @Test + public void testRMDTMasterKeyStateStoreOnRollingMasterKey() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmDTState = + rmState.getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM2(conf, memStore); + rm1.start(); + // on rm start, two master keys are created. + // One is created at RMDTSecretMgr.startThreads.updateCurrentKey(); + // the other is created on the first run of tokenRemoverThread.rollMasterKey() + + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + // assert all master keys are saved + Set allKeysRM1 = dtSecretManager.getAllMasterKeys(); + Set expiringKeys = allKeysRM1; + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // record the current key + DelegationKey oldCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + + // generate a RMDelegationToken + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, dtSecretManager); + byte[] oldPasswd = token1.getPassword(); + + // wait for the first rollMasterKey + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numRolledKeys.get() == 0); + + // assert old-current-key and new-current-key exist + Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey)); + DelegationKey newCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey)); + + //after rolling, the token that was generated earlier must still be valid + ByteArrayInputStream bi = new ByteArrayInputStream(token1.getIdentifier()); + RMDelegationTokenIdentifier identifier = dtSecretManager.createIdentifier(); + identifier.readFields(new DataInputStream(bi)); + byte[] newPasswd = dtSecretManager.retrievePassword(identifier); + // compare the passwords + Assert.assertEquals(oldPasswd, newPasswd); + + // wait for initial 2 keys to expire; + // rollMasterKey is called every 2 seconds. this indicates 8 seconds passed. + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numRolledKeys.get() < 4); + // assert initial 2 keys expired + for (DelegationKey key : expiringKeys) { + Assert.assertFalse(rmDTMasterKeyState.contains(key)); + } + + // totally (numRolledKeys) + 1 keys are created; 2 keys expired. + // we expect (numRolledKeys) - 1 keys remaining in rmDTMasterKeyState + Assert.assertEquals(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numRolledKeys.get() - 1, rmDTMasterKeyState.size()); + + // at this point, the token should expire as well. + Assert.assertFalse(rmDTState.containsKey(dtId1)); + rm1.stop(); + } + class MyMockRM extends MockRM { public MyMockRM(Configuration conf, RMStateStore store) { @@ -511,4 +702,46 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token) }; } } + + class MyMockRM2 extends MyMockRM { + public MyMockRM2(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(RMContext rmContext) { + // KeyUpdateInterval-> 2 seconds + // TokenMaxLifetime-> 3 seconds. + // every master key will expire in 2 + 3 seconds + return new TestRMDelegationTokenSecretManager(2000, 3000, 2000, + 1000, rmContext); + } + } + public class TestRMDelegationTokenSecretManager extends + RMDelegationTokenSecretManager { + public AtomicInteger numRolledKeys = new AtomicInteger(0); + public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, RMContext rmContext) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); + } + + @Override + protected void rollMasterKey() throws IOException { + super.rollMasterKey(); + numRolledKeys.incrementAndGet(); + } + + public DelegationKey getCurrentKey() { + for (int keyId : allKeys.keySet()) { + if (keyId == currentId) { + return allKeys.get(keyId); + } + } + return null; + } + } }