diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index c72deda..2a58383 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.PreemptionContract; @@ -446,6 +447,7 @@ public AllocateResponse allocate(AllocateRequest request) } allocateResponse.setAllocatedContainers(allocation.getContainers()); + allocateResponse.setNMTokens(allocation.getNMTokens()); allocateResponse.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); @@ -455,14 +457,7 @@ public AllocateResponse allocate(AllocateRequest request) // add preemption to the allocateResponse message (if any) allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Adding NMTokens for allocated containers. - if (!allocation.getContainers().isEmpty()) { - // nmtokens are assigned only when containers are assigned. By default - // AM will receive NMToken for itself. - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - + // before returning response, verify in sync AllocateResponse oldResponse = responseMap.put(appAttemptId, allocateResponse); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a1c1a40..731ad6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -146,7 +148,7 @@ private void cleanup() throws IOException, YarnException { // Protected. For tests. protected ContainerManagementProtocol getContainerMgrProxy( - final ContainerId containerId) { + final ContainerId containerId) throws IOException{ final NodeId node = masterContainer.getNodeId(); final InetSocketAddress containerManagerBindAddress = @@ -157,14 +159,27 @@ protected ContainerManagementProtocol getContainerMgrProxy( UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(containerId .getApplicationAttemptId().toString()); + RMApp rmApp = + rmContext.getRMApps().get( + containerId.getApplicationAttemptId().getApplicationId()); - String user = - rmContext.getRMApps() - .get(containerId.getApplicationAttemptId().getApplicationId()) - .getUser(); - org.apache.hadoop.yarn.api.records.Token token = - rmContext.getNMTokenSecretManager().createNMToken( + String user = rmApp.getUser(); + org.apache.hadoop.yarn.api.records.Token token = null; + + if(eventType == AMLauncherEventType.LAUNCH) { + token = rmApp.getCurrentAppAttempt().getNMToken().getToken(); + } else { + try { + token = rmContext.getNMTokenSecretManager().createNMToken( containerId.getApplicationAttemptId(), node, user); + } catch (IllegalArgumentException e) { + if ( e.getCause() instanceof UnknownHostException) { + throw new IOException(e); + } else { + throw e; + } + } + } currentUser.addToken(ConverterUtils.convertFromYarn(token, containerManagerBindAddress)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index e9f064d..1019f51 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -30,10 +30,12 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -166,4 +168,14 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * Get Application Master node's nmToken + */ + NMToken getNMToken(); + + /** + * Get RMContext + */ + RMContext getRMContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 00397cf..ae78166 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -142,6 +143,7 @@ private String origTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; + private NMToken amNMToken = null; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -395,6 +397,11 @@ public ApplicationSubmissionContext getSubmissionContext() { } @Override + public RMContext getRMContext() { + return this.rmContext; + } + + @Override public FinalApplicationStatus getFinalApplicationStatus() { this.readLock.lock(); try { @@ -503,6 +510,11 @@ public SecretKey getClientTokenMasterKey() { } @Override + public NMToken getNMToken() { + return amNMToken; + } + + @Override public Token getAMRMToken() { return this.amrmToken; } @@ -811,6 +823,8 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, null); + appAttempt.amNMToken = + amContainerAllocation.getNMTokens().get(0); // Set the masterContainer appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( 0)); @@ -818,6 +832,12 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.getMasterContainer().getResource()); RMStateStore store = appAttempt.rmContext.getStateStore(); appAttempt.storeAttempt(store); + //We need to clear NMToken for this appattempt from NMTokenSecretManager + //cache. + appAttempt + .getRMContext().getNMTokenSecretManager() + .removeNodeKeyForAppAttempt(appAttempt.getAppAttemptId(), + appAttempt.getNMToken().getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index ee74523..7856f29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -96,6 +96,8 @@ new ArrayList(); private List newlyAllocatedNMTokens = new ArrayList(); + private final static List EMPTY_NM_TOKEN_LIST = + new ArrayList(); final Map> reservedContainers = new HashMap>(); @@ -585,8 +587,14 @@ public synchronized Allocation getAllocation(ResourceCalculator rc, ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - return new Allocation(pullNewlyAllocatedContainers(), - pullNewlyAssignedNMTokens(), getHeadroom(), null, + List newAllocatedContainers = pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = pullNewlyAssignedNMTokens(); + } + + return new Allocation(newAllocatedContainers, + newlyAllocatedNMTokens, getHeadroom(), null, currentContPreemption, Collections.singletonList(rr)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 760463e..5db9532 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -860,10 +860,14 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, for (RMContainer container : application.getPreemptionContainers()) { preemptionContainerIds.add(container.getContainerId()); } - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.pullNewlyAssignedNMTokens(), application.getHeadroom(), - preemptionContainerIds); + List newAllocatedContainers = + application.pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = application.pullNewlyAssignedNMTokens(); + } + return new Allocation(newAllocatedContainers, newlyAllocatedNMTokens, + application.getHeadroom(), preemptionContainerIds); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b96d009..3cf106c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -312,9 +312,14 @@ public Allocation allocate( " applicationId=" + applicationAttemptId + " #ask=" + ask.size()); } - - return new Allocation(application.pullNewlyAllocatedContainers(), - application.pullNewlyAssignedNMTokens(), application.getHeadroom()); + List newAllocatedContainers = + application.pullNewlyAllocatedContainers(); + List newlyAllocatedNMTokens = EMPTY_NM_TOKEN_LIST; + if (!newAllocatedContainers.isEmpty()) { + newlyAllocatedNMTokens = application.pullNewlyAssignedNMTokens(); + } + return new Allocation(newAllocatedContainers, newlyAllocatedNMTokens, + application.getHeadroom()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index b4f949c..a3c4af0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -268,4 +268,18 @@ public void removeNodeKey(NodeId nodeId) { this.writeLock.unlock(); } } + + /** + * This will be called by RM when it wants to remove NMToken key for + * specific node. + */ + public void removeNodeKeyForAppAttempt(ApplicationAttemptId appAttemptId, + NodeId nodeId) { + try { + this.writeLock.lock(); + appAttemptToNodeKeyMap.get(appAttemptId).remove(nodeId); + } finally { + this.writeLock.unlock(); + } + } }