diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 8dac19b..d448cf9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; @@ -39,7 +41,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import com.google.common.annotations.VisibleForTesting; @@ -74,6 +79,7 @@ protected ApplicationMasterProtocol scheduler; private final ClientService clientService; protected int lastResponseID; + protected int currentAMRMTokenKeyId = Integer.MIN_VALUE; private Resource maxContainerCapability; protected Map applicationACLs; private volatile long lastHeartbeatTime; @@ -105,6 +111,19 @@ protected void serviceInit(Configuration conf) throws Exception { rmPollInterval = conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS); + // Initiate current AMRMToken KeyId + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + // Now remove the AM->RM token so tasks don't have it + Iterator> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + currentAMRMTokenKeyId = + ((AMRMTokenIdentifier) token.decodeIdentifier()).getKeyId(); + break; + } + } } @Override @@ -152,6 +171,7 @@ protected void register() { .getAMWebappScheme(getConfig()) + serviceAddr.getHostName() + ":" + clientService.getHttpPort()); } + request.setCurrentAMRMTokenKeyId(this.currentAMRMTokenKeyId); RegisterApplicationMasterResponse response = scheduler.registerApplicationMaster(request); isApplicationMasterRegistered = true; @@ -166,6 +186,9 @@ protected void register() { String queue = response.getQueue(); LOG.info("queue: " + queue); job.setQueueName(queue); + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -335,4 +358,19 @@ public void setSignalled(boolean isSignalled) { protected boolean isApplicationMasterRegistered() { return isApplicationMasterRegistered; } + + protected void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + this.currentAMRMTokenKeyId = amrmToken.decodeIdentifier().getKeyId(); + LOG.info("Update AMRMToken: " + amrmToken.toString()); + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index b9d283f..49874d8 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -609,7 +609,10 @@ public void rampDownReduces(int rampDown) { nmToken.getToken()); } } - + // Setting AMRMToken + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } List finishedContainers = response.getCompletedContainersStatuses(); // propagate preemption requests diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a9b5ce5..de8450e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -156,7 +156,8 @@ protected AllocateResponse makeRemoteRequest() throws IOException { AllocateRequest allocateRequest = AllocateRequest.newInstance(lastResponseID, super.getApplicationProgress(), new ArrayList(ask), - new ArrayList(release), blacklistRequest); + new ArrayList(release), blacklistRequest, + this.currentAMRMTokenKeyId); AllocateResponse allocateResponse; try { allocateResponse = scheduler.allocate(allocateRequest); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 62316a6..3156b39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -64,7 +65,19 @@ public static AllocateRequest newInstance(int responseID, float appProgress, return newInstance(responseID, appProgress, resourceAsk, containersToBeReleased, resourceBlacklistRequest, null); } - + + @Public + @Unstable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + int currentAMRMTokenKeyId) { + return newInstance(responseID, appProgress, resourceAsk, + containersToBeReleased, resourceBlacklistRequest, null, + currentAMRMTokenKeyId); + } + @Public @Stable public static AllocateRequest newInstance(int responseID, float appProgress, @@ -79,9 +92,28 @@ public static AllocateRequest newInstance(int responseID, float appProgress, allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); allocateRequest.setIncreaseRequests(increaseRequests); + allocateRequest.setCurrentAMRMTokenKeyId(Integer.MIN_VALUE); + return allocateRequest; + } + + @Public + @Stable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + List increaseRequests, + int currentAMRMTokenKeyId) { + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(responseID); + allocateRequest.setProgress(appProgress); + allocateRequest.setAskList(resourceAsk); + allocateRequest.setReleaseList(containersToBeReleased); + allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); + allocateRequest.setIncreaseRequests(increaseRequests); + allocateRequest.setCurrentAMRMTokenKeyId(currentAMRMTokenKeyId); return allocateRequest; } - /** * Get the response id used to track duplicate responses. * @return response id @@ -201,4 +233,12 @@ public abstract void setResourceBlacklistRequest( @Stable public abstract void setIncreaseRequests( List increaseRequests); + + @Public + @Unstable + public abstract int getCurrentAMRMTokenKeyId(); + + @Public + @Unstable + public abstract void setCurrentAMRMTokenKeyId(int keyId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 0e27f32..fd7c306 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -71,7 +72,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens) { + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -82,6 +83,7 @@ public static AllocateResponse newInstance(int responseId, response.setAMCommand(command); response.setPreemptionMessage(preempt); response.setNMTokens(nmTokens); + response.setAMRMToken(amRMToken); return response; } @@ -91,12 +93,12 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens, + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken, List increasedContainers, List decreasedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, - numClusterNodes, preempt, nmTokens); + numClusterNodes, preempt, nmTokens, amRMToken); response.setIncreasedContainers(increasedContainers); response.setDecreasedContainers(decreasedContainers); return response; @@ -270,4 +272,17 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setDecreasedContainers( List decreasedContainers); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java index 6b01854..b0d7e89 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -55,14 +56,33 @@ @Stable public static RegisterApplicationMasterRequest newInstance(String host, int port, String trackingUrl) { + return newInstance(host, port, trackingUrl, Integer.MIN_VALUE); + } + + /** + * Create a new instance of RegisterApplicationMasterRequest. + * If port, trackingUrl, currentAMRMTokenKeyId is not used, + * use the following default value: + *
    + *
  • port: -1
  • + *
  • trackingUrl: null
  • + *
  • currentAMRMTokenKeyId: Integer.MIN_VALUE
  • + *
