diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index cc47e02..9a70ee8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -507,12 +507,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse - .getResponseId()) { + if (getNextResponseId( + remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse + .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse + } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" @@ -548,10 +549,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response - NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils - .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. - getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); + NodeHeartbeatResponse nodeHeartBeatResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse( + getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), + NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); rmNode.updateNodeHeartbeatResponseForUpdatedContainers( nodeHeartBeatResponse); @@ -613,6 +614,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + private int getNextResponseId(int responseId) { + // Loop between 0 and Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + private void setAppCollectorsMapToResponse( List runningApps, NodeHeartbeatResponse response) { Map liveAppCollectorsMap = new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4a8ff00..dcf3b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -118,7 +118,7 @@ public void containerIncreaseStatus(Container container) throws Exception { container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, ++responseId); + true, responseId); } public void addRegisteringCollector(ApplicationId appId, @@ -177,7 +177,7 @@ public RegisterNodeManagerResponse registerNode( public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.emptyList(), - Collections.emptyList(), isHealthy, ++responseId); + Collections.emptyList(), isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -190,12 +190,12 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, containerStatusList.add(containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.emptyList(), true, ++responseId); + Collections.emptyList(), true, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { - return nodeHeartbeat(conts, isHealthy, ++responseId); + return nodeHeartbeat(conts, isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.emptyList(), - isHealthy, ++responseId); + isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, @@ -247,7 +247,8 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); - + responseId = heartbeatResponse.getResponseId(); + MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); if (masterKeyFromRM != null && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey @@ -282,4 +283,8 @@ public int getvCores() { public String getVersion() { return version; } + + public void setResponseId(int id) { + this.responseId = id; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 5ed3278..eec8520 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -726,7 +726,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq); Assert.assertEquals("InValid Node Labels were not accepted by RM", - NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); + NodeAction.RESYNC, nodeHeartbeatResponse.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), oldLabels); Assert.assertFalse("Node Labels should not accepted by RM", @@ -2078,4 +2078,31 @@ public void testNodeHeartBeatResponseForUnknownContainerCleanUp() } } } + + @Test + public void testResponseIdOverflow() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + + // prepare the responseId that's about to overflow + RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); + + nm1.setResponseId(Integer.MAX_VALUE); + + // heartbeat twice and check responseId + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(0, nodeHeartbeat.getResponseId()); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(1, nodeHeartbeat.getResponseId()); + } }