diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 4b32c04..89abf27 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; @@ -39,7 +40,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import com.google.common.annotations.VisibleForTesting; @@ -167,6 +171,9 @@ protected void register() { String queue = response.getQueue(); LOG.info("queue: " + queue); job.setQueueName(queue); + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -343,4 +350,17 @@ public void setSignalled(boolean isSignalled) { protected boolean isApplicationMasterRegistered() { return isApplicationMasterRegistered; } + + protected void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 307cdfe..03f5625 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -674,7 +674,12 @@ public void rampDownReduces(int rampDown) { nmToken.getToken()); } } - + + // Setting AMRMToken + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } + List finishedContainers = response.getCompletedContainersStatuses(); // propagate preemption requests diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 0e27f32..d19e4e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -56,6 +57,7 @@ *
  • A list of nodes whose status has been updated.
  • *
  • The number of available nodes in a cluster.
  • *
  • A description of resources requested back by the cluster
  • + *
  • AMRMToken, if AMRMToken has been rolled over
  • * *

    * @@ -71,7 +73,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens) { + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -82,9 +84,23 @@ public static AllocateResponse newInstance(int responseId, response.setAMCommand(command); response.setPreemptionMessage(preempt); response.setNMTokens(nmTokens); + response.setAMRMToken(amRMToken); return response; } - + + @Public + @Stable + public static AllocateResponse newInstance(int responseId, + List completedContainers, + List allocatedContainers, List updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List nmTokens) { + AllocateResponse response = newInstance(responseId, completedContainers, + allocatedContainers, updatedNodes, availResources, command, + numClusterNodes, preempt, nmTokens, null); + return response; + } + @Public @Stable public static AllocateResponse newInstance(int responseId, @@ -102,6 +118,23 @@ public static AllocateResponse newInstance(int responseId, return response; } + @Public + @Stable + public static AllocateResponse newInstance(int responseId, + List completedContainers, + List allocatedContainers, List updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken, + List increasedContainers, + List decreasedContainers) { + AllocateResponse response = newInstance(responseId, completedContainers, + allocatedContainers, updatedNodes, availResources, command, + numClusterNodes, preempt, nmTokens, amRMToken); + response.setIncreasedContainers(increasedContainers); + response.setDecreasedContainers(decreasedContainers); + return response; + } + /** * If the ResourceManager needs the * ApplicationMaster to take some action then it will send an @@ -270,4 +303,17 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setDecreasedContainers( List decreasedContainers); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Stable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 79f9f3a..e7f1c14 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -42,6 +42,7 @@ *
  • Maximum capability for allocated resources in the cluster.
  • *
  • ApplicationACLs for the application.
  • *
  • ClientToAMToken master key.
  • + *
  • AMRMToken, if AMRMToken has been rolled over
  • * *

    * @@ -58,6 +59,17 @@ public static RegisterApplicationMasterResponse newInstance( Map acls, ByteBuffer key, List containersFromPreviousAttempt, String queue, List nmTokensFromPreviousAttempts) { + return newInstance(minCapability, maxCapability, acls, key, + containersFromPreviousAttempt, queue, nmTokensFromPreviousAttempts, null); + } + + @Private + @Unstable + public static RegisterApplicationMasterResponse newInstance( + Resource minCapability, Resource maxCapability, + Map acls, ByteBuffer key, + List containersFromPreviousAttempt, String queue, + List nmTokensFromPreviousAttempts, ByteBuffer amRMToken) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); @@ -66,6 +78,7 @@ public static RegisterApplicationMasterResponse newInstance( response.setContainersFromPreviousAttempts(containersFromPreviousAttempt); response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts); response.setQueue(queue); + response.setAMRMToken(amRMToken); return response; } @@ -180,4 +193,17 @@ public abstract void setContainersFromPreviousAttempts( @Private @Unstable public abstract void setNMTokensFromPreviousAttempts(List nmTokens); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Stable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..d16fa47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto { repeated ContainerProto containers_from_previous_attempts = 4; optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + optional bytes am_rm_token = 7; } message FinishApplicationMasterRequestProto { @@ -85,6 +86,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional bytes am_rm_token = 12; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1db7054..33e0938 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,7 +40,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -64,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -223,6 +229,9 @@ private RegisterApplicationMasterResponse registerApplicationMaster() if (!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } return response; } @@ -300,6 +309,9 @@ public AllocateResponse allocate(float progressIndicator) if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } if (!pendingRelease.isEmpty() && !allocateResponse.getCompletedContainersStatuses().isEmpty()) { removePendingReleaseRequests(allocateResponse @@ -743,4 +755,17 @@ public synchronized void updateBlacklist(List blacklistAdditions, "blacklistRemovals in updateBlacklist."); } } + + private void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 4d7c0a3..c670032 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @Private @@ -74,7 +76,7 @@ private List updatedNodes = null; private PreemptionMessage preempt; - + private ByteBuffer amRMToken = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -154,6 +156,9 @@ private synchronized void mergeLocalToBuilder() { getChangeProtoIterable(this.decreasedContainers); builder.addAllDecreasedContainers(iterable); } + if (this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } private synchronized void mergeLocalToProto() { @@ -357,6 +362,28 @@ public synchronized void setDecreasedContainers( this.decreasedContainers.addAll(decreasedContainers); } + @Override + public synchronized ByteBuffer getAMRMToken() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public synchronized void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if (amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { return; @@ -699,4 +726,12 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) { private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private synchronized ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private synchronized ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 06a637a..c17f2f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -61,6 +61,7 @@ private Map applicationACLS = null; private List containersFromPreviousAttempts = null; private List nmTokens = null; + private ByteBuffer amRMToken = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -122,6 +123,9 @@ private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if (this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } @@ -282,6 +286,28 @@ public void setQueue(String queue) { } } + @Override + public ByteBuffer getAMRMToken() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if (amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } private void initContainersPreviousAttemptList() { RegisterApplicationMasterResponseProtoOrBuilder p = @@ -387,4 +413,12 @@ private NMTokenProto convertToProtoFormat(NMToken token) { private NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index e60add4..8253f22 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -35,10 +36,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -80,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; @@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -186,7 +192,7 @@ private AMRMTokenIdentifier selectAMRMTokenIdentifier( return result; } - private ApplicationAttemptId authorizeRequest() + private AMRMTokenIdentifier authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; @@ -223,7 +229,7 @@ private ApplicationAttemptId authorizeRequest() throw RPCUtil.getRemoteException(message); } - return appTokenIdentifier.getApplicationAttemptId(); + return appTokenIdentifier; } @Override @@ -231,7 +237,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); ApplicationId appID = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); @@ -292,6 +300,25 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); + ((RMAppAttemptImpl) app.getRMAppAttempt(applicationAttemptId)) + .setAMRMToken(amrmToken); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + response.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); + } + // For work-preserving AM restart, retrieve previous attempts' containers // and corresponding NM tokens. List transferredContainers = @@ -330,7 +357,8 @@ public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + authorizeRequest().getApplicationAttemptId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -405,7 +433,10 @@ public boolean hasApplicationMasterRegistered( public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - ApplicationAttemptId appAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + + ApplicationAttemptId appAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -554,6 +585,25 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + allocateResponse.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); + } + /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 50a0755..b649cda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -553,7 +554,22 @@ public SecretKey getClientTokenMasterKey() { @Override public Token getAMRMToken() { - return this.amrmToken; + this.readLock.lock(); + try { + return this.amrmToken; + } finally { + this.readLock.unlock(); + } + } + + @Private + public void setAMRMToken(Token lastToken) { + this.writeLock.lock(); + try { + this.amrmToken = lastToken; + } finally { + this.writeLock.unlock(); + } } @Override @@ -707,7 +723,8 @@ public void recover(RMState state) throws Exception { this.attemptMetrics.setIsPreempted(); } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); + recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -719,8 +736,8 @@ public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens) - throws IOException { + private void recoverAppAttemptCredentials(Credentials appAttemptTokens, + RMAppAttemptState state) throws IOException { if (appAttemptTokens == null) { return; } @@ -732,12 +749,31 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens) .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); } - // Only one AMRMToken is stored per-attempt, so this should be fine. Can't - // use TokenSelector as service may change - think fail-over. - this.amrmToken = - (Token) appAttemptTokens - .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); + // Right now the master key of AMRMToken is stored separately. + // Based on the current state of RMAppAttempt, + // we could recover AMRMToken differently. + if (state == RMAppAttemptState.FAILED + || state == RMAppAttemptState.FINISHED + || state == RMAppAttemptState.KILLED) { + this.amrmToken = + (Token) appAttemptTokens + .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); + rmContext.getAMRMTokenSecretManager() + .addPersistedPassword(this.amrmToken); + } else { + // If this RMAppAttemp is not at final state. + // assign the latest AMRMToken to this RMAppAttempt. + // case 1: AM has the AMRMToken which is generated based on + // currentMasterKey/nextMasterKey, and + // AMRMTokenSecretManager only recovered currentMasterKey/nextMasterKey. + // case 2: AM has the AMRMToken which is generated based on + // currentMasterKey, but AMRMTokenSecretManager only recovered + // currentMasterKey and nextMasterKey. For the next allocate call, + // AM will get the latest AMRMToken. + this.amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); + } } private static class BaseTransition implements diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index 14385c4..7442617 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Arrays; @@ -27,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -34,6 +36,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; @@ -328,6 +332,58 @@ public void testMasterKeyRollOver() throws Exception { } } + @Test (timeout = 20000) + public void testAMRMMasterKeysUpdate() throws Exception { + MockRM rm = new MockRM(conf) { + @Override + protected void doSecureLogin() throws IOException { + // Skip the login. + } + }; + rm.start(); + rm.start(); + MockNM nm = rm.registerNode("127.0.0.1:1234", 8000); + RMApp app = rm.submitApp(200); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + // Do allocate. Should not update AMRMToken + AllocateResponse response = + am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + + // roll over the master key + // Do allocate again. the AM should get the latest AMRMToken + rm.getRMContext().getAMRMTokenSecretManager().rollMasterKey(); + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(response.getAMRMToken()); + + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(response.getAMRMToken()); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + + Assert.assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm + .getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey() + .getKeyId()); + + // Do allocate again. The master key does not update. + // AM should not update its AMRMToken either + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + + // Activate the next master key. Since there is new master key generated + // in AMRMTokenSecretManager. The AMRMToken will not get updated for AM + rm.getRMContext().getAMRMTokenSecretManager().activateNextMasterKey(); + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + rm.stop(); + + } + private ApplicationMasterProtocol createRMClient(final MockRM rm, final Configuration conf, final YarnRPC rpc, UserGroupInformation currentUser) {