diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index fd39dad..f070f28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; @@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -97,8 +95,8 @@ private Server server; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ConcurrentMap responseMap = - new ConcurrentHashMap(); + private final ConcurrentMap responseMap = + new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; @@ -217,21 +215,19 @@ public RegisterApplicationMasterResponse registerApplicationMaster( ApplicationAttemptId applicationAttemptId = authorizeRequest(); ApplicationId appID = applicationAttemptId.getApplicationId(); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { - String message = "Application doesn't exist in cache " - + applicationAttemptId; - LOG.error(message); + AllocateResponseLock lock = responseMap.get(applicationAttemptId); + if (lock == null) { RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(), - AuditConstants.REGISTER_AM, message, "ApplicationMasterService", + AuditConstants.REGISTER_AM, "Application doesn't exist in cache " + + applicationAttemptId, "ApplicationMasterService", "Error in registering application master", appID, applicationAttemptId); - throw RPCUtil.getRemoteException(message); + throwApplicationDoesNotExistInCacheException(applicationAttemptId); } // Allow only one thread in AM to do registerApp at a time. - synchronized (lastResponse) { - + synchronized (lock) { + AllocateResponse lastResponse = lock.getAllocateResponse(); if (hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is already registered : " @@ -251,7 +247,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Setting the response id to 0 to identify if the // application master is register for the respective attemptid lastResponse.setResponseId(0); - responseMap.put(applicationAttemptId, lastResponse); + lock.setAllocateResponse(lastResponse); LOG.info("AM registration " + applicationAttemptId); this.rmContext .getDispatcher() @@ -286,17 +282,14 @@ public FinishApplicationMasterResponse finishApplicationMaster( ApplicationAttemptId applicationAttemptId = authorizeRequest(); - AllocateResponse lastResponse = responseMap.get(applicationAttemptId); - if (lastResponse == null) { - String message = "Application doesn't exist in cache " - + applicationAttemptId; - LOG.error(message); - throw RPCUtil.getRemoteException(message); + AllocateResponseLock lock = responseMap.get(applicationAttemptId); + if (lock == null) { + throwApplicationDoesNotExistInCacheException(applicationAttemptId); } // Allow only one thread in AM to do finishApp at a time. - synchronized (lastResponse) { - + synchronized (lock) { + this.amLivelinessMonitor.receivedPing(applicationAttemptId); rmContext.getDispatcher().getEventHandler().handle( @@ -313,6 +306,15 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } + private void throwApplicationDoesNotExistInCacheException( + ApplicationAttemptId appAttemptId) + throws InvalidApplicationMasterRequestException { + String message = "Application doesn't exist in cache " + + appAttemptId; + LOG.error(message); + throw new InvalidApplicationMasterRequestException(message); + } + /** * @param appAttemptId * @return true if application is registered for the respective attemptid @@ -320,10 +322,11 @@ public FinishApplicationMasterResponse finishApplicationMaster( public boolean hasApplicationMasterRegistered( ApplicationAttemptId appAttemptId) { boolean hasApplicationMasterRegistered = false; - AllocateResponse lastResponse = responseMap.get(appAttemptId); + AllocateResponseLock lastResponse = responseMap.get(appAttemptId); if (lastResponse != null) { synchronized (lastResponse) { - if (lastResponse.getResponseId() >= 0) { + if (lastResponse.getAllocateResponse() != null + && lastResponse.getAllocateResponse().getResponseId() >= 0) { hasApplicationMasterRegistered = true; } } @@ -340,38 +343,38 @@ public AllocateResponse allocate(AllocateRequest request) this.amLivelinessMonitor.receivedPing(appAttemptId); /* check if its in cache */ - AllocateResponse lastResponse = responseMap.get(appAttemptId); - if (lastResponse == null) { + AllocateResponseLock lock = responseMap.get(appAttemptId); + if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); return resync; } - - if (!hasApplicationMasterRegistered(appAttemptId)) { - String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); - RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, appAttemptId.getApplicationId(), - appAttemptId); - throw new InvalidApplicationMasterRequestException(message); - } + synchronized (lock) { + AllocateResponse lastResponse = lock.getAllocateResponse(); + if (!hasApplicationMasterRegistered(appAttemptId)) { + String message = + "Application Master is trying to allocate before registering for: " + + appAttemptId.getApplicationId(); + LOG.error(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getUser(), AuditConstants.REGISTER_AM, "", + "ApplicationMasterService", message, + appAttemptId.getApplicationId(), + appAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - /* old heartbeat */ - return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register and - // get an exception. Might as well throw an exception here. - return resync; - } - - // Allow only one thread in AM to do heartbeat at a time. - synchronized (lastResponse) { + if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { + /* old heartbeat */ + return lastResponse; + } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + LOG.error("Invalid responseid from appAttemptId " + appAttemptId); + // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: + // Reboot is not useful since after AM reboots, it will send register + // and + // get an exception. Might as well throw an exception here. + return resync; + } // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( @@ -380,15 +383,16 @@ public AllocateResponse allocate(AllocateRequest request) List ask = request.getAskList(); List release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest(); - List blacklistAdditions = - (blacklistRequest != null) ? + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? blacklistRequest.getBlacklistAdditions() : null; - List blacklistRemovals = - (blacklistRequest != null) ? + List blacklistRemovals = + (blacklistRequest != null) ? blacklistRequest.getBlacklistRemovals() : null; - + // sanity check try { RMServerUtils.validateResourceRequests(ask, @@ -443,7 +447,7 @@ public AllocateResponse allocate(AllocateRequest request) rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime()); - + updatedNodeReports.add(report); } allocateResponse.setUpdatedNodes(updatedNodeReports); @@ -454,11 +458,12 @@ public AllocateResponse allocate(AllocateRequest request) .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); - + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - + // add preemption to the allocateResponse message (if any) - allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); // Adding NMTokens for allocated containers. if (!allocation.getContainers().isEmpty()) { @@ -466,21 +471,14 @@ public AllocateResponse allocate(AllocateRequest request) .createAndGetNMTokens(app.getUser(), appAttemptId, allocation.getContainers())); } - - // before returning response, verify in sync - AllocateResponse oldResponse = - responseMap.put(appAttemptId, allocateResponse); - if (oldResponse == null) { - // appAttempt got unregistered, remove it back out - responseMap.remove(appAttemptId); - String message = "App Attempt removed from the cache during allocate" - + appAttemptId; - LOG.error(message); - return resync; - } - + /* + * As we are updating the response inside the lock object so we don't + * need to worry about unregister call occurring in between (which + * removes the lock object). + */ + lock.setAllocateResponse(allocateResponse); return allocateResponse; - } + } } private PreemptionMessage generatePreemptionMessage(Allocation allocation){ @@ -542,7 +540,7 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { // attemptID get registered response.setResponseId(-1); LOG.info("Registering app attempt : " + attemptId); - responseMap.put(attemptId, response); + responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } @@ -564,4 +562,20 @@ protected void serviceStop() throws Exception { } super.serviceStop(); } -} + + public static class AllocateResponseLock { + private AllocateResponse response; + + public AllocateResponseLock(AllocateResponse response) { + this.response = response; + } + + public synchronized AllocateResponse getAllocateResponse() { + return response; + } + + public synchronized void setAllocateResponse(AllocateResponse response) { + this.response = response; + } + } +} \ No newline at end of file