diff --git hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 2de4044..58748fb 100644 --- hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -173,29 +173,32 @@ public class ApplicationMasterService extends AbstractService implements throw RPCUtil.getRemoteException(message); } - // Allow only one thread in AM to do registerApp at a time. - synchronized (lastResponse) { - - LOG.info("AM registration " + applicationAttemptId); - this.amLivelinessMonitor.receivedPing(applicationAttemptId); - - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptRegistrationEvent(applicationAttemptId, request - .getHost(), request.getRpcPort(), request.getTrackingUrl())); - - RMAuditLogger.logSuccess(this.rmContext.getRMApps().get(appID).getUser(), - AuditConstants.REGISTER_AM, "ApplicationMasterService", appID, - applicationAttemptId); - - // Pick up min/max resource from scheduler... 
- RegisterApplicationMasterResponse response = recordFactory - .newRecordInstance(RegisterApplicationMasterResponse.class); - response.setMinimumResourceCapability(rScheduler - .getMinimumResourceCapability()); - response.setMaximumResourceCapability(rScheduler - .getMaximumResourceCapability()); - return response; - } + LOG.info("AM registration " + applicationAttemptId); + this.amLivelinessMonitor.receivedPing(applicationAttemptId); + + // this code should be executed in isolation per AM. + // AM doesn't have multiple threads for this request and different attempts + // for an AM will have different attemptIds. + // No need for lock. + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptRegistrationEvent(applicationAttemptId, request + .getHost(), request.getRpcPort(), request.getTrackingUrl())); + + RMAuditLogger.logSuccess(this.rmContext.getRMApps().get(appID).getUser(), + AuditConstants.REGISTER_AM, "ApplicationMasterService", appID, + applicationAttemptId); + + // Pick up min/max resource from scheduler... + RegisterApplicationMasterResponse response = recordFactory + .newRecordInstance(RegisterApplicationMasterResponse.class); + response.setMinimumResourceCapability(rScheduler + .getMinimumResourceCapability()); + response.setMaximumResourceCapability(rScheduler + .getMaximumResourceCapability()); + return response; } @Override @@ -214,37 +217,43 @@ public class ApplicationMasterService extends AbstractService implements throw RPCUtil.getRemoteException(message); } - // Allow only one thread in AM to do finishApp at a time. 
- synchronized (lastResponse) { - - this.amLivelinessMonitor.receivedPing(applicationAttemptId); - - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptUnregistrationEvent(applicationAttemptId, request - .getTrackingUrl(), request.getFinalApplicationStatus(), request - .getDiagnostics())); - - FinishApplicationMasterResponse response = recordFactory - .newRecordInstance(FinishApplicationMasterResponse.class); - return response; - } + LOG.info("AM finish reported " + applicationAttemptId); + this.amLivelinessMonitor.receivedPing(applicationAttemptId); + + // this code should be executed in isolation per AM. + // AM doesn't have multiple threads for this request and different attempts + // for an AM will have different attemptIds. + // No need for lock. + rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptUnregistrationEvent(applicationAttemptId, request + .getTrackingUrl(), request.getFinalApplicationStatus(), request + .getDiagnostics())); + + FinishApplicationMasterResponse response = recordFactory + .newRecordInstance(FinishApplicationMasterResponse.class); + return response; } @Override public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException { - ApplicationAttemptId appAttemptId = request.getApplicationAttemptId(); - authorizeRequest(appAttemptId); + ApplicationAttemptId applicationAttemptId = request + .getApplicationAttemptId(); + authorizeRequest(applicationAttemptId); - this.amLivelinessMonitor.receivedPing(appAttemptId); + LOG.info("AM allocate request " + applicationAttemptId); + this.amLivelinessMonitor.receivedPing(applicationAttemptId); /* check if its in cache */ AllocateResponse allocateResponse = recordFactory .newRecordInstance(AllocateResponse.class); - AMResponse lastResponse = responseMap.get(appAttemptId); + AMResponse lastResponse = responseMap.get(applicationAttemptId); if (lastResponse == null) { - LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); + 
LOG.error("AppAttemptId doesnt exist in cache " + applicationAttemptId); allocateResponse.setAMResponse(reboot); return allocateResponse; } @@ -253,41 +262,48 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAMResponse(lastResponse); return allocateResponse; } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); + LOG.error("Invalid responseid from appAttemptId " + applicationAttemptId); // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: allocateResponse.setAMResponse(reboot); return allocateResponse; } - // Allow only one thread in AM to do heartbeat at a time. - synchronized (lastResponse) { // BUG TODO: Locking order is screwed. - - // Send the status update to the appAttempt. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); - - List ask = request.getAskList(); - List release = request.getReleaseList(); - - // Send new requests to appAttempt. - Allocation allocation = - this.rScheduler.allocate(appAttemptId, ask, release); - - RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - - AMResponse response = recordFactory.newRecordInstance(AMResponse.class); - response.setAllocatedContainers(allocation.getContainers()); - response.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - response.setResponseId(lastResponse.getResponseId() + 1); - response.setAvailableResources(allocation.getResourceLimit()); - responseMap.put(appAttemptId, response); - allocateResponse.setAMResponse(response); - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - return allocateResponse; - } + // this code should be executed in isolation per AM. 
+ // AM doesn't have multiple threads for this request and different attempts + // for an AM will have different attemptIds. There is a potential race + // condition when there are multiple outstanding allocate requests (due to + // some network timeout). Multiple calls to scheduler.allocate() will be OK + // because ask/release are absolute values that are set. + // No need for lock. + // Send the status update to the appAttempt. + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptStatusupdateEvent(applicationAttemptId, request + .getProgress())); + + List ask = request.getAskList(); + List release = request.getReleaseList(); + + // Send new requests to appAttempt. + Allocation allocation = this.rScheduler + .allocate(applicationAttemptId, ask, release); + + RMApp app = this.rmContext.getRMApps().get( + applicationAttemptId.getApplicationId()); + RMAppAttempt appAttempt = app.getRMAppAttempt(applicationAttemptId); + + AMResponse response = recordFactory.newRecordInstance(AMResponse.class); + response.setAllocatedContainers(allocation.getContainers()); + response.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + response.setResponseId(lastResponse.getResponseId() + 1); + response.setAvailableResources(allocation.getResourceLimit()); + responseMap.put(applicationAttemptId, response); + allocateResponse.setAMResponse(response); + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + return allocateResponse; } public void registerAppAttempt(ApplicationAttemptId attemptId) {