diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7f3cf8d..97dc1bf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -399,6 +399,9 @@ protected void registerWithRM()
       this.context.getNMTokenSecretManager().setMasterKey(masterKey);
     }
 
+    // Re-register known timeline collectors.
+    reregisterCollectors();
+
     StringBuilder successfullRegistrationMsg = new StringBuilder();
     successfullRegistrationMsg.append("Registered with ResourceManager as ")
         .append(this.nodeId);
@@ -517,6 +520,37 @@ private void updateNMResource(Resource resource) {
     this.totalResource = resource;
   }
 
+  /**
+   * Reregister all collectors known by this node to the RM. This method is
+   * called when the RM needs to resync with the node.
+   *
+   * Every known collector is copied into the registering-collectors map
+   * (entries already pending registration are kept as they are), and the
+   * known-collectors map is then emptied.
+   */
+  private void reregisterCollectors() {
+    Map<ApplicationId, AppCollectorData> knownCollectors
+        = this.context.getKnownCollectors();
+    Map<ApplicationId, AppCollectorData> registeringCollectors
+        = this.context.getRegisteringCollectors();
+    for (Map.Entry<ApplicationId, AppCollectorData> entry
+        : knownCollectors.entrySet()) {
+      // putIfAbsent keeps an entry that is already pending registration.
+      registeringCollectors.putIfAbsent(entry.getKey(), entry.getValue());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("List of reregistering collectors: ");
+      for (Map.Entry<ApplicationId, AppCollectorData> entry
+          : registeringCollectors.entrySet()) {
+        AppCollectorData data = entry.getValue();
+        LOG.debug(entry.getKey() + " : " + data.getCollectorAddr() + "@<"
+            + data.getRMIdentifier() + ", " + data.getVersion() + ">");
+      }
+    }
+    knownCollectors.clear();
+  }
+
+
   // Iterate through the NMContext and clone and get all the containers'
   // statuses. If it's a completed container, add into the
   // recentlyStoppedContainers collections.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 2e2bef7..c1258b8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -60,6 +62,8 @@
   private String version;
   private Map containerStats =
       new HashMap();
+  private Map<ApplicationId, AppCollectorData> registeringCollectors
+      = new ConcurrentHashMap<>();
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -117,6 +121,21 @@ public void containerIncreaseStatus(Container container) throws Exception {
         true, ++responseId);
   }
 
+  /**
+   * Adds a collector that this mock NM reports as registering in heartbeats.
+   */
+  public void addRegisteringCollector(ApplicationId appId,
+      AppCollectorData data) {
+    this.registeringCollectors.put(appId, data);
+  }
+
+  /**
+   * @return the live map of collectors reported as registering.
+   */
+  public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+    return this.registeringCollectors;
+  }
+
   public RegisterNodeManagerResponse registerNode() throws Exception {
     return registerNode(null, null);
   }
@@ -223,6 +242,9 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats,
     req.setNodeStatus(status);
     req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
     req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
+
+    req.setRegisteringCollectors(this.registeringCollectors);
+
     NodeHeartbeatResponse heartbeatResponse =
         resourceTracker.nodeHeartbeat(req);
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
new file mode 100644
index 0000000..4bbecf5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that timeline collector data survives an RM failover. */
+public class TestRMHATimelineCollectors extends RMHATestBase {
+  public static final Log LOG = LogFactory
+      .getLog(TestRMHATimelineCollectors.class);
+
+  @Before
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+  }
+
+  /** After failover, heartbeats must return the collectors the NMs report. */
+  @Test
+  public void testRebuildCollectorDataOnFailover() throws Exception {
+    startRMs();
+    MockNM nm1
+        = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    MockNM nm2
+        = new MockNM("127.0.0.1:5678", 15121, rm2.getResourceTrackerService());
+    RMApp app1 = rm1.submitApp(1024);
+    String collectorAddr1 = "1.2.3.4:5";
+    AppCollectorData data1 = AppCollectorData.newInstance(
+        app1.getApplicationId(), collectorAddr1);
+    nm1.addRegisteringCollector(app1.getApplicationId(), data1);
+
+    String collectorAddr2 = "5.4.3.2:1";
+    RMApp app2 = rm1.submitApp(1024);
+    AppCollectorData data2 = AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr2, rm1.getStartTime(), 1);
+    nm1.addRegisteringCollector(app2.getApplicationId(), data2);
+
+    explicitFailover();
+
+    List<ApplicationId> runningApps = new ArrayList<>();
+    runningApps.add(app1.getApplicationId());
+    runningApps.add(app2.getApplicationId());
+    nm1.registerNode(runningApps);
+    nm2.registerNode(runningApps);
+
+    String collectorAddr12 = "1.2.3.4:56";
+    AppCollectorData data12 = AppCollectorData.newInstance(
+        app1.getApplicationId(), collectorAddr12, rm1.getStartTime(), 0);
+    nm2.addRegisteringCollector(app1.getApplicationId(), data12);
+
+    String collectorAddr22 = "5.4.3.2:10";
+    AppCollectorData data22 = AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr22, rm1.getStartTime(), 2);
+    nm2.addRegisteringCollector(app2.getApplicationId(), data22);
+
+    Map<ApplicationId, AppCollectorData> results1
+        = nm1.nodeHeartbeat(true).getAppCollectorsMap();
+    assertEquals(collectorAddr1,
+        results1.get(app1.getApplicationId()).getCollectorAddr());
+    assertEquals(collectorAddr2,
+        results1.get(app2.getApplicationId()).getCollectorAddr());
+
+    Map<ApplicationId, AppCollectorData> results2
+        = nm2.nodeHeartbeat(true).getAppCollectorsMap();
+    // addr of app1 should be collectorAddr1 since it's registering (no time
+    // stamp).
+    assertEquals(collectorAddr1,
+        results2.get(app1.getApplicationId()).getCollectorAddr());
+    // addr of app2 should be collectorAddr22 since its version number is
+    // greater.
+    assertEquals(collectorAddr22,
+        results2.get(app2.getApplicationId()).getCollectorAddr());
+
+    // Now nm1 should get updated collector list
+    nm1.getRegisteringCollectors().clear();
+    Map<ApplicationId, AppCollectorData> results12
+        = nm1.nodeHeartbeat(true).getAppCollectorsMap();
+    assertEquals(collectorAddr1,
+        results12.get(app1.getApplicationId()).getCollectorAddr());
+    assertEquals(collectorAddr22,
+        results12.get(app2.getApplicationId()).getCollectorAddr());
+  }
+}