diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 62316a6..39a1f87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -64,7 +65,19 @@ public static AllocateRequest newInstance(int responseID, float appProgress, return newInstance(responseID, appProgress, resourceAsk, containersToBeReleased, resourceBlacklistRequest, null); } - + + @Public + @Stable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + int currentAMRMTokenKeyId) { + return newInstance(responseID, appProgress, resourceAsk, + containersToBeReleased, resourceBlacklistRequest, null, + currentAMRMTokenKeyId); + } + @Public @Stable public static AllocateRequest newInstance(int responseID, float appProgress, @@ -72,6 +85,19 @@ public static AllocateRequest newInstance(int responseID, float appProgress, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, List increaseRequests) { + return newInstance(responseID, appProgress, resourceAsk, + containersToBeReleased, resourceBlacklistRequest, increaseRequests, + Integer.MIN_VALUE); + } + + @Public + @Stable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + List increaseRequests, + int currentAMRMTokenKeyId) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); @@ -79,9 +105,10 @@ public static AllocateRequest newInstance(int responseID, float appProgress, allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); allocateRequest.setIncreaseRequests(increaseRequests); + allocateRequest.setCurrentAMRMTokenKeyId(currentAMRMTokenKeyId); return allocateRequest; } - + /** * Get the response id used to track duplicate responses. * @return response id @@ -201,4 +228,15 @@ public abstract void setResourceBlacklistRequest( @Stable public abstract void setIncreaseRequests( List increaseRequests); + + /** + * Get the keyId for currentAMRMToken of ApplicationMaster + */ + @Public + @Unstable + public abstract int getCurrentAMRMTokenKeyId(); + + @Public + @Unstable + public abstract void setCurrentAMRMTokenKeyId(int keyId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 0e27f32..1d8a798 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -71,7 +72,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens) { + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -82,6 +83,7 @@ public static AllocateResponse newInstance(int responseId, response.setAMCommand(command); response.setPreemptionMessage(preempt); response.setNMTokens(nmTokens); + response.setAMRMToken(amRMToken); return response; } @@ -91,12 +93,12 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt, List nmTokens, + PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken, List increasedContainers, List decreasedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, - numClusterNodes, preempt, nmTokens); + numClusterNodes, preempt, nmTokens, amRMToken); response.setIncreasedContainers(increasedContainers); response.setDecreasedContainers(decreasedContainers); return response; @@ -270,4 +272,17 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setDecreasedContainers( List decreasedContainers); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java index 6b01854..2c4d23e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -47,6 +48,7 @@ *
    *
  • port: -1
  • *
  • trackingUrl: null
  • + *
  • currentAMRMTokenKeyId: Integer.MIN_VALUE
  • *
