diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index fd39dad..5f1cf35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings("unchecked") @Private public class ApplicationMasterService extends AbstractService implements @@ -97,8 +99,8 @@ private Server server; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ConcurrentMap responseMap = - new ConcurrentHashMap(); + private final ConcurrentMap responseMap = + new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; @@ -169,7 +171,9 @@ private AMRMTokenIdentifier selectAMRMTokenIdentifier( return result; } - private ApplicationAttemptId authorizeRequest() + @Private + @VisibleForTesting + protected ApplicationAttemptId authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; @@ -217,8 +221,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster( ApplicationAttemptId applicationAttemptId = authorizeRequest(); ApplicationId appID = applicationAttemptId.getApplicationId(); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { + AllocateResponseLock res = responseMap.get(applicationAttemptId); + if (res == null) { String message = "Application doesn't exist in cache " + applicationAttemptId; LOG.error(message); @@ -230,8 +234,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster( } // Allow only one thread in AM to do registerApp at a time. - synchronized (lastResponse) { - + synchronized (res) { + AllocateResponse lastResponse = res.getAllocateResponse(); if (hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is already registered : " @@ -251,7 +255,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Setting the response id to 0 to identify if the // application master is register for the respective attemptid lastResponse.setResponseId(0); - responseMap.put(applicationAttemptId, lastResponse); + res.setAllocateResponse(lastResponse); LOG.info("AM registration " + applicationAttemptId); this.rmContext .getDispatcher() @@ -286,17 +290,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( ApplicationAttemptId applicationAttemptId = authorizeRequest(); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { - String message = "Application doesn't exist in cache " - + applicationAttemptId; - LOG.error(message); - throw RPCUtil.getRemoteException(message); + AllocateResponseLock res = responseMap.get(applicationAttemptId); + if (res == null) { + throwApplicationAttemptDoesNotExistInCacheException(applicationAttemptId); } // Allow only one thread in AM to do finishApp at a time. - synchronized (lastResponse) { - + synchronized (res) { + this.amLivelinessMonitor.receivedPing(applicationAttemptId); rmContext.getDispatcher().getEventHandler().handle( @@ -313,6 +314,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } + private void throwApplicationAttemptDoesNotExistInCacheException( + ApplicationAttemptId appAttemptId) throws YarnException { + String message = "Application doesn't exist in cache " + + appAttemptId; + LOG.error(message); + throw RPCUtil.getRemoteException(message); + } + /** * @param appAttemptId * @return true if application is registered for the respective attemptid @@ -320,10 +329,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( public boolean hasApplicationMasterRegistered( ApplicationAttemptId appAttemptId) { boolean hasApplicationMasterRegistered = false; - AllocateResponse lastResponse = responseMap.get(appAttemptId); + AllocateResponseLock lastResponse = responseMap.get(appAttemptId); if (lastResponse != null) { synchronized (lastResponse) { - if (lastResponse.getResponseId() >= 0) { + if (lastResponse.getAllocateResponse() != null + && lastResponse.getAllocateResponse().getResponseId() >= 0) { hasApplicationMasterRegistered = true; } } @@ -340,38 +350,38 @@ public AllocateResponse allocate(AllocateRequest request) this.amLivelinessMonitor.receivedPing(appAttemptId); /* check if its in cache */ - AllocateResponse lastResponse = responseMap.get(appAttemptId); - if (lastResponse == null) { + AllocateResponseLock res = responseMap.get(appAttemptId); + if (res == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); return resync; } - - if (!hasApplicationMasterRegistered(appAttemptId)) { - String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); - } + synchronized (res) { + AllocateResponse lastResponse = res.getAllocateResponse(); + if (!hasApplicationMasterRegistered(appAttemptId)) { + String message = + "Application Master is trying to allocate before registering for: " + + appAttemptId.getApplicationId(); + LOG.error(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getUser(), AuditConstants.REGISTER_AM, "", + "ApplicationMasterService", message, + appAttemptId.getApplicationId(), + appAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - /* old heartbeat */ - return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register and - // get an exception. Might as well throw an exception here. - return resync; - } - - // Allow only one thread in AM to do heartbeat at a time. - synchronized (lastResponse) { + if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { + /* old heartbeat */ + return lastResponse; + } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + LOG.error("Invalid responseid from appAttemptId " + appAttemptId); + // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: + // Reboot is not useful since after AM reboots, it will send register + // and + // get an exception. Might as well throw an exception here. + return resync; + } // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( @@ -380,15 +390,16 @@ public AllocateResponse allocate(AllocateRequest request) List ask = request.getAskList(); List release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest(); - List blacklistAdditions = - (blacklistRequest != null) ? + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? blacklistRequest.getBlacklistAdditions() : null; - List blacklistRemovals = - (blacklistRequest != null) ? + List blacklistRemovals = + (blacklistRequest != null) ? blacklistRequest.getBlacklistRemovals() : null; - + // sanity check try { RMServerUtils.validateResourceRequests(ask, @@ -443,7 +454,7 @@ public AllocateResponse allocate(AllocateRequest request) rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime()); - + updatedNodeReports.add(report); } allocateResponse.setUpdatedNodes(updatedNodeReports); @@ -454,11 +465,12 @@ public AllocateResponse allocate(AllocateRequest request) .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); - + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - + // add preemption to the allocateResponse message (if any) - allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); // Adding NMTokens for allocated containers. if (!allocation.getContainers().isEmpty()) { @@ -466,21 +478,9 @@ public AllocateResponse allocate(AllocateRequest request) .createAndGetNMTokens(app.getUser(), appAttemptId, allocation.getContainers())); } - - // before returning response, verify in sync - AllocateResponse oldResponse = - responseMap.put(appAttemptId, allocateResponse); - if (oldResponse == null) { - // appAttempt got unregistered, remove it back out - responseMap.remove(appAttemptId); - String message = "App Attempt removed from the cache during allocate" - + appAttemptId; - LOG.error(message); - return resync; - } - + res.setAllocateResponse(allocateResponse); return allocateResponse; - } + } } private PreemptionMessage generatePreemptionMessage(Allocation allocation){ @@ -542,7 +542,7 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { // attemptID get registered response.setResponseId(-1); LOG.info("Registering app attempt : " + attemptId); - responseMap.put(attemptId, response); + responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } @@ -564,4 +564,20 @@ protected void serviceStop() throws Exception { } super.serviceStop(); } -} + + public static class AllocateResponseLock { + private AllocateResponse response; + + public AllocateResponseLock(AllocateResponse response) { + this.response = response; + } + + public synchronized AllocateResponse getAllocateResponse() { + return response; + } + + public synchronized void setAllocateResponse(AllocateResponse response) { + this.response = response; + } + } +} \ No newline at end of file