diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index 4b32c04..89abf27 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -39,7 +40,9 @@
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -57,6 +60,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
@@ -167,6 +171,9 @@ protected void register() {
String queue = response.getQueue();
LOG.info("queue: " + queue);
job.setQueueName(queue);
+ if (response.getAMRMToken() != null) {
+ updateAMRMToken(response.getAMRMToken());
+ }
} catch (Exception are) {
LOG.error("Exception while registering", are);
throw new YarnRuntimeException(are);
@@ -343,4 +350,17 @@ public void setSignalled(boolean isSignalled) {
protected boolean isApplicationMasterRegistered() {
return isApplicationMasterRegistered;
}
+
+ protected void updateAMRMToken(ByteBuffer amRMToken) throws IOException {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(amRMToken);
+ credentials.readTokenStorageStream(dibb);
+ @SuppressWarnings("unchecked")
+ Token amrmToken =
+ (Token) credentials
+ .getToken(AMRMTokenIdentifier.KIND_NAME);
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ currentUGI.addToken(amrmToken);
+ }
}
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 307cdfe..03f5625 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -674,7 +674,12 @@ public void rampDownReduces(int rampDown) {
nmToken.getToken());
}
}
-
+
+ // Setting AMRMToken
+ if (response.getAMRMToken() != null) {
+ updateAMRMToken(response.getAMRMToken());
+ }
+
List finishedContainers = response.getCompletedContainersStatuses();
// propagate preemption requests
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 0e27f32..d19e4e0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -56,6 +57,7 @@
* A list of nodes whose status has been updated.
* The number of available nodes in a cluster.
* A description of resources requested back by the cluster
+ * AMRMToken, if AMRMToken has been rolled over
*
*
*
@@ -71,7 +73,7 @@ public static AllocateResponse newInstance(int responseId,
List completedContainers,
List allocatedContainers, List updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
- PreemptionMessage preempt, List nmTokens) {
+ PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
@@ -82,9 +84,23 @@ public static AllocateResponse newInstance(int responseId,
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
response.setNMTokens(nmTokens);
+ response.setAMRMToken(amRMToken);
return response;
}
-
+
+ @Public
+ @Stable
+ public static AllocateResponse newInstance(int responseId,
+ List completedContainers,
+ List allocatedContainers, List updatedNodes,
+ Resource availResources, AMCommand command, int numClusterNodes,
+ PreemptionMessage preempt, List nmTokens) {
+ AllocateResponse response = newInstance(responseId, completedContainers,
+ allocatedContainers, updatedNodes, availResources, command,
+ numClusterNodes, preempt, nmTokens, null);
+ return response;
+ }
+
@Public
@Stable
public static AllocateResponse newInstance(int responseId,
@@ -102,6 +118,23 @@ public static AllocateResponse newInstance(int responseId,
return response;
}
+ @Public
+ @Stable
+ public static AllocateResponse newInstance(int responseId,
+ List completedContainers,
+ List allocatedContainers, List updatedNodes,
+ Resource availResources, AMCommand command, int numClusterNodes,
+ PreemptionMessage preempt, List nmTokens, ByteBuffer amRMToken,
+ List increasedContainers,
+ List decreasedContainers) {
+ AllocateResponse response = newInstance(responseId, completedContainers,
+ allocatedContainers, updatedNodes, availResources, command,
+ numClusterNodes, preempt, nmTokens, amRMToken);
+ response.setIncreasedContainers(increasedContainers);
+ response.setDecreasedContainers(decreasedContainers);
+ return response;
+ }
+
/**
* If the ResourceManager needs the
* ApplicationMaster to take some action then it will send an
@@ -270,4 +303,17 @@ public abstract void setIncreasedContainers(
@Unstable
public abstract void setDecreasedContainers(
List decreasedContainers);
+
+ /**
+ * The AMRMToken that belong to this attempt
+ *
+ * @return The AMRMToken that belong to this attempt
+ */
+ @Public
+ @Stable
+ public abstract ByteBuffer getAMRMToken();
+
+ @Private
+ @Unstable
+ public abstract void setAMRMToken(ByteBuffer amRMToken);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 79f9f3a..e7f1c14 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -42,6 +42,7 @@
* Maximum capability for allocated resources in the cluster.
* ApplicationACLs for the application.
* ClientToAMToken master key.
+ * AMRMToken, if AMRMToken has been rolled over
*
*
*
@@ -58,6 +59,17 @@ public static RegisterApplicationMasterResponse newInstance(
Map acls, ByteBuffer key,
List containersFromPreviousAttempt, String queue,
List nmTokensFromPreviousAttempts) {
+ return newInstance(minCapability, maxCapability, acls, key,
+ containersFromPreviousAttempt, queue, nmTokensFromPreviousAttempts, null);
+ }
+
+ @Private
+ @Unstable
+ public static RegisterApplicationMasterResponse newInstance(
+ Resource minCapability, Resource maxCapability,
+ Map acls, ByteBuffer key,
+ List containersFromPreviousAttempt, String queue,
+ List nmTokensFromPreviousAttempts, ByteBuffer amRMToken) {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability);
@@ -66,6 +78,7 @@ public static RegisterApplicationMasterResponse newInstance(
response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
response.setQueue(queue);
+ response.setAMRMToken(amRMToken);
return response;
}
@@ -180,4 +193,17 @@ public abstract void setContainersFromPreviousAttempts(
@Private
@Unstable
public abstract void setNMTokensFromPreviousAttempts(List nmTokens);
+
+ /**
+ * The AMRMToken that belong to this attempt
+ *
+ * @return The AMRMToken that belong to this attempt
+ */
+ @Public
+ @Stable
+ public abstract ByteBuffer getAMRMToken();
+
+ @Private
+ @Unstable
+ public abstract void setAMRMToken(ByteBuffer amRMToken);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index a1f6d2e..d16fa47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto {
repeated ContainerProto containers_from_previous_attempts = 4;
optional string queue = 5;
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
+ optional bytes am_rm_token = 7;
}
message FinishApplicationMasterRequestProto {
@@ -85,6 +86,7 @@ message AllocateResponseProto {
repeated NMTokenProto nm_tokens = 9;
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
+ optional bytes am_rm_token = 12;
}
//////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 1db7054..33e0938 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,7 +40,11 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -64,6 +69,7 @@
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -223,6 +229,9 @@ private RegisterApplicationMasterResponse registerApplicationMaster()
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
+ if (response.getAMRMToken() != null) {
+ updateAMRMToken(response.getAMRMToken());
+ }
}
return response;
}
@@ -300,6 +309,9 @@ public AllocateResponse allocate(float progressIndicator)
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
+ if (allocateResponse.getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAMRMToken());
+ }
if (!pendingRelease.isEmpty()
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
removePendingReleaseRequests(allocateResponse
@@ -743,4 +755,17 @@ public synchronized void updateBlacklist(List blacklistAdditions,
"blacklistRemovals in updateBlacklist.");
}
}
+
+ private void updateAMRMToken(ByteBuffer amRMToken) throws IOException {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(amRMToken);
+ credentials.readTokenStorageStream(dibb);
+ @SuppressWarnings("unchecked")
+ Token amrmToken =
+ (Token) credentials
+ .getToken(AMRMTokenIdentifier.KIND_NAME);
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ currentUGI.addToken(amrmToken);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 4d7c0a3..c670032 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -55,6 +56,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
+import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
@Private
@@ -74,7 +76,7 @@
private List updatedNodes = null;
private PreemptionMessage preempt;
-
+ private ByteBuffer amRMToken = null;
public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder();
@@ -154,6 +156,9 @@ private synchronized void mergeLocalToBuilder() {
getChangeProtoIterable(this.decreasedContainers);
builder.addAllDecreasedContainers(iterable);
}
+ if (this.amRMToken != null) {
+ builder.setAmRmToken(convertToProtoFormat(this.amRMToken));
+ }
}
private synchronized void mergeLocalToProto() {
@@ -357,6 +362,28 @@ public synchronized void setDecreasedContainers(
this.decreasedContainers.addAll(decreasedContainers);
}
+ @Override
+ public synchronized ByteBuffer getAMRMToken() {
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (amRMToken != null) {
+ return amRMToken;
+ }
+ if (!p.hasAmRmToken()) {
+ return null;
+ }
+ this.amRMToken = convertFromProtoFormat(p.getAmRmToken());
+ return amRMToken;
+ }
+
+ @Override
+ public synchronized void setAMRMToken(ByteBuffer amRMToken) {
+ maybeInitBuilder();
+ if (amRMToken == null) {
+ builder.clearAmRmToken();
+ }
+ this.amRMToken = amRMToken;
+ }
+
private synchronized void initLocalIncreasedContainerList() {
if (this.increasedContainers != null) {
return;
@@ -699,4 +726,12 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto);
}
+
+ private synchronized ByteBuffer convertFromProtoFormat(ByteString byteString) {
+ return ProtoUtils.convertFromProtoFormat(byteString);
+ }
+
+ private synchronized ByteString convertToProtoFormat(ByteBuffer byteBuffer) {
+ return ProtoUtils.convertToProtoFormat(byteBuffer);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
index 06a637a..c17f2f3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
@@ -61,6 +61,7 @@
private Map applicationACLS = null;
private List containersFromPreviousAttempts = null;
private List nmTokens = null;
+ private ByteBuffer amRMToken = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -122,6 +123,9 @@ private void mergeLocalToBuilder() {
Iterable iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokensFromPreviousAttempts(iterable);
}
+ if (this.amRMToken != null) {
+ builder.setAmRmToken(convertToProtoFormat(this.amRMToken));
+ }
}
@@ -282,6 +286,28 @@ public void setQueue(String queue) {
}
}
+ @Override
+ public ByteBuffer getAMRMToken() {
+ RegisterApplicationMasterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (amRMToken != null) {
+ return amRMToken;
+ }
+ if (!p.hasAmRmToken()) {
+ return null;
+ }
+ this.amRMToken = convertFromProtoFormat(p.getAmRmToken());
+ return amRMToken;
+ }
+
+ @Override
+ public void setAMRMToken(ByteBuffer amRMToken) {
+ maybeInitBuilder();
+ if (amRMToken == null) {
+ builder.clearAmRmToken();
+ }
+ this.amRMToken = amRMToken;
+ }
private void initContainersPreviousAttemptList() {
RegisterApplicationMasterResponseProtoOrBuilder p =
@@ -387,4 +413,12 @@ private NMTokenProto convertToProtoFormat(NMToken token) {
private NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto);
}
+
+ private ByteBuffer convertFromProtoFormat(ByteString byteString) {
+ return ProtoUtils.convertFromProtoFormat(byteString);
+ }
+
+ private ByteString convertToProtoFormat(ByteBuffer byteBuffer) {
+ return ProtoUtils.convertToProtoFormat(byteBuffer);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index e60add4..8253f22 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -35,10 +36,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
@@ -80,6 +84,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -89,6 +94,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -186,7 +192,7 @@ private AMRMTokenIdentifier selectAMRMTokenIdentifier(
return result;
}
- private ApplicationAttemptId authorizeRequest()
+ private AMRMTokenIdentifier authorizeRequest()
throws YarnException {
UserGroupInformation remoteUgi;
@@ -223,7 +229,7 @@ private ApplicationAttemptId authorizeRequest()
throw RPCUtil.getRemoteException(message);
}
- return appTokenIdentifier.getApplicationAttemptId();
+ return appTokenIdentifier;
}
@Override
@@ -231,7 +237,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -292,6 +300,25 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
.getMasterKey(applicationAttemptId).getEncoded()));
}
+ // update AMRMToken if the token is rolled-up
+ MasterKeyData nextMasterKey =
+ this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token amrmToken =
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ applicationAttemptId);
+ ((RMAppAttemptImpl) app.getRMAppAttempt(applicationAttemptId))
+ .setAMRMToken(amrmToken);
+ Credentials credentials = new Credentials();
+ credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ response.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0,
+ dob.getLength()));
+ }
+
// For work-preserving AM restart, retrieve previous attempts' containers
// and corresponding NM tokens.
List transferredContainers =
@@ -330,7 +357,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ authorizeRequest().getApplicationAttemptId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
@@ -405,7 +433,10 @@ public boolean hasApplicationMasterRegistered(
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- ApplicationAttemptId appAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+ ApplicationAttemptId appAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -554,6 +585,25 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
+ // update AMRMToken if the token is rolled-up
+ MasterKeyData nextMasterKey =
+ this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token amrmToken =
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttemptId);
+ ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+ Credentials credentials = new Credentials();
+ credentials.addToken(AMRMTokenIdentifier.KIND_NAME, amrmToken);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ allocateResponse.setAMRMToken(ByteBuffer.wrap(dob.getData(), 0,
+ dob.getLength()));
+ }
+
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 50a0755..e66f0d8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -553,7 +554,22 @@ public SecretKey getClientTokenMasterKey() {
@Override
public Token getAMRMToken() {
- return this.amrmToken;
+ this.readLock.lock();
+ try {
+ return this.amrmToken;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ public void setAMRMToken(Token lastToken) {
+ this.writeLock.lock();
+ try {
+ this.amrmToken = lastToken;
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Override