diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index fde11e7..bbe8643 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -150,6 +150,10 @@ protected void logExpireToken(TokenIdent ident) throws IOException { return; } + protected void removeMasterKey(DelegationKey key) { + return; + } + /** * Update the current master key * This is called once by startThreads before tokenRemoverThread is created, @@ -200,6 +204,8 @@ private synchronized void removeExpiredKeys() { Map.Entry e = it.next(); if (e.getValue().getExpiryDate() < now) { it.remove(); + if(!e.getValue().equals(currentKey)) + removeMasterKey(e.getValue()); } } } diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java index 3458b2d..bdf498d 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationKey.java @@ -18,18 +18,19 @@ package org.apache.hadoop.security.token.delegation; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import javax.crypto.SecretKey; +import org.apache.avro.reflect.Nullable; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; -import org.apache.avro.reflect.Nullable; /** * Key used for generating and verifying delegation tokens @@ -117,4 +118,24 @@ public void readFields(DataInput in) throws IOException { in.readFully(keyBytes); } } + + @Override + public int hashCode() { + return WritableComparator.hashBytes(keyBytes, keyBytes.length); + } + + @Override + public boolean equals(Object right) { + if (this == right) { + return true; + } else if (right == null || getClass() != right.getClass()) { + return false; + } else { + DelegationKey r = (DelegationKey) right; + return keyId == r.keyId && + expiryDate == r.expiryDate && + Arrays.equals(keyBytes, r.keyBytes); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e923b1e..9efd392 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -239,7 +239,7 @@ public synchronized void init(Configuration conf) { // Register event handler for RMAppManagerEvents this.rmDispatcher.register(RMAppManagerEventType.class, this.rmAppManager); - this.rmDTSecretManager = createRMDelegationTokenSecretManager(); + this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext); clientRM = createClientRMService(); addService(clientRM); @@ -666,7 +666,7 @@ protected ResourceTrackerService createResourceTrackerService() { } protected RMDelegationTokenSecretManager - createRMDelegationTokenSecretManager() { + createRMDelegationTokenSecretManager(RMContext rmContext) { long secretKeyInterval = conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -678,7 +678,7 @@ protected ResourceTrackerService createResourceTrackerService() { YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new RMDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext); } protected ClientRMService createClientRMService() { @@ -747,6 +747,9 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){ public void recover(RMState state) throws Exception { // recover applications rmAppManager.recover(state); + + // recover RMdelegationTokenSecretManager + rmDTSecretManager.recover(state); } public static void main(String argv[]) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 03154b6..5fdc554 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -58,7 +60,6 @@ private static final String ROOT_DIR_NAME = "FSRMStateRoot"; - private FileSystem fs; private Path fsRootDirPath; @@ -214,6 +215,38 @@ public synchronized void removeApplicationAttemptState(String attemptId) deleteFile(nodeRemovePath); } + @Override + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + + } + + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + + } + + @Override + public synchronized void storeMasterKey(DelegationKey masterKey) + throws Exception { + Path nodeCreatePath = + getNodePath(DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + FSDataOutputStream fsOut = fs.create(nodeCreatePath, false); + LOG.info("Storing RMDelegationKey-" + masterKey.getKeyId()); + masterKey.write(fsOut); + } + + @Override + public synchronized void + removeMasterKey(DelegationKey masterKey) throws Exception { + Path nodeCreatePath = + getNodePath(DELEGATION_KEY_PREFIX + masterKey.getKeyId()); + LOG.info("Removing RMDelegationKey-"+ masterKey.getKeyId()); + deleteFile(nodeCreatePath); + } + // FileSystem related code private void deleteFile(Path deletePath) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index dd6fab5..052890a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -29,6 +31,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -49,6 +55,8 @@ public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); returnState.appState.putAll(state.appState); + returnState.rmDTMasterKeyState.addAll(state.rmDTMasterKeyState); + returnState.rmDTState.putAll(state.rmDTState); return returnState; } @@ -113,4 +121,52 @@ public synchronized void removeApplicationState(ApplicationState appState) ApplicationState removed = state.appState.remove(appId); assert removed != null; } + + @Override + public synchronized void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + Map rmDTState = + state.getRMDelegationTokenState(); + if(rmDTState.containsKey(rmDTIdentifier)) { + IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier + + "is already stored."); + LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); + throw e; + } + rmDTState.put(rmDTIdentifier, renewDate); + } + + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{ + Map rmDTState = + state.getRMDelegationTokenState(); + Long removed = rmDTState.remove(rmDTIdentifier); + assert removed != null; + } + + @Override + public synchronized void storeMasterKey(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = state.getRMDTMasterKeyState(); + + if (rmDTMasterKeyState.contains(delegationKey)) { + IOException e = new IOException("RMDTMasterKey with keyID: " + + delegationKey.getKeyId() + " is already stored"); + LOG.info("Error storing info for RMDTMasterKey with keyID: " + + delegationKey.getKeyId(), e); + throw e; + } + state.rmDTMasterKeyState.add(delegationKey); + LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size()); + } + + @Override + public synchronized void removeMasterKey(DelegationKey delegationKey) + throws Exception { + Set rmDTMasterKeyState = state.getRMDTMasterKeyState(); + boolean removed = rmDTMasterKeyState.remove(delegationKey); + assert removed; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 29bdbb0..03c8095 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -18,10 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; + import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @Unstable public class NullRMStateStore extends RMStateStore { @@ -59,4 +62,28 @@ protected void removeApplicationState(ApplicationState appState) // Do nothing } + @Override + public void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception { + // Do nothing + } + + @Override + public void + removeRMDelegationToken(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + // Do nothing + } + + @Override + public void storeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + + @Override + public void removeMasterKey(DelegationKey delegationKey) throws Exception { + // Do nothing + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index e1400f4..44c0bf8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -20,7 +20,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -57,7 +61,8 @@ */ public abstract class RMStateStore { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - + protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + /** * State of an application attempt */ @@ -115,17 +120,32 @@ public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { return attempts.get(attemptId); } } - + /** * State of the ResourceManager */ public static class RMState { - Map appState = - new HashMap(); - + Map appState = + new HashMap(); + + // DTIdentifier -> renewDate + Map rmDTState = + new HashMap(); + + Set rmDTMasterKeyState = + new HashSet(); + public Map getApplicationState() { return appState; } + + public Map getRMDelegationTokenState() { + return rmDTState; + } + + public Set getRMDTMasterKeyState() { + return rmDTMasterKeyState; + } } private Dispatcher rmDispatcher; @@ -225,12 +245,26 @@ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { * Blocking API * Derived classes must implement this method to store the state of an * application attempt + * @throws Exception TODO */ protected abstract void storeApplicationAttemptState(String attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - + + public abstract void storeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) + throws Exception; + + public abstract void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception; + + public abstract void storeMasterKey(DelegationKey delegationKey) + throws Exception; + + public abstract void removeMasterKey(DelegationKey delegationKey) + throws Exception; + /** * Non-blocking API * ResourceManager services call this to remove an application from the state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java index f70605c..a4e77e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java @@ -18,10 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; + +import com.google.common.annotations.VisibleForTesting; /** * A ResourceManager specific delegation token secret manager. @@ -32,6 +49,11 @@ @InterfaceStability.Unstable public class RMDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = LogFactory + .getLog(RMDelegationTokenSecretManager.class); + + protected final RMContext rmContext; + private RMStateStore rmStateStore; /** * Create a secret manager @@ -46,13 +68,161 @@ public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + RMContext rmContext) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.rmContext = rmContext; + this.rmStateStore = rmContext.getStateStore(); } @Override public RMDelegationTokenIdentifier createIdentifier() { return new RMDelegationTokenIdentifier(); } + + @Override + protected void removeMasterKey(DelegationKey key) { + try { + LOG.info("removing master key with keyID " + key.getKeyId()); + rmStateStore.removeMasterKey(key); + } catch (Exception e) { + throw new RuntimeException("Error in removing master key", e); + } + } + + @Override + protected synchronized byte[] createPassword(RMDelegationTokenIdentifier identifier) { + byte[] password = super.createPassword(identifier); + try { + rmStateStore.storeRMDelegationToken(identifier, + currentTokens.get(identifier).getRenewDate()); + } catch (Exception e) { + throw new RuntimeException("Error in storing RMDelegationToken"); + } + return password; + } + + @Override + public synchronized long renewToken(Token token, + String renewer) throws InvalidToken, IOException { + long renewDate = super.renewToken(token, renewer); + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + RMDelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + try { + rmStateStore.removeRMDelegationToken(id); + rmStateStore.storeRMDelegationToken(id, renewDate); + } catch (Exception e) { + throw new IOException("Error in storing RMDelegationToken", e); + } + return renewDate; + } + + public synchronized RMDelegationTokenIdentifier cancelToken( + Token token, String canceller) + throws IOException { + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + RMDelegationTokenIdentifier id = createIdentifier(); + id.readFields(in); + + RMDelegationTokenIdentifier identifier = super.cancelToken(token, canceller); + try { + rmStateStore.removeRMDelegationToken(id); + } catch (Exception e) { + throw new IOException("Error in removing RMDelegationToken" ,e); + } + return identifier; + } + + /** + * remove all expired master keys except current key and store the new key + */ + @Override + protected void logUpdateMasterKey(DelegationKey newKey) { + try { + rmStateStore.storeMasterKey(newKey); + } catch (Exception e) { + throw new RuntimeException("Error in storing master key", e); + } + } + + @Override + @VisibleForTesting + public void logExpireToken(RMDelegationTokenIdentifier ident) + throws IOException { + try { + rmStateStore.removeRMDelegationToken(ident); + } catch (Exception e) { + throw new IOException("Error in removing RMDelegationToken", e); + } + } + + public synchronized Set getAllMasterKeys() { + HashSet keySet = new HashSet(); + keySet.addAll(allKeys.values()); + return keySet; + } + + public synchronized Map getAllTokens() { + Map allTokens = + new HashMap(); + + for (Map.Entry entry : currentTokens.entrySet()) { + allTokens.put(entry.getKey(), entry.getValue().getRenewDate()); + } + return allTokens; + } + + public void recover(RMState rmState) throws Exception { + + LOG.info("recovering RMDelegationTokenSecretManager."); + // recover RMDTMasterKeys + for (DelegationKey dtKey : rmState.getRMDTMasterKeyState()) { + addKey(dtKey); + } + + // recover RMDelegationTokens + Map rmDelegationTokens = + rmState.getRMDelegationTokenState(); + for (Map.Entry entry : rmDelegationTokens + .entrySet()) { + addPersistedRMDelegationToken(entry.getKey(), entry.getValue()); + } + } + + public synchronized void addPersistedRMDelegationToken( + RMDelegationTokenIdentifier identifier, long expiryTime) + throws IOException { + if (running) { + // a safety check + throw new IOException( + "Can't add persisted RMDelegation token to a running SecretManager."); + } + int keyId = identifier.getMasterKeyId(); + DelegationKey dKey = allKeys.get(keyId); + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier "+ identifier.toString()); + return; + } + byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); + if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) { + this.delegationTokenSequenceNumber = identifier.getSequenceNumber(); + } + if (currentTokens.get(identifier) == null) { + currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, + password)); + } else { + throw new IOException( + "Same RMDelegation token being added twice."); + } + } + + // FOR TEST + public void setRMStateStore(RMStateStore rmStore) { + this.rmStateStore = rmStore; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 0855a6f..77e60ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -81,6 +81,7 @@ public MockRM(Configuration conf, RMStateStore store) { init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); if(store != null) { setRMStateStore(store); + super.rmDTSecretManager.setRMStateStore(store); } Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index d25f0f9..89e16b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -90,7 +90,8 @@ @BeforeClass public static void setupSecretManager() throws IOException { - dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000); + dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, + mock(RMContext.class)); dtsm.startThreads(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java index d334cc5..d0c83c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java @@ -438,10 +438,12 @@ private static ResourceScheduler createMockScheduler(Configuration conf) { return mockSched; } - private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager( - long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) { - RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager( - secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000); + private static RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(long secretKeyInterval, + long tokenMaxLifetime, long tokenRenewInterval) { + RMDelegationTokenSecretManager rmDtSecretManager = + new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, + tokenRenewInterval, 3600000, mock(RMContext.class)); return rmDtSecretManager; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 30ae6ce..93c0bab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,6 +27,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -33,6 +37,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -57,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -572,6 +578,198 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + Map rmDTState = + rmState.getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTMasterKeyState(); + + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create a token and add into credential + HashSet tokenIdentSet = + new HashSet(); + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenIdentSet.add(dtId1); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert all master keys are saved + Set allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys(); + Assert.assertEquals(allKeysRM1, rmDTMasterKeyState); + + // assert all tokens are saved + Map allTokensRM1 = + rm1.getRMDTSecretManager().getAllTokens(); + Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet()); + Assert.assertEquals(allTokensRM1, rmDTState); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + // assert master keys and tokens are populated back to DTSecretManager + Map allTokensRM2 = + rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys() + .containsAll(allKeysRM1)); + Assert.assertEquals(allTokensRM1, allTokensRM2); + + // renewDate before renewing + Long renewDateBeforeRenew = allTokensRM2.get(dtId1); + try{ + // renew recovered token + rm2.getRMDTSecretManager().renewToken(token1, "renewer1"); + } catch(Exception e) { + Assert.fail(); + } + + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Long renewDateAfterRenew = allTokensRM2.get(dtId1); + // assert token is renewed + Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew); + + // assert new token is added into state store + Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew)); + + try{ + rm2.getRMDTSecretManager().cancelToken(token1, "user1"); + } catch(Exception e) { + Assert.fail(); + } + + // manually call expireToken + rm2.getRMDTSecretManager().logExpireToken(dtId1); + + // assert old token is removed from state store + Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew)); + + // assert token is removed from state after its cancelled + allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens(); + Assert.assertFalse(allTokensRM2.containsKey(dtId1)); + Assert.assertFalse(rmDTState.containsKey(dtId1)); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + @Test + public void testRMDTMasterKeyStateStoreOnRollingMasterKey() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmDTState = + rmState.getRMDelegationTokenState(); + Set rmDTMasterKeyState = + rmState.getRMDTMasterKeyState(); + + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + // on rm start, two master keys are created. + // One is created at RMDTSecretMgr.startThreads.updateCurrentKey(); + // the other is created on the first run of tokenRemoverThread.rollMasterKey() + + RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager(); + // assert all master keys are saved + Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState); + Set expiringKeys = new HashSet(); + expiringKeys.addAll(dtSecretManager.getAllMasterKeys()); + + // record the current key + DelegationKey oldCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + + // generate a RMDelegationToken + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, dtSecretManager); + byte[] oldPasswd = token1.getPassword(); + + // wait for the first rollMasterKey + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 1); + + // assert old-current-key and new-current-key exist + Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey)); + DelegationKey newCurrentKey = + ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey(); + Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey)); + + //after rolling, the token that was generated earlier must still be valid + ByteArrayInputStream bi = new ByteArrayInputStream(token1.getIdentifier()); + RMDelegationTokenIdentifier identifier = dtSecretManager.createIdentifier(); + identifier.readFields(new DataInputStream(bi)); + byte[] newPasswd = dtSecretManager.retrievePassword(identifier); + // compare the passwords + Assert.assertEquals(oldPasswd, newPasswd); + + // wait for initial 2 keys to expire; + // rollMasterKey is called every 2 seconds. this indicates 8 seconds passed. + while(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() < 5); + + // assert initial 2 keys expired + for (DelegationKey key : expiringKeys) { + Assert.assertFalse(rmDTMasterKeyState.contains(key)); + } + + // totally (numRolledKeys) keys are created; 2 keys expired. + // we expect (numRolledKeys) - 2 keys remaining in rmDTMasterKeyState + Assert.assertEquals(((TestRMDelegationTokenSecretManager)dtSecretManager) + .numUpdatedKeys.get() - 2, rmDTMasterKeyState.size()); + + // at this point, the token should expire as well. + Assert.assertFalse(rmDTState.containsKey(dtId1)); + rm1.stop(); + } + class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { @@ -600,4 +798,45 @@ protected void setTimerForTokenRenewal(DelegationTokenToRenew token) }; } } + + class MyMockRM extends TestSecurityMockRM { + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected RMDelegationTokenSecretManager + createRMDelegationTokenSecretManager(RMContext rmContext) { + // KeyUpdateInterval-> 2 seconds + // TokenMaxLifetime-> 3 seconds. + // every master key will expire in 2 + 3 seconds + return new TestRMDelegationTokenSecretManager(2000, 3000, 2000, + 1000, rmContext); + } + } + public class TestRMDelegationTokenSecretManager extends + RMDelegationTokenSecretManager { + public AtomicInteger numUpdatedKeys = new AtomicInteger(0); + public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval, RMContext rmContext) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval, + rmContext); + } + + @Override protected void logUpdateMasterKey(DelegationKey newKey) { + super.logUpdateMasterKey(newKey); + numUpdatedKeys.incrementAndGet(); + } + public DelegationKey getCurrentKey() { + for (int keyId : allKeys.keySet()) { + if (keyId == currentId) { + return allKeys.get(keyId); + } + } + return null; + } + } + }