* The port is allowed to be any integer larger than or equal to -1. * @return the new instance of RegisterApplicationMasterRequest @@ -54,15 +56,23 @@ @Public @Stable public static RegisterApplicationMasterRequest newInstance(String host, - int port, String trackingUrl) { + int port, String trackingUrl, int currentAMRMTokenKeyId) { RegisterApplicationMasterRequest request = Records.newRecord(RegisterApplicationMasterRequest.class); request.setHost(host); request.setRpcPort(port); request.setTrackingUrl(trackingUrl); + request.setCurrentAMRMTokenKeyId(currentAMRMTokenKeyId); return request; } + @Public + @Stable + public static RegisterApplicationMasterRequest newInstance(String host, + int port, String trackingUrl) { + return newInstance(host, port, trackingUrl, Integer.MIN_VALUE); + } + /** * Get the host on which the ApplicationMaster is * running. @@ -133,4 +143,15 @@ public static RegisterApplicationMasterRequest newInstance(String host, @Public @Stable public abstract void setTrackingUrl(String trackingUrl); + + /** + * Get the keyId for currentAMRMToken of ApplicationMaster + */ + @Public + @Unstable + public abstract int getCurrentAMRMTokenKeyId(); + + @Public + @Unstable + public abstract void setCurrentAMRMTokenKeyId(int keyId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 79f9f3a..859f096 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -57,7 +57,7 @@ public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, Map acls, ByteBuffer key, List containersFromPreviousAttempt, String queue, - List nmTokensFromPreviousAttempts) { + List nmTokensFromPreviousAttempts, ByteBuffer amRMToken) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); @@ -66,6 +66,7 @@ public static RegisterApplicationMasterResponse newInstance( response.setContainersFromPreviousAttempts(containersFromPreviousAttempt); response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts); response.setQueue(queue); + response.setAMRMToken(amRMToken); return response; } @@ -180,4 +181,17 @@ public abstract void setContainersFromPreviousAttempts( @Private @Unstable public abstract void setNMTokensFromPreviousAttempts(List nmTokens); + + /** + * The AMRMToken that belong to this attempt + * + * @return The AMRMToken that belong to this attempt + */ + @Public + @Unstable + public abstract ByteBuffer getAMRMToken(); + + @Private + @Unstable + public abstract void setAMRMToken(ByteBuffer amRMToken); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..e3677e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -38,6 +38,7 @@ message RegisterApplicationMasterRequestProto { optional string host = 1; optional int32 rpc_port = 2; optional string tracking_url = 3; + optional int32 currentAMRMToken_keyid = 4; } message RegisterApplicationMasterResponseProto { @@ -47,6 +48,7 @@ message RegisterApplicationMasterResponseProto { repeated ContainerProto containers_from_previous_attempts = 4; optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + optional bytes am_rm_token = 7; } message FinishApplicationMasterRequestProto { @@ -66,6 +68,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated ContainerResourceIncreaseRequestProto increase_request = 6; + optional int32 currentAMRMToken_keyid = 7; } message NMTokenProto { @@ -85,6 +88,7 @@ message AllocateResponseProto { repeated NMTokenProto nm_tokens = 9; repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; + optional bytes am_rm_token = 12; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 1eebaac..ebb4dd0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -33,13 +35,18 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -61,6 +68,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -77,6 +85,9 @@ private int lastResponseId = 0; + private AtomicInteger currentAMRMTokenKeyId = new AtomicInteger( + Integer.MIN_VALUE); + protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; @@ -159,6 +170,20 @@ public AMRMClientImpl() { protected void serviceInit(Configuration conf) throws Exception { RackResolver.init(conf); super.serviceInit(conf); + + // Initiate current AMRMToken KeyId + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + // Now remove the AM->RM token so tasks don't have it + Iterator> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + currentAMRMTokenKeyId.set(((AMRMTokenIdentifier) token + .decodeIdentifier()).getKeyId()); + break; + } + } } @Override @@ -192,7 +217,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // do this only once ??? RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort, - appTrackingUrl); + appTrackingUrl, this.currentAMRMTokenKeyId.get()); RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); @@ -200,6 +225,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster( if(!response.getNMTokensFromPreviousAttempts().isEmpty()) { populateNMTokens(response.getNMTokensFromPreviousAttempts()); } + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } return response; } @@ -241,7 +269,8 @@ public AllocateResponse allocate(float progressIndicator) allocateRequest = AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest); + askList, releaseList, blacklistRequest, + this.currentAMRMTokenKeyId.get()); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -258,6 +287,9 @@ public AllocateResponse allocate(float progressIndicator) if (!allocateResponse.getNMTokens().isEmpty()) { populateNMTokens(allocateResponse.getNMTokens()); } + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -675,4 +707,21 @@ public synchronized void updateBlacklist(List blacklistAdditions, "blacklistRemovals in updateBlacklist."); } } + + private void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + this.currentAMRMTokenKeyId.set(amrmToken.decodeIdentifier().getKeyId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Update AMRMToken: " + amrmToken.toString()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 15bfa28..f53bc8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -766,7 +766,7 @@ public AllocateResponse createFakeAllocateResponse() { new ArrayList(), new ArrayList(), new ArrayList(), Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, - null, new ArrayList()); + null, new ArrayList(), null); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index e21c4ba..7ed6415 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -359,7 +359,7 @@ private AllocateResponse createAllocateResponse( List nmTokens) { AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null, nmTokens); + new ArrayList(), null, null, 1, null, nmTokens, null); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index dc11165..6eb207a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -311,7 +311,19 @@ public void setReleaseList(List releaseContainers) { this.release.clear(); this.release.addAll(releaseContainers); } - + + @Override + public int getCurrentAMRMTokenKeyId() { + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getCurrentAMRMTokenKeyid(); + } + + @Override + public void setCurrentAMRMTokenKeyId(int keyId) { + maybeInitBuilder(); + builder.setCurrentAMRMTokenKeyid(keyId); + } + private void initReleases() { if (this.release != null) { return; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 4d7c0a3..af67bdf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @Private @@ -74,6 +76,7 @@ private List updatedNodes = null; private PreemptionMessage preempt; + private ByteBuffer amRMToken = null; public AllocateResponsePBImpl() { @@ -154,6 +157,9 @@ private synchronized void mergeLocalToBuilder() { getChangeProtoIterable(this.decreasedContainers); builder.addAllDecreasedContainers(iterable); } + if (this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } private synchronized void mergeLocalToProto() { @@ -357,6 +363,28 @@ public synchronized void setDecreasedContainers( this.decreasedContainers.addAll(decreasedContainers); } + @Override + public synchronized ByteBuffer getAMRMToken() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public synchronized void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if (amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { return; @@ -699,4 +727,12 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) { private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private synchronized ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private synchronized ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java index 037dfd9..77d85e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java @@ -131,4 +131,17 @@ public void setTrackingUrl(String url) { } builder.setTrackingUrl(url); } + + @Override + public int getCurrentAMRMTokenKeyId() { + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; + return p.getCurrentAMRMTokenKeyid(); + } + + @Override + public void setCurrentAMRMTokenKeyId(int keyId) { + maybeInitBuilder(); + builder.setCurrentAMRMTokenKeyid(keyId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 06a637a..446b260 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -61,6 +61,7 @@ private Map applicationACLS = null; private List containersFromPreviousAttempts = null; private List nmTokens = null; + private ByteBuffer amRMToken = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -122,6 +123,9 @@ private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if (this.amRMToken != null) { + builder.setAmRmToken(convertToProtoFormat(this.amRMToken)); + } } @@ -364,6 +368,29 @@ public void remove() { }; } + @Override + public ByteBuffer getAMRMToken() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (amRMToken != null) { + return amRMToken; + } + if (!p.hasAmRmToken()) { + return null; + } + this.amRMToken = convertFromProtoFormat(p.getAmRmToken()); + return amRMToken; + } + + @Override + public void setAMRMToken(ByteBuffer amRMToken) { + maybeInitBuilder(); + if (amRMToken == null) { + builder.clearAmRmToken(); + } + this.amRMToken = amRMToken; + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } @@ -387,4 +414,12 @@ private NMTokenProto convertToProtoFormat(NMToken token) { private NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + private ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index a8a56d7..16e1abb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -72,7 +72,7 @@ public void testAllocateResponseWithIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, incContainers, decContainers); // serde @@ -100,8 +100,9 @@ public void testAllocateResponseWithIncDecContainers() { public void testAllocateResponseWithoutIncDecContainers() { AllocateResponse r = AllocateResponse.newInstance(3, new ArrayList(), - new ArrayList(), new ArrayList(), null, - AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null); + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null, + null); // serde AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index e60add4..0289006 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -35,10 +36,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -89,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -292,6 +297,23 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKey(); + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != request + .getCurrentAMRMTokenKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + response + .setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + } + // For work-preserving AM restart, retrieve previous attempts' containers // and corresponding NM tokens. List transferredContainers = @@ -554,6 +576,22 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKey(); + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != request + .getCurrentAMRMTokenKeyId()) { + Token amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + Credentials credentials = new Credentials(); + credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + allocateResponse + .setAMRMToken(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + } /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 6961c86..83b0d5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -202,16 +202,17 @@ private void loadAMRMTokenSecretManagerState(RMState rmState) try { FileStatus status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir); assert status.isFile(); + byte[] data = + readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); + AMRMTokenSecretManagerStateDataPBImpl stateData = + new AMRMTokenSecretManagerStateDataPBImpl( + AMRMTokenSecretManagerStateDataProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + new AMRMTokenSecretManagerState(stateData.getCurrentTokenMasterKey(), + stateData.getNextTokenMasterKey()); } catch (FileNotFoundException ex) { return; } - byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); - AMRMTokenSecretManagerStateDataPBImpl stateData = - new AMRMTokenSecretManagerStateDataPBImpl( - AMRMTokenSecretManagerStateDataProto.parseFrom(data)); - rmState.amrmTokenSecretManagerState = - new AMRMTokenSecretManagerState(stateData.getCurrentTokenMasterKey(), - stateData.getNextTokenMasterKey()); } private void loadRMAppState(RMState rmState) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 2141353..3098aa0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -268,4 +268,10 @@ public void recover(RMState state) { } } } + + public synchronized MasterKeyData getNextMasterKey() { + return this.nextMasterKey; + } + + }