diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f6b4523..b1518d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -272,25 +272,35 @@ public AllocateResponse allocate(AllocateRequest request) LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); return resync; } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - /* old heartbeat */ - return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register and - // get an exception. Might as well throw an exception here. - return resync; - } - + // Allow only one thread in AM to do heartbeat at a time. synchronized (lastResponse) { + AllocateResponse updatedResponse = responseMap.get(appAttemptId); + + if (updatedResponse == null) { + LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); + return resync; + } + if ((updatedResponse.getResponseId() != lastResponse.getResponseId()) + || ((request.getResponseId() + 1) == lastResponse.getResponseId())) { + /* old heartbeat */ + return lastResponse; + } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + LOG.error("Invalid responseid from appAttemptId " + appAttemptId); + // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: + // Reboot is not useful since after AM reboots, it will send register + // and + // get an exception. Might as well throw an exception here. + return resync; + } + // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptStatusupdateEvent(appAttemptId, request .getProgress())); + List ask = request.getAskList(); List release = request.getReleaseList();