diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java index 0947ba8..69c9a13 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java @@ -73,7 +73,7 @@ // resource manager private ResourceManager rm; // heart beat response id - private int RESPONSE_ID = 1; + private int responseId = 0; private final static Logger LOG = Logger.getLogger(NMSimulator.class); public void init(String nodeIdStr, int memory, int cores, @@ -134,7 +134,7 @@ public void middleStep() throws Exception { ns.setContainersStatuses(generateContainerStatusList()); ns.setNodeId(node.getNodeID()); ns.setKeepAliveApplications(new ArrayList()); - ns.setResponseId(RESPONSE_ID ++); + ns.setResponseId(responseId++); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); beatRequest.setNodeStatus(ns); NodeHeartbeatResponse beatResponse = diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 951f5a8..780be4a 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -143,8 +143,8 @@ public NodeState getState() { return runningApplications; } - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { @@ -177,14 +177,6 @@ public String getNodeManagerVersion() { return RMNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - - } - - @Override public List pullNewlyIncreasedContainers() { // TODO Auto-generated method stub return null; diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index e5013c4..4c2a615 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -126,9 +126,9 @@ public NodeState getState() { } @Override - public void updateNodeHeartbeatResponseForCleanup( - NodeHeartbeatResponse nodeHeartbeatResponse) { - node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse nodeHeartbeatResponse) { + node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse); } @Override @@ -166,12 +166,6 @@ public String getNodeManagerVersion() { return RMNodeLabelsManager.EMPTY_STRING_SET; } - @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - // TODO Auto-generated method stub - } - @SuppressWarnings("unchecked") @Override public List pullNewlyIncreasedContainers() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3a3dd63..8483bbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -490,12 +490,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); - if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse - .getResponseId()) { + if (getNextResponseId( + remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse + .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; - } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse + } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" @@ -510,13 +511,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } // Heartbeat response - NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils - .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. - getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, - nextHeartBeatInterval); - rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - rmNode.updateNodeHeartbeatResponseForContainersDecreasing( - nodeHeartBeatResponse); + NodeHeartbeatResponse nodeHeartBeatResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse( + getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), + NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); + rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -528,7 +527,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 4. Send status to RMNode, saving the latest response. RMNodeStatusEvent nodeStatusEvent = - new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); + new RMNodeStatusEvent(nodeId, remoteNodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request @@ -562,6 +561,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) return nodeHeartBeatResponse; } + private int getNextResponseId(int responseId) { + // Loop between 0 and Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + /** * Check if node in decommissioning state. * @param nodeId diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 48df1e8..2e164a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -134,10 +134,11 @@ /** * Update a {@link NodeHeartbeatResponse} with the list of containers and - * applications to clean up for this node. + * applications to clean up for this node, and the containers to be updated. + * * @param response the {@link NodeHeartbeatResponse} to update */ - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); + void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); @@ -160,13 +161,7 @@ * @return labels in this node */ public Set getNodeLabels(); - - /** - * Update containers to be decreased - */ - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response); - + public List pullNewlyIncreasedContainers(); long getUntrackedTimeStamp(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 830a6a9..a009934 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -559,7 +559,8 @@ public NodeState getState() { }; @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { this.writeLock.lock(); try { @@ -573,6 +574,14 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response this.finishedApplications.clear(); this.containersToSignal.clear(); this.containersToBeRemovedFromNM.clear(); + + // NOTE: This is required for backward compatibility. + response.addAllContainersToDecrease(toBeDecreasedContainers.values()); + toBeDecreasedContainers.clear(); + + // Synchronously update the last response in rmNode with updated + // responseId + this.latestNodeHeartBeatResponse = response; } finally { this.writeLock.unlock(); } @@ -582,19 +591,6 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response public Collection getToBeDecreasedContainers() { return toBeDecreasedContainers.values(); } - - @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - this.writeLock.lock(); - - try { - response.addAllContainersToDecrease(toBeDecreasedContainers.values()); - toBeDecreasedContainers.clear(); - } finally { - this.writeLock.unlock(); - } - } @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { @@ -1118,10 +1114,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; - - // Switch the last heartbeatresponse. - rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); - NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); @@ -1190,9 +1182,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event; - - // Switch the last heartbeatresponse. - rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index ba6ac9b..411c0f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -27,27 +27,22 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; -import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; public class RMNodeStatusEvent extends RMNodeEvent { private final NodeStatus nodeStatus; - private final NodeHeartbeatResponse latestResponse; private List logAggregationReportsForApps; - public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse) { - this(nodeId, nodeStatus, latestResponse, null); + public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) { + this(nodeId, nodeStatus, null); } public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, - NodeHeartbeatResponse latestResponse, List logAggregationReportsForApps) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeStatus = nodeStatus; - this.latestResponse = latestResponse; this.logAggregationReportsForApps = logAggregationReportsForApps; } @@ -59,10 +54,6 @@ public NodeHealthStatus getNodeHealthStatus() { return this.nodeStatus.getContainersStatuses(); } - public NodeHeartbeatResponse getLatestResponse() { - return this.latestResponse; - } - public List getKeepAliveAppIds() { return this.nodeStatus.getKeepAliveApplications(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e95ffe..107c224 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -116,7 +116,7 @@ public void containerIncreaseStatus(Container container) throws Exception { container.getResource()); List increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, ++responseId); + true, responseId); } public RegisterNodeManagerResponse registerNode() throws Exception { @@ -161,12 +161,13 @@ public RegisterNodeManagerResponse registerNode( memory = newResource.getMemory(); vCores = newResource.getVirtualCores(); } + responseId = 0; return registrationResponse; } public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.emptyList(), - Collections.emptyList(), isHealthy, ++responseId); + Collections.emptyList(), isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -179,12 +180,12 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, containerStatusList.add(containerStatus); Log.info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.emptyList(), true, ++responseId); + Collections.emptyList(), true, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { - return nodeHeartbeat(conts, isHealthy, ++responseId); + return nodeHeartbeat(conts, isHealthy, responseId); } public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); NodeHeartbeatResponse heartbeatResponse = resourceTracker.nodeHeartbeat(req); - + responseId = heartbeatResponse.getResponseId(); + MasterKey masterKeyFromRM = heartbeatResponse.getContainerTokenMasterKey(); if (masterKeyFromRM != null && masterKeyFromRM.getKeyId() != this.currentContainerTokenMasterKey @@ -262,4 +264,8 @@ public int getvCores() { public String getVersion() { return version; } + + public void setResponseId(int id) { + this.responseId = id; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 83e901d..4bfb189 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -200,7 +200,8 @@ public NodeState getState() { } @Override - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + public void setAndUpdateNodeHeartbeatResponse( + NodeHeartbeatResponse response) { } @Override @@ -241,12 +242,6 @@ public long getLastHealthReportTime() { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( - NodeHeartbeatResponse response) { - - } - - @Override public List pullNewlyIncreasedContainers() { return Collections.emptyList(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 5a462ea..67920a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -158,15 +158,12 @@ public void tearDown() throws Exception { private RMNodeStatusEvent getMockRMNodeStatusEvent( List containerStatus) { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); if (containerStatus != null) { doReturn(containerStatus).when(event).getContainers(); @@ -175,15 +172,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent( } private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(getAppIdList()).when(event).getKeepAliveAppIds(); return event; @@ -196,15 +190,12 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { } private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { - NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); - NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); Boolean yes = new Boolean(true); doReturn(yes).when(healthStatus).getIsNodeHealthy(); RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); doReturn(healthStatus).when(event).getNodeHealthStatus(); - doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); doReturn(null).when(event).getKeepAliveAppIds(); return event; @@ -651,7 +642,7 @@ public void testUpdateHeartbeatResponseForCleanup() { Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getAppsToCleanup().size()); NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class); - node.updateNodeHeartbeatResponseForCleanup(hbrsp); + node.setAndUpdateNodeHeartbeatResponse(hbrsp); Assert.assertEquals(0, node.getContainersToCleanUp().size()); Assert.assertEquals(0, node.getAppsToCleanup().size()); Assert.assertEquals(1, hbrsp.getContainersToCleanup().size()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index dbcbe30..14b6dbd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -706,7 +706,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Records.newRecord(NodeHeartbeatRequest.class); heartbeatReq.setNodeLabels(null); // Node heartbeat label update nodeStatusObject = getNodeStatusObject(nodeId); - nodeStatusObject.setResponseId(responseId+2); + nodeStatusObject.setResponseId(responseId+1); heartbeatReq.setNodeStatus(nodeStatusObject); heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse .getNMTokenMasterKey()); @@ -1908,4 +1908,31 @@ public void tearDown() { DefaultMetricsSystem.shutdown(); } } + + @Test + public void testResponseIdOverflow() throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + + // prepare the responseId that's about to overflow + RMNode node = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + node.getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE); + + nm1.setResponseId(Integer.MAX_VALUE); + + // heartbeat twice and check responseId + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(0, nodeHeartbeat.getResponseId()); + + nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); + Assert.assertEquals(1, nodeHeartbeat.getResponseId()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 0c1777b..4b110c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -167,7 +167,7 @@ public void testLogAggregationStatus() throws Exception { NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, new ArrayList(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp)); List node2ReportForApp = @@ -181,7 +181,7 @@ public void testLogAggregationStatus() throws Exception { NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, new ArrayList(), null, NodeHealthStatus.newInstance(true, null, 0), null, null, null); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 @@ -218,7 +218,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING, messageForNode1_2); node1ReportForApp2.add(report1_2); - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp2)); // verify that the log aggregation status for node1 @@ -286,7 +286,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationStatus.SUCCEEDED, "")); // For every logAggregationReport cached in memory, we can only save at most // 10 diagnostic messages/failure messages - node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, + node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp3)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); @@ -330,7 +330,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationStatus.FAILED, ""); node2ReportForApp2.add(report2_2); node2ReportForApp2.add(report2_3); - node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, + node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp2)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport());