diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 55b8fbb..6ec9fc5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -113,6 +113,8 @@ public class ApplicationMasterService extends AbstractService implements ApplicationMasterProtocol { private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); + private static final int PRE_REGISTER_RESPONSE_ID = -1; + private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; protected InetSocketAddress masterServiceAddress; @@ -385,7 +387,8 @@ public boolean hasApplicationMasterRegistered( if (lastResponse != null) { synchronized (lastResponse) { if (lastResponse.getAllocateResponse() != null - && lastResponse.getAllocateResponse().getResponseId() >= 0) { + && lastResponse.getAllocateResponse() + .getResponseId() != PRE_REGISTER_RESPONSE_ID) { hasApplicationMasterRegistered = true; } } @@ -401,7 +404,6 @@ public boolean hasApplicationMasterRegistered( @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - AMRMTokenIdentifier amrmTokenIdentifier = YarnServerSecurityUtils.authorizeRequest(); @@ -430,15 +432,19 @@ public AllocateResponse allocate(AllocateRequest request) throw new ApplicationMasterNotRegisteredException(message); } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - 
/* old heartbeat */ - return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - String message = - "Invalid responseId in AllocateRequest from application attempt: " - + appAttemptId + ", expect responseId to be " - + (lastResponse.getResponseId() + 1); - throw new InvalidApplicationMasterRequestException(message); + // Normally request.getResponseId() == lastResponse.getResponseId() + if (request.getResponseId() != lastResponse.getResponseId()) { + if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { + /* heartbeat one step old, simply return lastResponse */ + return lastResponse; + } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + String message = + "Invalid responseId in AllocateRequest from application attempt: " + + appAttemptId + ", expect responseId to be " + + lastResponse.getResponseId() + ", but get " + + request.getResponseId(); + throw new InvalidApplicationMasterRequestException(message); + } } AllocateResponse response = @@ -477,7 +483,13 @@ public AllocateResponse allocate(AllocateRequest request) * need to worry about unregister call occurring in between (which * removes the lock object). 
*/ - response.setResponseId(lastResponse.getResponseId() + 1); + if (lastResponse.getResponseId() + 1 == PRE_REGISTER_RESPONSE_ID) { + // Skip the special value PRE_REGISTER_RESPONSE_ID + response.setResponseId(lastResponse.getResponseId() + 2); + } else { + // Normal case + response.setResponseId(lastResponse.getResponseId() + 1); + } lock.setAllocateResponse(response); return response; } @@ -772,12 +784,23 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { recordFactory.newRecordInstance(AllocateResponse.class); // set response id to -1 before application master for the following // attemptID get registered - response.setResponseId(-1); + response.setResponseId(PRE_REGISTER_RESPONSE_ID); LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } + @VisibleForTesting + protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId, + int lastResponseId) { + AllocateResponseLock lock = responseMap.get(attemptId); + if (lock == null || lock.getAllocateResponse() == null) { + return false; + } + lock.getAllocateResponse().setResponseId(lastResponseId); + return true; + } + public void unregisterAttempt(ApplicationAttemptId attemptId) { LOG.info("Unregistering app attempt : " + attemptId); responseMap.remove(attemptId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 2451c1e..810fc42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -51,7 +51,7 @@ private static final Logger LOG = Logger.getLogger(MockAM.class); - private volatile int responseId = 0; + private volatile int lastResponseId = 0; private final ApplicationAttemptId attemptId; private RMContext context; private ApplicationMasterProtocol amRMProtocol; @@ -98,7 +98,7 @@ public RegisterApplicationMasterResponse registerAppAttempt(boolean wait) if (wait) { waitForState(RMAppAttemptState.LAUNCHED); } - responseId = 0; + lastResponseId = 0; final RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class); req.setHost(""); @@ -126,6 +126,14 @@ public RegisterApplicationMasterResponse run() throws Exception { } } + public boolean setApplicationLastResponseId(int newLastResponseId) { + ApplicationMasterService applicationMasterService = + (ApplicationMasterService) amRMProtocol; + lastResponseId = newLastResponseId; + return applicationMasterService.setAttemptLastResponseId(attemptId, + newLastResponseId); + } + public void addRequests(String[] hosts, int memory, int priority, int containers) throws Exception { addRequests(hosts, memory, priority, containers, 0L); @@ -272,19 +280,22 @@ public AllocateResponse allocate(AllocateRequest allocateRequest) public AllocateResponse doAllocateAs(UserGroupInformation ugi, final AllocateRequest req) throws Exception { - req.setResponseId(++responseId); + req.setResponseId(lastResponseId); try { - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public AllocateResponse run() throws Exception { - return amRMProtocol.allocate(req); - } - }); + AllocateResponse response = + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amRMProtocol.allocate(req); + } + }); + lastResponseId = response.getResponseId(); + return response; } catch 
(UndeclaredThrowableException e) { throw (Exception) e.getCause(); } } - + public AllocateResponse doHeartbeat() throws Exception { return allocate(null, null); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 18c49bd..89e0e7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -115,7 +115,39 @@ public void testRMIdentifierOnContainerAllocation() throws Exception { Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier()); rm.stop(); } - + + @Test(timeout = 3000000) + public void testAllocateResponseIdOverflow() throws Exception { + MockRM rm = new MockRM(conf); + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // Set the last responseId to be MAX_INT + Assert.assertTrue(am1.setApplicationLastResponseId(Integer.MAX_VALUE)); + + // Both allocate should succeed + am1.schedule(); // send allocate with responseId = MAX_INT + am1.schedule(); // send allocate with responseId = MIN_INT (MAX_INT+1) + + } finally { + if (rm != null) { + rm.stop(); + } + } + } + @Test(timeout=600000) public void 
testInvalidContainerReleaseRequest() throws Exception { MockRM rm = new MockRM(conf);