diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index 4b32c04..aeb15ad 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -50,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -57,6 +59,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -343,4 +347,16 @@ public void setSignalled(boolean isSignalled) {
protected boolean isApplicationMasterRegistered() {
return isApplicationMasterRegistered;
}
+
+ protected void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token amrmToken =
+ new org.apache.hadoop.security.token.Token(token
+ .getIdentifier().array(), token.getPassword().array(), new Text(
+ token.getKind()), new Text(token.getService()));
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ currentUGI = UserGroupInformation.getLoginUser();
+ }
+ currentUGI.addToken(amrmToken);
+ }
}
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 307cdfe..03f5625 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -674,7 +674,12 @@ public void rampDownReduces(int rampDown) {
nmToken.getToken());
}
}
-
+
+ // Setting AMRMToken
+ if (response.getAMRMToken() != null) {
+ updateAMRMToken(response.getAMRMToken());
+ }
+
List finishedContainers = response.getCompletedContainersStatuses();
// propagate preemption requests
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 0e27f32..3a843c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -56,6 +57,7 @@
* A list of nodes whose status has been updated.
* The number of available nodes in a cluster.
* A description of resources requested back by the cluster
+ * AMRMToken, if AMRMToken has been rolled over
*
*
*
@@ -71,7 +73,7 @@ public static AllocateResponse newInstance(int responseId,
List completedContainers,
List allocatedContainers, List updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
- PreemptionMessage preempt, List nmTokens) {
+ PreemptionMessage preempt, List nmTokens, Token amRMToken) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
@@ -82,9 +84,23 @@ public static AllocateResponse newInstance(int responseId,
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
response.setNMTokens(nmTokens);
+ response.setAMRMToken(amRMToken);
return response;
}
-
+
+ @Public
+ @Stable
+ public static AllocateResponse newInstance(int responseId,
+ List completedContainers,
+ List allocatedContainers, List updatedNodes,
+ Resource availResources, AMCommand command, int numClusterNodes,
+ PreemptionMessage preempt, List nmTokens) {
+ AllocateResponse response = newInstance(responseId, completedContainers,
+ allocatedContainers, updatedNodes, availResources, command,
+ numClusterNodes, preempt, nmTokens, null);
+ return response;
+ }
+
@Public
@Stable
public static AllocateResponse newInstance(int responseId,
@@ -102,6 +118,23 @@ public static AllocateResponse newInstance(int responseId,
return response;
}
+ @Public
+ @Stable
+ public static AllocateResponse newInstance(int responseId,
+ List completedContainers,
+ List allocatedContainers, List updatedNodes,
+ Resource availResources, AMCommand command, int numClusterNodes,
+ PreemptionMessage preempt, List nmTokens, Token amRMToken,
+ List increasedContainers,
+ List decreasedContainers) {
+ AllocateResponse response = newInstance(responseId, completedContainers,
+ allocatedContainers, updatedNodes, availResources, command,
+ numClusterNodes, preempt, nmTokens, amRMToken);
+ response.setIncreasedContainers(increasedContainers);
+ response.setDecreasedContainers(decreasedContainers);
+ return response;
+ }
+
/**
* If the ResourceManager needs the
* ApplicationMaster to take some action then it will send an
@@ -270,4 +303,17 @@ public abstract void setIncreasedContainers(
@Unstable
public abstract void setDecreasedContainers(
List decreasedContainers);
+
+ /**
+ * The AMRMToken that belong to this attempt
+ *
+ * @return The AMRMToken that belong to this attempt
+ */
+ @Public
+ @Unstable
+ public abstract Token getAMRMToken();
+
+ @Private
+ @Unstable
+ public abstract void setAMRMToken(Token amRMToken);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index a1f6d2e..df8784b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -85,6 +85,7 @@ message AllocateResponseProto {
repeated NMTokenProto nm_tokens = 9;
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
+ optional hadoop.common.TokenProto am_rm_token = 12;
}
//////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 1db7054..e36d7ad 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -39,7 +39,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -56,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -64,6 +67,7 @@
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -300,6 +304,9 @@ public AllocateResponse allocate(float progressIndicator)
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
+ if (allocateResponse.getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAMRMToken());
+ }
if (!pendingRelease.isEmpty()
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
removePendingReleaseRequests(allocateResponse
@@ -743,4 +750,16 @@ public synchronized void updateBlacklist(List blacklistAdditions,
"blacklistRemovals in updateBlacklist.");
}
}
+
+ private void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token amrmToken =
+ new org.apache.hadoop.security.token.Token(token
+ .getIdentifier().array(), token.getPassword().array(), new Text(
+ token.getKind()), new Text(token.getService()));
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ currentUGI = UserGroupInformation.getLoginUser();
+ }
+ currentUGI.addToken(amrmToken);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 5961532..7991073 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -27,18 +27,21 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -71,6 +74,8 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -93,6 +98,9 @@
static ApplicationAttemptId attemptId = null;
static int nodeCount = 3;
+ static final int rolling_interval_sec = 13;
+ static final long am_expire_ms = 4000;
+
static Resource capability;
static Priority priority;
static Priority priority2;
@@ -106,6 +114,10 @@
public static void setup() throws Exception {
// start minicluster
conf = new YarnConfiguration();
+ conf.setLong(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+ rolling_interval_sec);
+ conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
@@ -809,4 +821,122 @@ private void sleep(int sleepTime) {
}
}
+ @Test(timeout = 60000)
+ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
+ IOException {
+ AMRMClient amClient = null;
+ try {
+ // start am rm client
+ amClient = AMRMClient. createAMRMClient();
+
+ amClient.init(conf);
+ amClient.start();
+
+ Long startTime = System.currentTimeMillis();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ org.apache.hadoop.security.token.Token amrmToken_1 =
+ getAMRMToken();
+ Assert.assertNotNull(amrmToken_1);
+ Assert
+ .assertEquals(amrmToken_1.decodeIdentifier().getKeyId(), yarnCluster
+ .getResourceManager().getRMContext().getAMRMTokenSecretManager()
+ .getMasterKey().getMasterKey().getKeyId());
+
+ // Wait for enough time and make sure the roll_over happens
+ // At mean time, the old AMRMToken should continue to work
+ while (System.currentTimeMillis() - startTime <
+ rolling_interval_sec * 1000) {
+ amClient.allocate(0.1f);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ amClient.allocate(0.1f);
+
+ org.apache.hadoop.security.token.Token amrmToken_2 =
+ getAMRMToken();
+ Assert.assertNotNull(amrmToken_2);
+ Assert
+ .assertEquals(amrmToken_2.decodeIdentifier().getKeyId(), yarnCluster
+ .getResourceManager().getRMContext().getAMRMTokenSecretManager()
+ .getMasterKey().getMasterKey().getKeyId());
+
+ Assert.assertNotEquals(amrmToken_1, amrmToken_2);
+
+ // can do the allocate call with latest AMRMToken
+ amClient.allocate(0.1f);
+
+ // Make sure previous token has been rolled-over
+ // and can not use this rolled-over token to make a allocate all.
+ while (true) {
+ if (amrmToken_2.decodeIdentifier().getKeyId() != yarnCluster
+ .getResourceManager().getRMContext().getAMRMTokenSecretManager()
+ .getCurrnetMasterKeyData().getMasterKey().getKeyId()) {
+ if (yarnCluster.getResourceManager().getRMContext()
+ .getAMRMTokenSecretManager().getNextMasterKeyData() == null) {
+ break;
+ } else if (amrmToken_2.decodeIdentifier().getKeyId() != yarnCluster
+ .getResourceManager().getRMContext().getAMRMTokenSecretManager()
+ .getNextMasterKeyData().getMasterKey().getKeyId()) {
+ break;
+ }
+ }
+ amClient.allocate(0.1f);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // DO NOTHING
+ }
+ }
+
+ try {
+ UserGroupInformation testUser =
+ UserGroupInformation.createRemoteUser("testUser");
+ SecurityUtil.setTokenService(amrmToken_2, yarnCluster
+ .getResourceManager().getApplicationMasterService().getBindAddress());
+ testUser.addToken(amrmToken_2);
+ testUser.doAs(new PrivilegedAction() {
+ @Override
+ public ApplicationMasterProtocol run() {
+ return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
+ ApplicationMasterProtocol.class,
+ yarnCluster.getResourceManager().getApplicationMasterService()
+ .getBindAddress(), conf);
+ }
+ }).allocate(Records.newRecord(AllocateRequest.class));
+ Assert.fail("The old Token should not work");
+ } catch (Exception ex) {
+ // Should have exception
+ }
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private org.apache.hadoop.security.token.Token
+ getAMRMToken() throws IOException {
+ Credentials credentials =
+ UserGroupInformation.getCurrentUser().getCredentials();
+ Iterator> iter =
+ credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ org.apache.hadoop.security.token.Token> token = iter.next();
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ return (org.apache.hadoop.security.token.Token)
+ token;
+ }
+ }
+ return null;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 4d7c0a3..f2796fd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
@@ -35,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
@@ -44,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
@@ -74,7 +77,7 @@
private List updatedNodes = null;
private PreemptionMessage preempt;
-
+ private Token amrmToken = null;
public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder();
@@ -154,6 +157,9 @@ private synchronized void mergeLocalToBuilder() {
getChangeProtoIterable(this.decreasedContainers);
builder.addAllDecreasedContainers(iterable);
}
+ if (this.amrmToken != null) {
+ builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
+ }
}
private synchronized void mergeLocalToProto() {
@@ -357,6 +363,28 @@ public synchronized void setDecreasedContainers(
this.decreasedContainers.addAll(decreasedContainers);
}
+ @Override
+ public synchronized Token getAMRMToken() {
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (amrmToken != null) {
+ return amrmToken;
+ }
+ if (!p.hasAmRmToken()) {
+ return null;
+ }
+ this.amrmToken = convertFromProtoFormat(p.getAmRmToken());
+ return amrmToken;
+ }
+
+ @Override
+ public synchronized void setAMRMToken(Token amRMToken) {
+ maybeInitBuilder();
+ if (amRMToken == null) {
+ builder.clearAmRmToken();
+ }
+ this.amrmToken = amRMToken;
+ }
+
private synchronized void initLocalIncreasedContainerList() {
if (this.increasedContainers != null) {
return;
@@ -699,4 +727,12 @@ private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto);
}
+
+ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+ return new TokenPBImpl(p);
+ }
+
+ private TokenProto convertToProtoFormat(Token t) {
+ return ((TokenPBImpl)t).getProto();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index eda4c7b..d77180c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
@@ -80,6 +81,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -89,6 +91,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -189,7 +192,7 @@ private AMRMTokenIdentifier selectAMRMTokenIdentifier(
return result;
}
- private ApplicationAttemptId authorizeRequest()
+ private AMRMTokenIdentifier authorizeRequest()
throws YarnException {
UserGroupInformation remoteUgi;
@@ -226,7 +229,7 @@ private ApplicationAttemptId authorizeRequest()
throw RPCUtil.getRemoteException(message);
}
- return appTokenIdentifier.getApplicationAttemptId();
+ return appTokenIdentifier;
}
@Override
@@ -234,7 +237,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -333,7 +338,8 @@ public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
- ApplicationAttemptId applicationAttemptId = authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ authorizeRequest().getApplicationAttemptId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
@@ -408,7 +414,10 @@ public boolean hasApplicationMasterRegistered(
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- ApplicationAttemptId appAttemptId = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+ ApplicationAttemptId appAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -557,6 +566,23 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
+ // update AMRMToken if the token is rolled-up
+ MasterKeyData nextMasterKey =
+ this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+ if (nextMasterKey != null
+ && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+ .getKeyId()) {
+ Token amrmToken =
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttemptId);
+ ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+ allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+ .toString(), amrmToken.getPassword(), amrmToken.getService()
+ .toString()));
+ }
+
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index a1c1a40..0dd9ba1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -226,7 +227,7 @@ private void setupTokens(
}
// Add AMRMToken
- Token amrmToken = getAMRMToken();
+ Token amrmToken = createAndSetAMRMToken();
if (amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
@@ -236,8 +237,12 @@ private void setupTokens(
}
@VisibleForTesting
- protected Token getAMRMToken() {
- return application.getAMRMToken();
+ protected Token createAndSetAMRMToken() {
+ Token amrmToken =
+ this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ application.getAppAttemptId());
+ ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
+ return amrmToken;
}
@SuppressWarnings("unchecked")
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 2f8a944..808d737 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -71,6 +71,10 @@
* FileSystem interface. Does not use directories so that simple key-value
* stores can be used. The retry policy for the real filesystem client must be
* configured separately to enable retry of filesystem operations when needed.
+ *
+ * In current 1.2 RMStateVersion, AMRMTokenSecretManager state has been saved
+ * separately. The currentMasterkey and nextMasterkey have been stored.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
*/
public class FileSystemRMStateStore extends RMStateStore {
@@ -78,7 +82,7 @@
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
- .newInstance(1, 1);
+ .newInstance(1, 2);
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
"AMRMTokenSecretManagerNode";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index da08d80..876a2ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,7 +44,6 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -769,10 +767,7 @@ protected abstract void removeApplicationStateInternal(
public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
- Token appToken = appAttempt.getAMRMToken();
- if(appToken != null){
- credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
- }
+
SecretKey clientTokenMasterKey =
appAttempt.getClientTokenMasterKey();
if(clientTokenMasterKey != null){
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index bb379c5..32bead0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -78,6 +78,11 @@
import com.google.common.annotations.VisibleForTesting;
+/**
+ * In current 1.2 RMStateVersion, AMRMTokenSecretManager state has been saved
+ * separately. The currentMasterkey and nextMasterkey have been stored.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
+ */
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
@@ -87,7 +92,7 @@
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
- .newInstance(1, 1);
+ .newInstance(1, 2);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 50a0755..23a9975 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -553,7 +554,22 @@ public SecretKey getClientTokenMasterKey() {
@Override
public Token getAMRMToken() {
- return this.amrmToken;
+ this.readLock.lock();
+ try {
+ return this.amrmToken;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ public void setAMRMToken(Token lastToken) {
+ this.writeLock.lock();
+ try {
+ this.amrmToken = lastToken;
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Override
@@ -707,7 +723,8 @@ public void recover(RMState state) throws Exception {
this.attemptMetrics.setIsPreempted();
}
setMasterContainer(attemptState.getMasterContainer());
- recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+ recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+ attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -719,8 +736,8 @@ public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers();
}
- private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
- throws IOException {
+ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+ RMAppAttemptState state) throws IOException {
if (appAttemptTokens == null) {
return;
}
@@ -732,12 +749,25 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
}
- // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
- // use TokenSelector as service may change - think fail-over.
- this.amrmToken =
- (Token) appAttemptTokens
- .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
- rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+ // Right now the master key of AMRMToken is stored separately.
+ // Based on the current state of RMAppAttempt,
+ // we could recover AMRMToken differently.
+ if (!(state == RMAppAttemptState.FAILED
+ || state == RMAppAttemptState.FINISHED
+ || state == RMAppAttemptState.KILLED)) {
+ // If this RMAppAttemp is not at final state.
+ // assign the latest AMRMToken to this RMAppAttempt.
+ // case 1: AM has the AMRMToken which is generated based on
+ // currentMasterKey/nextMasterKey, and
+ // AMRMTokenSecretManager only recovered currentMasterKey/nextMasterKey.
+ // case 2: AM has the AMRMToken which is generated based on
+ // currentMasterKey, but AMRMTokenSecretManager only recovered
+ // currentMasterKey and nextMasterKey. For the next allocate call,
+ // AM will get the latest AMRMToken.
+ this.amrmToken =
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ applicationAttemptId);
+ }
}
private static class BaseTransition implements
@@ -773,11 +803,6 @@ public void transition(RMAppAttemptImpl appAttempt,
.createMasterKey(appAttempt.applicationAttemptId);
}
- // create AMRMToken
- appAttempt.amrmToken =
- appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
- appAttempt.applicationAttemptId);
-
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -890,6 +915,7 @@ public void run() {
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
+
appAttempt.launchAttempt();
}
}
@@ -1179,6 +1205,12 @@ public boolean shouldCountTowardsMaxAttemptRetry() {
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
+
+ // create AMRMToken
+ appAttempt.amrmToken =
+ appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttempt.applicationAttemptId);
+
// TODO Today unmanaged AM client is waiting for app state to be Accepted to
// launch the AM. This is broken since we changed to start the attempt
// after the application is Accepted. We may need to introduce an attempt
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6949a81..a7f6240 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -53,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -62,6 +64,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -412,6 +415,13 @@ public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
throws Exception {
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
am.waitForState(RMAppAttemptState.ALLOCATED);
+ //create and set AMRMToken
+ Token amrmToken =
+ this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttemptId);
+ ((RMAppAttemptImpl) this.rmContext.getRMApps()
+ .get(appAttemptId.getApplicationId()).getRMAppAttempt(appAttemptId))
+ .setAMRMToken(amrmToken);
getRMContext()
.getDispatcher()
.getEventHandler()
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java
index 0ea2b5e..0c4d262 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMWithCustomAMLauncher.java
@@ -59,8 +59,9 @@ protected ContainerManagementProtocol getContainerMgrProxy(
return containerManager;
}
@Override
- protected Token getAMRMToken() {
- Token amRmToken = super.getAMRMToken();
+ protected Token createAndSetAMRMToken() {
+ Token amRmToken =
+ super.createAndSetAMRMToken();
InetSocketAddress serviceAddr =
getConfig().getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index dc3e9f1..e60ac6a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1208,18 +1208,13 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
- // the appToken and clientTokenMasterKey that are generated when
+ // the clientTokenMasterKey that are generated when
// RMAppAttempt is created,
- HashSet> tokenSet = new HashSet>();
- tokenSet.add(attempt1.getAMRMToken());
byte[] clientTokenMasterKey =
attempt1.getClientTokenMasterKey().getEncoded();
// assert application credentials are saved
Credentials savedCredentials = attemptState.getAppAttemptCredentials();
- HashSet> savedTokens = new HashSet>();
- savedTokens.addAll(savedCredentials.getAllTokens());
- Assert.assertEquals(tokenSet, savedTokens);
Assert.assertArrayEquals("client token master key not saved",
clientTokenMasterKey, savedCredentials.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
@@ -1232,11 +1227,8 @@ public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1);
- // assert loaded attempt recovered attempt tokens
+ // assert loaded attempt recovered
Assert.assertNotNull(loadedAttempt1);
- savedTokens.clear();
- savedTokens.add(loadedAttempt1.getAMRMToken());
- Assert.assertEquals(tokenSet, savedTokens);
// assert client token master key is recovered back to api-versioned
// client token master key
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 5d3e51a..0da3c55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -198,8 +198,6 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
// create application token and client token key for attempt1
Token appAttemptToken1 =
generateAMRMToken(attemptId1, appTokenMgr);
- HashSet> attemptTokenSet1 = new HashSet>();
- attemptTokenSet1.add(appAttemptToken1);
SecretKey clientTokenKey1 =
clientToAMTokenMgr.createMasterKey(attemptId1);
@@ -214,8 +212,6 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
// create application token and client token key for attempt2
Token appAttemptToken2 =
generateAMRMToken(attemptId2, appTokenMgr);
- HashSet> attemptTokenSet2 = new HashSet>();
- attemptTokenSet2.add(appAttemptToken2);
SecretKey clientTokenKey2 =
clientToAMTokenMgr.createMasterKey(attemptId2);
@@ -280,10 +276,6 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
assertEquals(-1000, attemptState.getAMContainerExitStatus());
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
- // attempt1 applicationToken is loaded correctly
- HashSet> savedTokens = new HashSet>();
- savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
- assertEquals(attemptTokenSet1, savedTokens);
// attempt1 client token master key is loaded correctly
assertArrayEquals(clientTokenKey1.getEncoded(),
attemptState.getAppAttemptCredentials()
@@ -295,10 +287,6 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
assertEquals(attemptId2, attemptState.getAttemptId());
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
- // attempt2 applicationToken is loaded correctly
- savedTokens.clear();
- savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
- assertEquals(attemptTokenSet2, savedTokens);
// attempt2 client token master key is loaded correctly
assertArrayEquals(clientTokenKey2.getEncoded(),
attemptState.getAppAttemptCredentials()
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 01a6973..d389a15 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -348,7 +348,6 @@ private void testAppAttemptSubmittedState() {
assertNull(applicationAttempt.createClientToken("some client"));
}
assertNull(applicationAttempt.createClientToken(null));
- assertNotNull(applicationAttempt.getAMRMToken());
// Check events
verify(masterService).
registerAppAttempt(applicationAttempt.getAppAttemptId());
@@ -444,7 +443,6 @@ private void testAppAttemptAllocatedState(Container amContainer) {
assertEquals(RMAppAttemptState.ALLOCATED,
applicationAttempt.getAppAttemptState());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
-
// Check events
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
verify(scheduler, times(2)).
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
index 14385c4..b3dc35f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -34,6 +35,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -43,6 +45,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -53,6 +56,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -328,6 +332,51 @@ public void testMasterKeyRollOver() throws Exception {
}
}
+ @Test (timeout = 20000)
+ public void testAMRMMasterKeysUpdate() throws Exception {
+ MockRM rm = new MockRM(conf) {
+ @Override
+ protected void doSecureLogin() throws IOException {
+ // Skip the login.
+ }
+ };
+ rm.start();
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 8000);
+ RMApp app = rm.submitApp(200);
+ MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
+
+ // Do allocate. Should not update AMRMToken
+ AllocateResponse response =
+ am.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNull(response.getAMRMToken());
+
+ // roll over the master key
+ // Do allocate again. the AM should get the latest AMRMToken
+ rm.getRMContext().getAMRMTokenSecretManager().rollMasterKey();
+ response = am.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNotNull(response.getAMRMToken());
+
+ Token amrmToken =
+ ConverterUtils.convertFromYarn(response.getAMRMToken(), new Text(
+ response.getAMRMToken().getService()));
+
+ Assert.assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm
+ .getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey()
+ .getKeyId());
+
+ // Do allocate again. The master key does not update.
+ // AM should not update its AMRMToken either
+ response = am.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNull(response.getAMRMToken());
+
+ // Activate the next master key. Since there is new master key generated
+ // in AMRMTokenSecretManager. The AMRMToken will not get updated for AM
+ rm.getRMContext().getAMRMTokenSecretManager().activateNextMasterKey();
+ response = am.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNull(response.getAMRMToken());
+ rm.stop();
+ }
+
private ApplicationMasterProtocol createRMClient(final MockRM rm,
final Configuration conf, final YarnRPC rpc,
UserGroupInformation currentUser) {