diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 8dac19b..dd26dae 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -21,13 +21,16 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; @@ -39,7 +42,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -56,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import com.google.common.annotations.VisibleForTesting; @@ -74,6 +80,8 @@ protected ApplicationMasterProtocol scheduler; private final ClientService clientService; protected int lastResponseID; + protected AtomicInteger currentAMRMTokenKeyId = new AtomicInteger( + Integer.MIN_VALUE); private Resource maxContainerCapability; protected Map applicationACLs; private volatile long lastHeartbeatTime; @@ -105,6 +113,18 @@ protected void serviceInit(Configuration conf) throws Exception { rmPollInterval = conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS); + // Initiate current AMRMToken KeyId + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + Iterator> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + currentAMRMTokenKeyId.set( + ((AMRMTokenIdentifier) token.decodeIdentifier()).getKeyId()); + break; + } + } } @Override @@ -152,6 +172,7 @@ protected void register() { .getAMWebappScheme(getConfig()) + serviceAddr.getHostName() + ":" + clientService.getHttpPort()); } + request.setCurrentAMRMTokenKeyId(this.currentAMRMTokenKeyId.get()); RegisterApplicationMasterResponse response = scheduler.registerApplicationMaster(request); isApplicationMasterRegistered = true; @@ -166,6 +187,9 @@ protected void register() { String queue = response.getQueue(); LOG.info("queue: " + queue); job.setQueueName(queue); + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -335,4 +359,19 @@ public void setSignalled(boolean isSignalled) { protected boolean isApplicationMasterRegistered() { return isApplicationMasterRegistered; } + + protected void updateAMRMToken(ByteBuffer amRMToken) throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(amRMToken); + credentials.readTokenStorageStream(dibb); + @SuppressWarnings("unchecked") + Token amrmToken = + (Token) credentials + .getToken(AMRMTokenIdentifier.KIND_NAME); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + currentUGI.addToken(amrmToken); + this.currentAMRMTokenKeyId.set(amrmToken.decodeIdentifier().getKeyId()); + LOG.info("Update AMRMToken: " + amrmToken.toString()); + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 11bc406..1f560fc 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -665,6 +665,10 @@ public void rampDownReduces(int rampDown) { } } + // Setting AMRMToken + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } List finishedContainers = response.getCompletedContainersStatuses(); // propagate preemption requests diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 1824211..6839025 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -179,7 +179,8 @@ protected AllocateResponse makeRemoteRequest() throws IOException { AllocateRequest allocateRequest = AllocateRequest.newInstance(lastResponseID, super.getApplicationProgress(), new ArrayList(ask), - new ArrayList(release), blacklistRequest); + new ArrayList(release), blacklistRequest, + this.currentAMRMTokenKeyId.get()); AllocateResponse allocateResponse; try { allocateResponse = scheduler.allocate(allocateRequest);