diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3c2c09b..74efbb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -325,6 +325,8 @@ public RegisterNodeManagerResponse registerNodeManager( } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); + // Reset heartbeat ID since node just restarted. + oldNode.getLastNodeHeartBeatResponse().setResponseId(0); this.rmContext .getDispatcher() .getEventHandler() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d1e6190..e415a9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -595,8 +595,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodeRemovedSchedulerEvent(rmNode)); if (rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (!rmNode.getTotalCapability().equals( newNode.getTotalCapability())) { rmNode.totalCapability = newNode.getTotalCapability(); @@ -634,9 +632,6 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index b525efc..9d53975 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -21,6 +21,9 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; @@ -113,7 +116,7 @@ public void tearDown() { resourceTrackerService.stop(); } - @Test + @Test(timeout = 10000) public void testReconnect() throws Exception { String hostname1 = "localhost1"; Resource capability = BuilderUtils.newResource(1024, 1); @@ -139,6 +142,41 @@ public void testReconnect() throws Exception { request1.setResource(capability); Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvents.get(0).getType()); + + // Simulate scenario from YARN-3896: + // The node(127.0.0.1:1234) reconnected with RM. When it registered with + // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But + // the node's heartbeat come before RM succeeded setting the id to 0. + MockRM rm = new MockRM(); + // Simulate that RM is busying with dealing with RMNodeEvent. + rm.getRMContext().getDispatcher().register(RMNodeEventType.class, + new EventHandler() { + @Override + public void handle(RMNodeEvent event) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + }); + rm.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + int i = 0; + while(i < 3) { + nm1.nodeHeartbeat(true); + Thread.sleep(50); + i++; + } + nm1.nodeHeartbeat(true); + MockNM nm2 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm2.registerNode(); + nm2.nodeHeartbeat(true); + Thread.sleep(100); + rm.NMwaitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.stop(); } @Test