+ * The port is allowed to be any integer larger than or equal to -1. + * @return the new instance of RegisterApplicationMasterRequest + */ + @Public + @Stable + public static RegisterApplicationMasterRequest newInstance(String host, + int port, String trackingUrl, int currentAMRMTokenKeyId) { RegisterApplicationMasterRequest request = Records.newRecord(RegisterApplicationMasterRequest.class); request.setHost(host); request.setRpcPort(port); request.setTrackingUrl(trackingUrl); + request.setCurrentAMRMTokenKeyId(currentAMRMTokenKeyId); return request; } - /** * Get the host on which the ApplicationMaster is * running. @@ -133,4 +153,12 @@ public static RegisterApplicationMasterRequest newInstance(String host, @Public @Stable public abstract void setTrackingUrl(String trackingUrl); + + @Public + @Unstable + public abstract int getCurrentAMRMTokenKeyId(); + + @Public + @Unstable + public abstract void setCurrentAMRMTokenKeyId(int keyId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 79f9f3a..65be2f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -57,7 +57,7 @@ public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, Map acls, ByteBuffer key, List containersFromPreviousAttempt, String queue, - List nmTokensFromPreviousAttempts) { + List nmTokensFromPreviousAttempts, ByteBuffer amRMToken) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); @@ -66,6 +66,7 @@ public static RegisterApplicationMasterResponse newInstance( response.setContainersFromPreviousAttempts(containersFromPreviousAttempt); response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts); response.setQueue(queue); + response.setAMRMToken(amRMToken); return response; } @@ -180,4 +181,17 @@ public abstract void setContainersFromPreviousAttempts( @Private @Unstable public abstract void setNMTokensFromPreviousAttempts(List nmTokens); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..e3677e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -38,6 +38,7 @@ message RegisterApplicationMasterRequestProto { optional string host = 1; optional int32 rpc_port = 2; optional string tracking_url = 3; + optional int32 currentAMRMToken_keyid = 4; } message RegisterApplicationMasterResponseProto { @@ -47,6 +48,7 @@ message RegisterApplicationMasterResponseProto { repeated ContainerProto containers_from_previous_attempts = 4; optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + optional bytes am_rm_token = 7; } message FinishApplicationMasterRequestProto { @@ -66,6 +68,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated ContainerResourceIncreaseRequestProto increase_request = 6; + optional int32 currentAMRMToken_keyid = 7; } message NMTokenProto { @@ -85,6 +88,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional bytes am_rm_token = 12; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1eebaac..c208c16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -33,13 +35,18 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -61,6 +68,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -77,6 +85,9 @@ private int lastResponseId = 0; + private AtomicInteger currentAMRMTokenKeyId = new AtomicInteger( + Integer.MIN_VALUE); + protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; @@ -159,6 +170,20 @@ public AMRMClientImpl() { protected void serviceInit(Configuration conf) throws Exception { RackResolver.init(conf); super.serviceInit(conf); + + //Initiate current AMRMToken KeyId + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + // Now remove the AM->RM token so tasks don't have it + Iterator> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + currentAMRMTokenKeyId.set( + ((AMRMTokenIdentifier) token.decodeIdentifier()).getKeyId()); + break; + } + } } @Override @@ -192,7 +217,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // do this only once ??? RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort, - appTrackingUrl); + appTrackingUrl, this.currentAMRMTokenKeyId.get()); RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); @@ -200,6 +225,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( if(!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } return response; } @@ -241,7 +269,8 @@ public AllocateResponse allocate(float progressIndicator) allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest); + askList, releaseList, blacklistRequest, + this.currentAMRMTokenKeyId.get()); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -258,6 +287,9 @@ public AllocateResponse allocate(float progressIndicator) if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -302,6 +334,21 @@ protected void populateNMTokens(List nmTokens) { } } + private void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + this.currentAMRMTokenKeyId.set(amrmToken.decodeIdentifier().getKeyId()); + LOG.info("Update AMRMToken: " + amrmToken.toString()); + } + @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 15bfa28..f53bc8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -766,7 +766,7 @@ public AllocateResponse createFakeAllocateResponse() { new ArrayList(), new ArrayList(), new ArrayList(), Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, - null, new ArrayList()); + null, new ArrayList(), null); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index e21c4ba..7ed6415 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -359,7 +359,7 @@ private AllocateResponse createAllocateResponse( List nmTokens) { AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null, nmTokens); + new ArrayList(), null, null, 1, null, nmTokens, null); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index dc11165..b10bddb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -392,4 +392,16 @@ private ResourceBlacklistRequestPBImpl convertFromProtoFormat(ResourceBlacklistR private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) { return ((ResourceBlacklistRequestPBImpl)t).getProto(); } + + @Override + public int getCurrentAMRMTokenKeyId() { + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getCurrentAMRMTokenKeyid(); + } + + @Override + public void setCurrentAMRMTokenKeyId(int keyId) { + maybeInitBuilder(); + builder.setCurrentAMRMTokenKeyid(keyId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 4d7c0a3..9b15264 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @Private @@ -74,7 +76,8 @@ private List updatedNodes = null; private PreemptionMessage preempt; - + + private ByteBuffer amRMToken = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -154,6 +157,9 @@ private synchronized void mergeLocalToBuilder() { getChangeProtoIterable(this.decreasedContainers); builder.addAllDecreasedContainers(iterable); } + if(this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } private synchronized void mergeLocalToProto() { @@ -357,6 +363,28 @@ public synchronized void setDecreasedContainers( this.decreasedContainers.addAll(decreasedContainers); } + @Override + public synchronized ByteBuffer getAMRMToken() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if(amRMToken != null) { + return amRMToken; + } + if(!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public synchronized void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if(amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { return; @@ -699,4 +727,14 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) { private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private synchronized ByteBuffer convertFromProtoFormat( + ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private synchronized ByteString convertToProtoFormat( + ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java index 037dfd9..77d85e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java @@ -131,4 +131,17 @@ public void setTrackingUrl(String url) { } builder.setTrackingUrl(url); } + + @Override + public int getCurrentAMRMTokenKeyId() { + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; + return p.getCurrentAMRMTokenKeyid(); + } + + @Override + public void setCurrentAMRMTokenKeyId(int keyId) { + maybeInitBuilder(); + builder.setCurrentAMRMTokenKeyid(keyId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 06a637a..8c9aaf0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -61,6 +61,7 @@ private Map applicationACLS = null; private List containersFromPreviousAttempts = null; private List nmTokens = null; + private ByteBuffer amRMToken = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -122,6 +123,9 @@ private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if (this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } @@ -326,6 +330,29 @@ public void setNMTokensFromPreviousAttempts(final List nmTokens) { this.nmTokens.addAll(nmTokens); } + @Override + public ByteBuffer getAMRMToken() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if(amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalNewNMTokenList() { RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; List list = p.getNmTokensFromPreviousAttemptsList(); @@ -387,4 +414,14 @@ private NMTokenProto convertToProtoFormat(NMToken token) { private NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private ByteBuffer convertFromProtoFormat( + ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private ByteString convertToProtoFormat( + ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java index 99495d7..0b9fa5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java @@ -44,6 +44,7 @@ public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); private ApplicationAttemptId applicationAttemptId; + private int keyId; public AMRMTokenIdentifier() { } @@ -51,6 +52,14 @@ public AMRMTokenIdentifier() { public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId) { this(); this.applicationAttemptId = appAttemptId; + this.keyId = -1; + } + + public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, + int masterKeyId) { + this(); + this.applicationAttemptId = appAttemptId; + this.keyId = masterKeyId; } @Private @@ -64,6 +73,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(appId.getClusterTimestamp()); out.writeInt(appId.getId()); out.writeInt(this.applicationAttemptId.getAttemptId()); + out.writeInt(this.keyId); } @Override @@ -75,6 +85,7 @@ public void readFields(DataInput in) throws IOException { ApplicationId.newInstance(clusterTimeStamp, appId); this.applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, attemptId); + this.keyId = in.readInt(); } @Override @@ -92,6 +103,10 @@ public UserGroupInformation getUser() { .toString()); } + public int getKeyId() { + return this.keyId; + } + // TODO: Needed? @InterfaceAudience.Private public static class Renewer extends Token.TrivialRenewer { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index a8a56d7..16e1abb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -72,7 +72,7 @@ public void testAllocateResponseWithIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, incContainers, decContainers); // serde @@ -100,8 +100,9 @@ public void testAllocateResponseWithIncDecContainers() { public void testAllocateResponseWithoutIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), - new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null); + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null, + null); // serde AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc474..b5537df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -35,10 +36,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -82,12 +86,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateAMRMTokenEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -288,6 +294,13 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } + // roll up AMRMToken and save into StateStore + RMAppAttempt appAttempt = app.getRMAppAttempt(applicationAttemptId); + rollupAMRMToken(appAttempt, lock); + // update AMRMToken if the token is rolled-up and saved into StateStore + updateAMRMToken(appAttempt, lock, response, + request.getCurrentAMRMTokenKeyId()); + // For work-preserving AM restart, retrieve previous attempts' containers // and corresponding NM tokens. List transferredContainers = @@ -549,6 +562,11 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // roll up AMRMToken and save into StateStore + rollupAMRMToken(appAttempt, lock); + // update AMRMToken if the token is rolled-up and saved into StateStore + updateAMRMToken(appAttempt, lock, allocateResponse, + request.getCurrentAMRMTokenKeyId()); /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which @@ -617,8 +635,22 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { // set response id to -1 before application master for the following // attemptID get registered response.setResponseId(-1); + RMAppAttempt rmAttempt = + this.rmContext.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId); + AllocateResponseLock lock = new AllocateResponseLock(response); + if (rmAttempt.getAMRMToken() != null) { + lock.setNextActiveAMRMToken(rmAttempt.getAMRMToken()); + try { + lock.setNextActiveAMRMTokenKeyId(rmAttempt.getAMRMToken() + .decodeIdentifier().getKeyId()); + } catch (IOException e) { + // Do nothing + // Will use default value: Integer.MIN_VALUE + } + } LOG.info("Registering app attempt : " + attemptId); - responseMap.put(attemptId, new AllocateResponseLock(response)); + responseMap.put(attemptId, lock); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } @@ -644,6 +676,8 @@ protected void serviceStop() throws Exception { public static class AllocateResponseLock { private AllocateResponse response; + private int nextActiveAMRMTokenKeyId = Integer.MIN_VALUE; + private Token nextActiveAMRMToken; public AllocateResponseLock(AllocateResponse response) { this.response = response; @@ -656,10 +690,69 @@ public synchronized AllocateResponse getAllocateResponse() { public synchronized void setAllocateResponse(AllocateResponse response) { this.response = response; } + + public synchronized int getNextActiveAMRMTokenKeyId() { + return nextActiveAMRMTokenKeyId; + } + + public synchronized void setNextActiveAMRMTokenKeyId( + int nextActiveAMRMTokenKeyId) { + this.nextActiveAMRMTokenKeyId = nextActiveAMRMTokenKeyId; + } + + public synchronized Token getNextActiveAMRMToken() { + return nextActiveAMRMToken; + } + + public synchronized void setNextActiveAMRMToken( + Token nextActiveAMRMToken) { + this.nextActiveAMRMToken = nextActiveAMRMToken; + } } @VisibleForTesting public Server getServer() { return this.server; } + + private void updateAMRMToken(RMAppAttempt appAttempt, + AllocateResponseLock lock, T response, int keyId) throws IOException { + if (appAttempt.canAMRMTokenRollup() && keyId != lock + .getNextActiveAMRMTokenKeyId()) { + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, + lock.getNextActiveAMRMToken()); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + if (response instanceof RegisterApplicationMasterResponse) { + ((RegisterApplicationMasterResponse) response).setAMRMToken(ByteBuffer + .wrap(dob.getData(), 0, dob.getLength())); + } else if (response instanceof AllocateResponse) { + ((AllocateResponse) response).setAMRMToken(ByteBuffer.wrap( + dob.getData(), 0, dob.getLength())); + } + } + } + + private void rollupAMRMToken(RMAppAttempt appAttempt, + AllocateResponseLock lock) throws IOException { + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKey(); + if (appAttempt.canAMRMTokenRollup() && nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != lock + .getNextActiveAMRMTokenKeyId()) { + appAttempt.resetAMRMTokenRollupFlag(false); + ApplicationAttemptId applicationAttemptId = appAttempt.getAppAttemptId(); + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAMRMToken( + applicationAttemptId); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptUpdateAMRMTokenEvent(applicationAttemptId, amrmToken)); + lock.setNextActiveAMRMTokenKeyId(nextMasterKey.getMasterKey().getKeyId()); + lock.setNextActiveAMRMToken(amrmToken); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index affc6f9..df73119 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -51,14 +51,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptAMRMTokenSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -232,8 +231,13 @@ public void transition(RMStateStore store, RMStateStoreEvent event) { } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - null); + if (!((RMStateUpdateAppAttemptEvent) event).onlyUpdateAMRMToken()) { + store.notifyDoneUpdatingApplicationAttempt( + attemptState.getAttemptId(), null); + } else { + store.notifyDoneUpdatingAMRMToken(attemptState.getAttemptId(), + attemptState.getAppAttemptCredentials(), null); + } } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); store.notifyStoreOperationFailed(e); @@ -580,6 +584,13 @@ public synchronized void updateApplicationAttemptState( new RMStateUpdateAppAttemptEvent(attemptState)); } + @SuppressWarnings("unchecked") + public synchronized void updateApplicationAttemptState( + ApplicationAttemptState attemptState, boolean onlyUpdateAMRMToken) { + dispatcher.getEventHandler().handle( + new RMStateUpdateAppAttemptEvent(attemptState, onlyUpdateAMRMToken)); + } + /** * Blocking API * Derived classes must implement this method to store the state of an @@ -818,6 +829,14 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); } + @SuppressWarnings("unchecked") + private void notifyDoneUpdatingAMRMToken(ApplicationAttemptId attemptId, + Credentials credentials, Exception updatedException) { + rmDispatcher.getEventHandler().handle( + new RMAppAttemptAMRMTokenSavedEvent(attemptId, credentials, + updatedException)); + } + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 9ded673..5dd9301 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -23,13 +23,26 @@ public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent { ApplicationAttemptState attemptState; + boolean onlyUpdateAMRMToken; public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) { super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); this.attemptState = attemptState; + this.onlyUpdateAMRMToken = false; + } + + public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState, + boolean onlyUpdateAMRMToken) { + super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); + this.attemptState = attemptState; + this.onlyUpdateAMRMToken = onlyUpdateAMRMToken; } public ApplicationAttemptState getAppAttemptState() { return attemptState; } + + public boolean onlyUpdateAMRMToken() { + return onlyUpdateAMRMToken; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index d472ad4..5adf4bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -196,4 +196,12 @@ */ ApplicationAttemptReport createApplicationAttemptReport(); + /** + * Flag whether AMRMToken can be rolled up. + * If true, update AMRMToken for RMAppAttempt, and inform ApplicationMaster. + * Otherwise, wait for the next active AMRMToken saving into RMStateStore + */ + boolean canAMRMTokenRollup(); + + void resetAMRMTokenRollupFlag(boolean canAMRMTokenRollup); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index ddf782e..dd2049d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -34,6 +34,8 @@ REGISTERED, STATUS_UPDATE, UNREGISTERED, + AMRMTOKEN_UPDATE, + AMRMTOKEN_SAVED, // Source: Containers CONTAINER_ALLOCATED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 5e71c93..698ca69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -52,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptAMRMTokenSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -83,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateAMRMTokenEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -108,6 +110,9 @@ private static final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); + private static final Text NEXT_ACTIVE_AMRMTOKEN = new Text( + "NEXT_ACTIVE_AMRMTOKEN"); + public final static Priority AM_CONTAINER_PRIORITY = recordFactory .newRecordInstance(Priority.class); static { @@ -130,7 +135,8 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - + private ApplicationAttemptState attemptStateForSaving = null; + private List justFinishedContainers = new ArrayList(); private Container masterContainer; @@ -157,6 +163,7 @@ private RMAppAttemptState recoveredFinalState; private RMAppAttemptState stateBeforeFinalSaving; private Object transitionTodo; + private boolean canAMRMTokenRollup; private static final StateMachineFactory getAMRMToken() { - return this.amrmToken; + try { + this.readLock.lock(); + return this.amrmToken; + } finally { + this.readLock.unlock(); + } } @Override @@ -697,12 +735,17 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens) .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); } - // Only one AMRMToken is stored per-attempt, so this should be fine. Can't - // use TokenSelector as service may change - think fail-over. this.amrmToken = (Token) appAttemptTokens .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); + + Token token = + (Token) appAttemptTokens + .getToken(NEXT_ACTIVE_AMRMTOKEN); + if (token != null) { + rmContext.getAMRMTokenSecretManager().addPersistedPassword(token); + } } private static class BaseTransition implements @@ -728,10 +771,6 @@ public void transition(RMAppAttemptImpl appAttempt, } appAttempt.startTime = System.currentTimeMillis(); - // Register with the ApplicationMasterService - appAttempt.masterService - .registerAppAttempt(appAttempt.applicationAttemptId); - if (UserGroupInformation.isSecurityEnabled()) { appAttempt.clientTokenMasterKey = appAttempt.rmContext.getClientToAMTokenSecretManager() @@ -739,11 +778,13 @@ public void transition(RMAppAttemptImpl appAttempt, } // create AMRMToken - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); appAttempt.amrmToken = - new Token(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); + appAttempt.rmContext.getAMRMTokenSecretManager().createAMRMToken( + appAttempt.applicationAttemptId); + + // Register with the ApplicationMasterService + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. @@ -918,7 +959,7 @@ private void rememberTargetTransitions(RMAppAttemptEvent event, private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, Object transitionToDo, RMAppAttemptState targetFinalState, - RMAppAttemptState stateToBeStored) { + RMAppAttemptState stateToBeStored, RMAppAttemptImpl appAttempt) { rememberTargetTransitions(event, transitionToDo, targetFinalState); stateBeforeFinalSaving = getState(); @@ -967,6 +1008,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), rmStore.getCredentialsFromAppAttempt(this), startTime, stateToBeStored, finalTrackingUrl, diags, finalStatus); + appAttempt.setApplicationAttemptState(attemptState); LOG.info("Updating application attempt " + applicationAttemptId + " with final state: " + targetedFinalState); rmStore.updateApplicationAttemptState(attemptState); @@ -988,7 +1030,7 @@ public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { // For cases Killed/Failed, targetedFinalState is the same as the state to // be stored appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo, - targetedFinalState, targetedFinalState); + targetedFinalState, targetedFinalState, appAttempt); } } @@ -1334,7 +1376,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // Saving the attempt final state appAttempt.rememberTargetTransitionsAndStoreState(event, new FinalStateSavedAfterAMUnregisterTransition(), - RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED); + RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED, appAttempt); ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); @@ -1408,7 +1450,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, containerStatus.getContainerId())) { // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED + , appAttempt); return RMAppAttemptState.FINAL_SAVING; } @@ -1546,6 +1589,57 @@ public AMFinishedAfterFinalSavingTransition( } } + private static final class AMRMTokenUpdateTransition extends BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + RMAppAttemptUpdateAMRMTokenEvent amRMTokenUpdateEvent = + (RMAppAttemptUpdateAMRMTokenEvent) event; + RMStateStore rmStore = appAttempt.rmContext.getStateStore(); + Credentials credentials = + rmStore.getCredentialsFromAppAttempt(appAttempt); + credentials.addToken(NEXT_ACTIVE_AMRMTOKEN, + amRMTokenUpdateEvent.getAMRMToken()); + ApplicationAttemptState attemptState = null; + if (appAttempt.getApplicationAttemptState() == null) { + attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials, + appAttempt.getStartTime()); + } else { + ApplicationAttemptState attemptStateForSaving = + appAttempt.getApplicationAttemptState(); + attemptState = + new ApplicationAttemptState(attemptStateForSaving.getAttemptId(), + attemptStateForSaving.getMasterContainer(), credentials, + attemptStateForSaving.getStartTime(), + attemptStateForSaving.getState(), + attemptStateForSaving.getFinalTrackingUrl(), + attemptStateForSaving.getDiagnostics(), + attemptStateForSaving.getFinalApplicationStatus()); + } + LOG.info("Updating application attempt " + appAttempt.getAppAttemptId() + + " with updated AMRMToken"); + rmStore.updateApplicationAttemptState(attemptState, true); + } + } + + private static final class AMRMTokenSavedTransition extends BaseTransition { + @Override + public void + transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + // update AMRMToken and reset AMRMTokenRollup Flag to true. + // It is OK now to roll up next Token or send current Token + // to ApplicationMaster for update. + RMAppAttemptAMRMTokenSavedEvent amRMTokenSavedEvent = + (RMAppAttemptAMRMTokenSavedEvent) event; + Credentials credentials = amRMTokenSavedEvent.getCredentials(); + appAttempt.updateAMRMToken((Token) credentials + .getToken(NEXT_ACTIVE_AMRMTOKEN)); + appAttempt.resetAMRMTokenRollupFlag(true); + } + } + @Override public long getStartTime() { this.readLock.lock(); @@ -1644,4 +1738,52 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } return attemptReport; } + + @Override + public boolean canAMRMTokenRollup() { + try { + this.readLock.lock(); + return this.canAMRMTokenRollup; + } finally { + this.readLock.unlock(); + } + } + + @Override + public void resetAMRMTokenRollupFlag(boolean canAMRMTokenRollup) { + try { + this.writeLock.lock(); + this.canAMRMTokenRollup = canAMRMTokenRollup; + } finally { + this.writeLock.unlock(); + } + } + + private void updateAMRMToken(Token amrmToken) { + try { + this.writeLock.lock(); + this.amrmToken = amrmToken; + } finally { + this.writeLock.unlock(); + } + } + + private ApplicationAttemptState getApplicationAttemptState() { + try { + this.readLock.lock(); + return this.attemptStateForSaving; + } finally { + this.readLock.unlock(); + } + } + + private void setApplicationAttemptState( + ApplicationAttemptState attemptStateForSaving) { + try { + this.writeLock.lock(); + this.attemptStateForSaving = attemptStateForSaving; + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptAMRMTokenSavedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptAMRMTokenSavedEvent.java new file mode 100644 index 0000000..0cb7ad9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptAMRMTokenSavedEvent.java @@ -0,0 +1,44 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; + +public class RMAppAttemptAMRMTokenSavedEvent extends RMAppAttemptEvent{ + final Exception updatedException; + final Credentials credentials; + + public RMAppAttemptAMRMTokenSavedEvent(ApplicationAttemptId appAttemptId, + Credentials credentials, Exception updatedException) { + super(appAttemptId, RMAppAttemptEventType.AMRMTOKEN_SAVED); + this.updatedException = updatedException; + this.credentials = credentials; + } + + public Credentials getCredentials() { + return credentials; + } + + public Exception getUpdatedException() { + return updatedException; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java new file mode 100644 index 0000000..63c6eb2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUpdateAMRMTokenEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; + +public class RMAppAttemptUpdateAMRMTokenEvent extends RMAppAttemptEvent { + + private final Token amrmToken; + + public RMAppAttemptUpdateAMRMTokenEvent(ApplicationAttemptId appAttemptId, + Token amrmToken) { + super(appAttemptId, RMAppAttemptEventType.AMRMTOKEN_UPDATE); + this.amrmToken = amrmToken; + } + + public Token getAMRMToken() { + return amrmToken; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 5d21ec0..1372455 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -19,22 +19,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import javax.crypto.SecretKey; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; /** * AMRM-tokens are per ApplicationAttempt. If users redistribute their @@ -49,12 +52,21 @@ private static final Log LOG = LogFactory .getLog(AMRMTokenSecretManager.class); - private SecretKey masterKey; + private int serialNo = new SecureRandom().nextInt(); + + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + private final Timer timer; private final long rollingInterval; + private final long activationDelay; - private final Map passwords = - new HashMap(); + private final Map passwords = + new HashMap(); + + private final Map> + passwordsFromStateStore = new HashMap>(); /** * Create an {@link AMRMTokenSecretManager} @@ -67,6 +79,20 @@ public AMRMTokenSecretManager(Configuration conf) { .getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. + this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + + "ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 2 X " + + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS); + } } public void start() { @@ -83,6 +109,7 @@ public synchronized void applicationMasterFinished( LOG.debug("Application finished, removing password for " + appAttemptId); } this.passwords.remove(appAttemptId); + this.passwordsFromStateStore.remove(appAttemptId); } private class MasterKeyRoller extends TimerTask { @@ -92,20 +119,36 @@ public void run() { } } - @Private - public synchronized void setMasterKey(SecretKey masterKey) { - this.masterKey = masterKey; + public synchronized void rollMasterKey() { + LOG.info("Rolling master-key for AMRM-tokens"); + if (this.currentMasterKey == null) { // Setting up for the first time. + this.currentMasterKey = createNewMasterKey(); + } else { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + LOG.info("Going to activate master-key with key-id " + + this.nextMasterKey.getMasterKey().getKeyId() + " in " + + this.activationDelay + "ms"); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } } - @Private - public synchronized SecretKey getMasterKey() { - return this.masterKey; + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } } - @Private - synchronized void rollMasterKey() { - LOG.info("Rolling master-key for amrm-tokens"); - this.masterKey = generateSecret(); + public synchronized void activateNextMasterKey() { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } + + private MasterKeyData createNewMasterKey() { + return new MasterKeyData(serialNo++, generateSecret()); } /** @@ -113,29 +156,42 @@ synchronized void rollMasterKey() { * send to the AppicationAttempt which can give it back during authentication. */ @Override - public synchronized byte[] createPassword( - AMRMTokenIdentifier identifier) { + @Private + public synchronized byte[] createPassword(AMRMTokenIdentifier identifier) { ApplicationAttemptId applicationAttemptId = identifier.getApplicationAttemptId(); if (LOG.isDebugEnabled()) { LOG.debug("Creating password for " + applicationAttemptId); } - byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); + byte[] password = + createPassword(identifier.getBytes(), getMasterKey().getSecretKey()); + this.passwords.put(identifier.getApplicationAttemptId(), new PasswordInfo( + password, identifier.getKeyId())); return password; } /** - * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. + * Populate persisted password of AMRMToken which is load from StateStore + * back to AMRMTokenSecretManager. */ - public synchronized void - addPersistedPassword(Token token) throws IOException { - AMRMTokenIdentifier identifier = token.decodeIdentifier(); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + public synchronized void addPersistedPassword(Token token) + throws IOException { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); + ApplicationAttemptId appAttemptId = identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG + .debug("Adding password for " + appAttemptId); + } + List listOfPasswordInfo; + if (this.passwordsFromStateStore.containsKey(appAttemptId)) { + listOfPasswordInfo = + passwordsFromStateStore.get(appAttemptId); + } else { + listOfPasswordInfo = new ArrayList(); } - this.passwords.put(identifier.getApplicationAttemptId(), - token.getPassword()); + listOfPasswordInfo.add(new PasswordInfo(token.getPassword(), identifier + .getKeyId())); + this.passwordsFromStateStore.put(appAttemptId, listOfPasswordInfo); } /** @@ -143,19 +199,49 @@ synchronized void rollMasterKey() { * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. */ @Override - public synchronized byte[] retrievePassword( - AMRMTokenIdentifier identifier) throws InvalidToken { + public synchronized byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { ApplicationAttemptId applicationAttemptId = identifier.getApplicationAttemptId(); if (LOG.isDebugEnabled()) { LOG.debug("Trying to retrieve password for " + applicationAttemptId); } - byte[] password = this.passwords.get(applicationAttemptId); + PasswordInfo password = this.passwords.get(applicationAttemptId); if (password == null) { + if (!passwordsFromStateStore.isEmpty() + && passwordsFromStateStore.containsKey(applicationAttemptId)) { + for (PasswordInfo passwordInfo : passwordsFromStateStore + .get(applicationAttemptId)) { + if (identifier.getKeyId() == passwordInfo.getKeyId()) { + return passwordInfo.getPassword(); + } + } + } throw new InvalidToken("Password not found for ApplicationAttempt " + applicationAttemptId); } - return password; + if (password.getKeyId() == identifier.getKeyId()) { + return password.getPassword(); + } else if (password.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + byte[] pwd = + createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + this.passwords.put(applicationAttemptId, + new PasswordInfo(pwd, identifier.getKeyId())); + return pwd; + } else if (this.nextMasterKey != null + && password.getKeyId() == this.nextMasterKey.getMasterKey().getKeyId()) { + byte[] pwd = + createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + this.passwords.put(applicationAttemptId, + new PasswordInfo(pwd, identifier.getKeyId())); + return pwd; + } + throw new InvalidToken("Given AMRMToken for application : " + + applicationAttemptId.toString() + + " seems to have been generated illegally."); } /** @@ -167,4 +253,48 @@ public AMRMTokenIdentifier createIdentifier() { return new AMRMTokenIdentifier(); } + public synchronized Token createAMRMToken( + ApplicationAttemptId appAttemptId) { + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey() + .getKeyId()); + byte[] password = this.createPassword(identifier); + return new Token(identifier.getBytes(), password, + identifier.getKind(), new Text()); + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey + @Private + public synchronized MasterKeyData getMasterKey() { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } + + @Private + public synchronized MasterKeyData getNextMasterKey() { + return this.nextMasterKey; + } + + @Private + public synchronized MasterKeyData getCurrentMasterKey() { + return this.currentMasterKey; + } + + private static class PasswordInfo { + private final byte[] password; + private final int keyId; + + public PasswordInfo(byte[] password, int keyId) { + this.password = password; + this.keyId = keyId; + } + + public byte[] getPassword() { + return password; + } + + public int getKeyId() { + return keyId; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java index 0ea2b5e..290f805 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; public class MockRMWithCustomAMLauncher extends MockRM { @@ -46,6 +47,12 @@ public MockRMWithCustomAMLauncher(Configuration conf, this.containerManager = containerManager; } + public MockRMWithCustomAMLauncher(Configuration conf, + ContainerManagementProtocol containerManager, RMStateStore store) { + super(conf, store); + this.containerManager = containerManager; + } + @Override protected ApplicationMasterLauncher createAMLauncher() { return new ApplicationMasterLauncher(getRMContext()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index c7f0d0a..968e59a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -138,6 +139,11 @@ public MockRMWithAMS(Configuration conf, ContainerManagementProtocol containerMa super(conf, containerManager); } + public MockRMWithAMS(Configuration conf, + ContainerManagementProtocol containerManager, RMStateStore store) { + super(conf, containerManager, store); + } + @Override protected void doSecureLogin() throws IOException { // Skip the login. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index 64602bd..891d075 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -22,6 +22,7 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; +import java.util.Map; import javax.crypto.SecretKey; @@ -29,7 +30,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -38,16 +41,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -65,6 +77,8 @@ private final Configuration conf; private static final int maxWaitAttempts = 50; + private static final Text NEXT_ACTIVE_AMRMTOKEN = new Text( + "NEXT_ACTIVE_AMRMTOKEN"); @Parameters public static Collection configs() { @@ -198,12 +212,21 @@ public void testTokenExpiry() throws Exception { * * @throws Exception */ + @SuppressWarnings("unchecked") @Test public void testMasterKeyRollOver() throws Exception { + this.conf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); MyContainerManager containerManager = new MyContainerManager(); final MockRMWithAMS rm = - new MockRMWithAMS(conf, containerManager); + new MockRMWithAMS(conf, containerManager, memStore); rm.start(); final Configuration conf = rm.getConfig(); @@ -215,6 +238,10 @@ public void testMasterKeyRollOver() throws Exception { RMApp app = rm.submitApp(1024); + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + nm1.nodeHeartbeat(true); int waitCount = 0; @@ -227,6 +254,11 @@ public void testMasterKeyRollOver() throws Exception { RMAppAttempt attempt = app.getCurrentAppAttempt(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); + // assert attempt info is saved + ApplicationAttemptState attemptState = + appState.getAttempt(applicationAttemptId); + Assert.assertNotNull(attemptState); + // Create a client to the RM. UserGroupInformation currentUser = UserGroupInformation @@ -253,16 +285,67 @@ public void testMasterKeyRollOver() throws Exception { // Simulate a master-key-roll-over AMRMTokenSecretManager appTokenSecretManager = rm.getRMContext().getAMRMTokenSecretManager(); - SecretKey oldKey = appTokenSecretManager.getMasterKey(); + SecretKey oldKey = + appTokenSecretManager.getCurrentMasterKey().getSecretKey(); + + // Manually call rollMasterKey. It will create a newKey + // which will be assigned to nextMasterKey appTokenSecretManager.rollMasterKey(); - SecretKey newKey = appTokenSecretManager.getMasterKey(); + SecretKey newKey = + appTokenSecretManager.getNextMasterKey().getSecretKey(); + Assert.assertTrue(newKey != null); + + // Do another allocate call, the new AMRMToken should be + // saved in RMStateStore + allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); + rmClient.allocate(allocateRequest); + Token recoveredToken = null; + + // Verify the nextActiveToken is saved in RMStateStore + int count = 0; + while (count < maxWaitAttempts) { + recoveredToken = + (Token) rmAppState.get(app.getApplicationId()) + .getAttempt(applicationAttemptId).getAppAttemptCredentials() + .getToken(NEXT_ACTIVE_AMRMTOKEN); + if (recoveredToken != null + && recoveredToken.decodeIdentifier().getKeyId() == appTokenSecretManager + .getNextMasterKey().getMasterKey().getKeyId()) { + break; + } + count++; + Thread.sleep(100); + } + + Assert + .assertTrue(recoveredToken.decodeIdentifier().getKeyId() == appTokenSecretManager + .getNextMasterKey().getMasterKey().getKeyId()); + + // Manually call activateNextMasterKey. Will replace + // currentMasterKey with nextMasterKey + appTokenSecretManager.activateNextMasterKey(); + SecretKey currentKeyAfterActive = + appTokenSecretManager.getCurrentMasterKey().getSecretKey(); + Assert.assertFalse("Master key should have changed!", - oldKey.equals(newKey)); + oldKey.equals(currentKeyAfterActive)); + Assert.assertTrue(newKey.equals(currentKeyAfterActive)); + + // renew the token for currentUser + Token newAMRMToken = attempt.getAMRMToken(); + SecurityUtil.setTokenService(newAMRMToken, rmBindAddress); + currentUser.addToken(newAMRMToken); + for (Token token : currentUser.getTokens()) { + if (token.decodeIdentifier() instanceof AMRMTokenIdentifier) { + Assert.assertFalse(token.getPassword() + .equals(amRMToken.getPassword())); + } + } // Another allocate call. Should continue to work. rpc.stopProxy(rmClient, conf); // To avoid using cached client rmClient = createRMClient(rm, conf, rpc, currentUser); - allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); Assert.assertTrue( rmClient.allocate(allocateRequest).getAMCommand() == null); } finally {