diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..9bcea50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Date; +import java.util.TimerTask; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase { + + protected static File testRootDir = new File("target", + TestConfigurationNodeLabelsProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + final static File nodeLabelsConfigFile = new File(testRootDir, + "yarn-site.xml"); + + private static XMLPathClassLoader loader; + + private ConfigurationNodeLabelsProvider nodeLabelsProvider; + + @Before + public void setup() { + loader = + new XMLPathClassLoader( + TestConfigurationNodeLabelsProvider.class.getClassLoader()); + testRootDir.mkdirs(); + + Configuration conf = getConfForNodeLabels(); + nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); + nodeLabelsProvider.init(conf); + // To delay the timer to run and we call timerTask.run manually + nodeLabelsProvider.startTime=new Date().getTime()+1*60*60*1000l; + } + + @After + public void tearDown() throws Exception { + if (nodeLabelsProvider != null) { + nodeLabelsProvider.close(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + } + + private Configuration getConfForNodeLabels() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, "A,B,CX"); + return conf; + } + + @Test + public void testNodeLabelsFromConfig() throws IOException, + InterruptedException { + // test for ensuring labels are set during initialization of the class + nodeLabelsProvider.start(); + Thread.sleep(5000l); // sleep so that timer has run once during + // initialization + assertCollectionEquals(toSet("A", "B", "CX"), + nodeLabelsProvider.getNodeLabels()); + + // test for valid Modification + TimerTask timerTask = nodeLabelsProvider.getTimerTask(); + modifyConfAndCallTimer(timerTask, "X,y,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + + // test for Invalid Modification. Provider is expected to return the last + // read labels + modifyConfAndCallTimer(timerTask, "A,#Xy,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + nodeLabelsProvider.close(); + } + + private static void modifyConfAndCallTimer(TimerTask timerTask, + String nodeLabels) throws FileNotFoundException, IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, nodeLabels); + conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); + ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(loader); + timerTask.run(); + } finally { + Thread.currentThread().setContextClassLoader(actualLoader); + } + } + + private static class XMLPathClassLoader extends ClassLoader { + public XMLPathClassLoader(ClassLoader wrapper) { + super(wrapper); + } + + public URL getResource(String name) { + if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) { + try { + return nodeLabelsConfigFile.toURI().toURL(); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + } + return super.getResource(name); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f5583bc..2c03cf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,6 +20,10 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +33,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -97,6 +102,8 @@ private int minAllocMb; private int minAllocVcores; + private boolean isDecentralizedNodeLabelsConf; + static { resync.setNodeAction(NodeAction.RESYNC); @@ -145,7 +152,11 @@ protected void serviceInit(Configuration conf) throws Exception { minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); - + + isDecentralizedNodeLabelsConf = + conf.getBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, + YarnConfiguration.DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED); super.serviceInit(conf); } @@ -329,11 +340,33 @@ public RegisterNodeManagerResponse registerNodeManager( } } } + + //Update node's labels to RM's NodeLabelManager. + Set nodeLabels = Collections.EMPTY_SET; + if (isDecentralizedNodeLabelsConf) { + nodeLabels = request.getNodeLabels(); + String errorMessage = null; + Map> labelsUpdate = + new HashMap>(); + labelsUpdate.put(rmNode.getNodeID(), nodeLabels); + try { + this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate); + response.setRMacceptNodeLabelsUpdate(true); + } catch (IOException ex) { + LOG.info("Node Labels {" + StringUtils.join(",", nodeLabels) + + "} of NodeManager from " + host + + " is not properly configured: " + ex.getMessage() + " "); + response.setDiagnosticsMessage(errorMessage); + response.setRMacceptNodeLabelsUpdate(false); + nodeLabels = Collections.EMPTY_SET; + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " + httpPort + ") " + "registered with capability: " + capability - + ", assigned nodeId " + nodeId; + + ", assigned nodeId " + nodeId + ", node labels { " + + StringUtils.join(",", nodeLabels) + " } "; LOG.info(message); response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); @@ -415,10 +448,29 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + + // 5. Update node's labels to RM's NodeLabelManager. + if (isDecentralizedNodeLabelsConf && request.isNodeLabelsUpdated()) { + Set nodeLabels = request.getNodeLabels(); + Map> labelsUpdate = + new HashMap>(); + labelsUpdate.put(rmNode.getNodeID(), nodeLabels); + try { + this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate); + nodeHeartBeatResponse.setRMacceptNodeLabelsUpdate(true); + LOG.info("Node Labels {" + StringUtils.join(",", nodeLabels) + + "} from Node " + rmNode.getNodeID() + "for were Accepted from RM"); + } catch (IOException ex) { + nodeHeartBeatResponse.setRMacceptNodeLabelsUpdate(false); + nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage()); + LOG.info("Node Labels from Node " + rmNode.getNodeID() + + " failed to get validated by RM : " + ex.getMessage()); + } + } return nodeHeartBeatResponse; } - + private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 28d1d63..769eb5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -27,6 +27,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -49,11 +50,16 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -65,7 +71,7 @@ import org.junit.Assert; import org.junit.Test; -public class TestResourceTrackerService { +public class TestResourceTrackerService extends NodeLabelTestBase{ private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); @@ -309,6 +315,398 @@ public void testNodeRegistrationSuccess() throws Exception { } @Test + public void testNodeRegistrationWithLabels() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + + try { + lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + registerReq.setNodeLabels(toSet("A", "B", "C")); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(registerReq); + + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId), + registerReq.getNodeLabels()); + Assert.assertTrue("Valid Node Labels were not accepted by RM", + response.getRMacceptNodeLabels()); + rm.stop(); + } + + @Test + public void testNodeRegistrationWithInvalidLabels() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + + try { + lblsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + registerReq.setNodeLabels(toSet("A", "B", "C")); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(registerReq); + + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId)); + Assert.assertFalse("Node Labels should not accepted by RM If Invalid", + response.getRMacceptNodeLabels()); + if (rm != null) { + rm.stop(); + } + } + + @Test + public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + + try { + lblsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeLabels(toSet("Y", "#B", "Z")); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(req); + + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId)); + Assert.assertFalse("Invalid Node Labels should not accepted by RM ", + response.getRMacceptNodeLabels()); + if (rm != null) { + rm.stop(); + } + } + + @Test + public void testNodeRegistrationWithCentralLabelConfig() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + try { + lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeLabels(toSet("A", "B")); + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(req); + // registered to RM wth central label config + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId)); + Assert + .assertFalse( + "Node Labels should not accepted by RM If its configured with Central configuration", + response.getRMacceptNodeLabels()); + if (rm != null) { + rm.stop(); + } + } + + + + @SuppressWarnings("unchecked") + private NodeStatus getNodeStatusObject(NodeId nodeId) { + NodeStatus status=Records.newRecord( + NodeStatus.class); + status.setNodeId(nodeId); + status.setResponseId(0); + status.setContainersStatuses(Collections.EMPTY_LIST); + status.setKeepAliveApplications(Collections.EMPTY_LIST); + return status; + } + + @Test + public void testNodeHeartBeatWithLabels() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + // adding valid labels + try { + lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + + //Registering of labels and other required info to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + registerReq.setNodeLabels(toSet("A")); // Node register label + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + //modification of labels during heartbeat + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + heartbeatReq.setNodeLabelsUpdated(true); + heartbeatReq.setNodeLabels(toSet("B", "C")); // Node heartbeat label update + heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + NodeHeartbeatResponse nodeHeartbeatResponse = + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + Assert.assertEquals(NodeAction.NORMAL, + nodeHeartbeatResponse.getNodeAction()); + assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId), + heartbeatReq.getNodeLabels()); + Assert.assertTrue("Valid Node Labels were not accepted by RM", + nodeHeartbeatResponse.getRMacceptNodeLabelsUpdate()); + rm.stop(); + } + + @Test + public void testNodeHeartBeatWithInvalidLabels() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + + try { + lblsMgr.addToCluserNodeLabels(toSet("A", "B", "C")); + } catch (IOException e) { + Assert.fail("Caught Exception while intializing"); + e.printStackTrace(); + } + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + registerReq.setNodeLabels(toSet("A")); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + heartbeatReq.setNodeLabelsUpdated(true); + heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels + heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + NodeHeartbeatResponse nodeHeartbeatResponse = + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + //response should be ok but the RMacceptNodeLabelsUpdate should be false + Assert.assertEquals(NodeAction.NORMAL, + nodeHeartbeatResponse.getNodeAction()); + assertCollectionEquals(lblsMgr.getNodeLabels().get(nodeId), + registerReq.getNodeLabels()); // no change in the labels, + Assert.assertFalse("Invalid Node Labels should not accepted by RM", + nodeHeartbeatResponse.getRMacceptNodeLabelsUpdate());// heartbeat labels + // rejected + rm.stop(); + } + + @Test + public void testNodeHeartbeatWithCentralLabelConfig() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false); + + final RMNodeLabelsManager lblsMgr = new MemoryRMNodeLabelsManager(); + + rm = new MockRM(conf) { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + return lblsMgr; + } + }; + rm.start(); + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + req.setNodeLabels(toSet("A", "B", "C")); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(req); + + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + heartbeatReq.setNodeLabelsUpdated(true); + heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels + heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId)); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + NodeHeartbeatResponse nodeHeartbeatResponse = + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + //response should be ok but the RMacceptNodeLabelsUpdate should be false + Assert.assertEquals(NodeAction.NORMAL, + nodeHeartbeatResponse.getNodeAction()); + Assert.assertNull(lblsMgr.getNodeLabels().get(nodeId)); // no change in the + // labels, + Assert.assertFalse("Invalid Node Labels should not accepted by RM", + nodeHeartbeatResponse.getRMacceptNodeLabelsUpdate());// heartbeat labels + // rejected + if (rm != null) { + rm.stop(); + } + } + + @Test public void testNodeRegistrationVersionLessThanRM() throws Exception { writeToHostsFile("host2"); Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e303f..c7418ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1486,6 +1486,7 @@ */ public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX + "manager-class"; + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX @@ -1494,6 +1495,25 @@ NODE_LABELS_PREFIX + "fs-store.retry-policy-spec"; public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION = + NODE_LABELS_PREFIX + "decentralized-configuration.enabled"; + public static final boolean DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED = + false; + + /** Configurations in NodeManager for NodeLabelsFeature*/ + public static final String NM_NODE_LABELS_PREFIX = NODE_LABELS_PREFIX + + "nm."; + public static final String NM_NODE_LABELS_PROVIDER_CLASS = + NM_NODE_LABELS_PREFIX + "labels-provider.class"; + public static final String NM_NODE_LABELS_FETCH_INTERVAL_MS = + NM_NODE_LABELS_PREFIX + "interval-ms"; + + public static final String NM_NODE_LABELS_FROM_CONFIG = NM_NODE_LABELS_PREFIX + + "from-config"; + + public static final long DEFAULT_NM_LABELS_FETCH_INTERVAL_MS = 10 * 60 * 1000; + + /** End of Configurations in NodeManager for NodeLabelsFeature*/ public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..1e4fcdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null, null); + YarnVersionInfo.getVersion(), null, null,null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception { NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null,false); resourceTracker.nodeHeartbeat(request2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index ee1b945..cd63a83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -637,7 +637,7 @@ public void replaceLabelsOnNode(Map> replaceLabelsToNode) } } - private void checkAndThrowLabelName(String label) throws IOException { + public static void checkAndThrowLabelName(String label) throws IOException { if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { throw new IOException("label added is empty or exceeds " + MAX_LABEL_LENGTH + " character(s)"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java index 9749299..ac5955c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java @@ -50,11 +50,7 @@ public static void assertMapContains(Map> m1, public static void assertCollectionEquals(Collection c1, Collection c2) { Assert.assertEquals(c1.size(), c2.size()); - Iterator i1 = c1.iterator(); - Iterator i2 = c2.iterator(); - while (i1.hasNext()) { - Assert.assertEquals(i1.next(), i2.next()); - } + Assert.assertTrue(c1.containsAll(c2)); } public static Set toSet(E... elements) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..747c930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, + boolean isNodeLabelsUpdated) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +37,8 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); + nodeHeartbeatRequest.setNodeLabelsUpdated(isNodeLabelsUpdated); return nodeHeartbeatRequest; } @@ -45,4 +50,10 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); + + public abstract boolean isNodeLabelsUpdated(); + public abstract void setNodeLabelsUpdated(boolean isNodeLabelsUpdated); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 12e1f54..0432ed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -58,4 +58,7 @@ String getDiagnosticsMessage(); void setDiagnosticsMessage(String diagnosticsMessage); + + boolean getRMacceptNodeLabelsUpdate(); + void setRMacceptNodeLabelsUpdate(boolean rmAcceptNodeLabelsUpdate); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 0e3d7e4..011055e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -30,7 +31,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, - List runningApplications) { + List runningApplications,Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +49,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index b20803f..efd9fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -45,4 +45,7 @@ void setRMVersion(String version); String getRMVersion(); + + boolean getRMacceptNodeLabels(); + void setRMacceptNodeLabelsUpdate(boolean rmAcceptNodeLabels); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..1bcc34f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; @@ -36,6 +39,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +84,10 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private void mergeLocalToProto() { @@ -178,4 +186,40 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + + @Override + public boolean isNodeLabelsUpdated() { + NodeHeartbeatRequestProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getNodeLabelsUpdated(); + } + + @Override + public void setNodeLabelsUpdated(boolean isNodeLabelsUpdated) { + maybeInitBuilder(); + this.builder.setNodeLabelsUpdated(isNodeLabelsUpdated); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 78979d5..878b091 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -19,8 +19,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; -import java.util.List; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -430,5 +430,18 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl) t).getProto(); } + + @Override + public boolean getRMacceptNodeLabelsUpdate() { + NodeHeartbeatResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getRmAcceptNodeLabelsUpdate(); + } + + @Override + public void setRMacceptNodeLabelsUpdate(boolean rmAcceptNodeLabelsUpdate) { + maybeInitBuilder(); + this.builder.setRmAcceptNodeLabelsUpdate(rmAcceptNodeLabelsUpdate); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..2c58eab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,23 +20,18 @@ import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -56,6 +51,7 @@ private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -86,7 +82,10 @@ private void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +291,29 @@ public void setNMVersion(String version) { builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -323,4 +345,5 @@ private NMContainerStatusPBImpl convertFromProtoFormat(NMContainerStatusProto c) private NMContainerStatusProto convertToProtoFormat(NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index ac329ed..5ae33f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -216,4 +216,17 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public boolean getRMacceptNodeLabels() { + RegisterNodeManagerResponseProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + return p.getRmAcceptNodeLabels(); + } + + @Override + public void setRMacceptNodeLabelsUpdate(boolean rmAcceptNodeLabels) { + maybeInitBuilder(); + this.builder.setRmAcceptNodeLabels(rmAcceptNodeLabels); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb..f3263f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; + repeated string nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -41,12 +42,15 @@ message RegisterNodeManagerResponseProto { optional int64 rm_identifier = 4; optional string diagnostics_message = 5; optional string rm_version = 6; + optional bool rmAcceptNodeLabels = 7 [default = false]; } message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated string nodeLabels = 4; + optional bool nodeLabelsUpdated = 5 [default = false]; } message NodeHeartbeatResponseProto { @@ -59,6 +63,7 @@ message NodeHeartbeatResponseProto { optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; repeated ContainerIdProto containers_to_be_removed_from_nm = 9; + optional bool rmAcceptNodeLabelsUpdate = 10 [default = false]; } message NMContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index da25aa2..66738b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; @@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; +import org.junit.Assert; import org.junit.Test; /** @@ -89,11 +92,28 @@ public void testNodeHeartbeatRequestPBImpl() { original.setLastKnownContainerTokenMasterKey(getMasterKey()); original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); + original.setNodeLabels(getValidNodeLabels()); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + // check labels are coming with valid values + Assert.assertEquals(true, + original.getNodeLabels().containsAll(copy.getNodeLabels())); + } + + /** + * Test NodeHeartbeatRequestPBImpl. + */ + @Test + public void testNodeHeartbeatRequestPBImplWithNullLabels() { + NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl(); + original.setNodeLabels(null); + NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( + original.getProto()); + Assert.assertEquals(0,copy.getNodeLabels().size()); } /** @@ -119,6 +139,16 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(false, copy.getRMacceptNodeLabelsUpdate()); + } + + @Test + public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() { + NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); + original.setRMacceptNodeLabelsUpdate(true); + NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( + original.getProto()); + assertEquals(true, copy.getRMacceptNodeLabelsUpdate()); } /** @@ -207,6 +237,65 @@ public void testNodeStatusPBImpl() { assertEquals(1, copy.getResponseId()); } + + + @Test + public void testRegisterNodeManagerRequestWithLabels() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with no values + Assert.assertEquals(0, request1.getNodeLabels().size()); + } + + @Test + public void testRegisterNodeManagerRequestWithNullLabels() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with no values + Assert.assertEquals(0, request1.getNodeLabels().size()); + } + + @Test + public void testRegisterNodeManagerRequestWithValidLabels() { + HashSet nodeLabels = getValidNodeLabels(); + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, nodeLabels); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check labels are coming with valid values + Assert.assertEquals(true,nodeLabels.containsAll(request1.getNodeLabels())); + } + + private HashSet getValidNodeLabels() { + HashSet nodeLabels = new HashSet(); + nodeLabels.add("java"); + nodeLabels.add("windows"); + nodeLabels.add("gpu"); + nodeLabels.add("x86"); + return nodeLabels; + } private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { @@ -272,4 +361,6 @@ private MasterKey getMasterKey() { return key; } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 7165445..7f5e4db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.junit.Assert; @@ -78,7 +77,7 @@ public void testRegisterNodeManagerRequest() { RegisterNodeManagerRequest.newInstance( NodeId.newInstance("1.1.1.1", 1000), 8080, Resource.newInstance(1024, 1), "NM-version-id", reports, - Arrays.asList(appId)); + Arrays.asList(appId),new HashSet()); RegisterNodeManagerRequest requestProto = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java index fdacd92..672b9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,7 +45,7 @@ public void testRegisterNodeManagerRequest() { ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", -1, Priority.newInstance(0), 1234)), Arrays.asList( ApplicationId.newInstance(1234L, 1), - ApplicationId.newInstance(1234L, 2))); + ApplicationId.newInstance(1234L, 2)),new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = @@ -67,8 +68,9 @@ public void testRegisterNodeManagerRequest() { @Test public void testRegisterNodeManagerRequestWithNullArrays() { RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), - 1234, Resource.newInstance(0, 0), "version", null, null); + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 43770c1..c4c98b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -75,6 +77,7 @@ protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsProviderService nodeLabelsProviderService; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -92,8 +95,34 @@ public NodeManager() { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics,nodeLabelsProviderService); + } + + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService nodeLabelsProviderService) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + metrics,nodeLabelsProviderService); + } + + /** + * Useful for testing hence making it static method + */ + static NodeLabelsProviderService createNodeLabelsProviderService( + Configuration conf) throws InstantiationException, IllegalAccessException { + NodeLabelsProviderService provider = null; + if (conf + .getBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, + YarnConfiguration.DEFAULT_DECENTRALIZED_NODELABEL_CONFIGURATION_ENABLED)) { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + ConfigurationNodeLabelsProvider.class, + NodeLabelsProviderService.class); + provider = labelsProviderClass.newInstance(); + } + return provider; } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -220,8 +249,17 @@ protected void serviceInit(Configuration conf) throws Exception { this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + nodeLabelsProviderService = createNodeLabelsProviderService(conf); + + if (null == nodeLabelsProviderService) { + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + } else { + addService(nodeLabelsProviderService); + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsProviderService); + } NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index bed58f5..d9d3a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -62,10 +63,11 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -112,10 +114,19 @@ private Thread statusUpdater; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; + private final NodeLabelsProviderService nodeLabelsProvider; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, null); + } + + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsProviderService nodeLabelsProviderService) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsProvider = nodeLabelsProviderService; this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -243,9 +254,19 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerReports = getNMContainerStatuses(); + @SuppressWarnings("unchecked") + Set nodeLabels = + (nodeLabelsProvider == null) ? Collections.EMPTY_SET + : nodeLabelsProvider.getNodeLabels(); RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + RegisterNodeManagerRequest.newInstance( + nodeId, + httpPort, + totalResource, + nodeManagerVersionId, + containerReports, + getRunningApplications(), + (nodeLabelsProvider == null) ? null : nodeLabels); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -295,6 +316,12 @@ protected void registerWithRM() if (masterKey != null) { this.context.getNMTokenSecretManager().setMasterKey(masterKey); } + + if (!regNMResponse.getRMacceptNodeLabels()) { + LOG.info("Failed to register Nodelabels {" + + StringUtils.join(", ", nodeLabels) + "} for NodeManager " + + this.nodeId); + } LOG.info("Registered with ResourceManager as " + this.nodeId + " with total resource of " + this.totalResource); @@ -532,18 +559,36 @@ protected void startStatusUpdater() { @SuppressWarnings("unchecked") public void run() { int lastHeartBeatID = 0; + Set nodeLabelsLastReceived = + nodeLabelsProvider.getNodeLabels(); + Set nodeLabelsForHeartBeat = null; + boolean nodeLabelsUpdated = false; while (!isStopped) { // Send heartbeat try { NodeHeartbeatResponse response = null; NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID); + nodeLabelsForHeartBeat = null; + if (nodeLabelsProvider!=null) { + nodeLabelsForHeartBeat = + nodeLabelsProvider.getNodeLabels(); + if (areLabelsSameAsPrevOutPut(nodeLabelsForHeartBeat, + nodeLabelsLastReceived)) { + nodeLabelsUpdated = false; + } else { + nodeLabelsUpdated = true; + nodeLabelsLastReceived = nodeLabelsForHeartBeat; + } + } + NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartBeat, nodeLabelsUpdated); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -572,6 +617,19 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } + + if (nodeLabelsUpdated) { + if (response.getRMacceptNodeLabelsUpdate()) { + LOG.info("Node Labels {" + + StringUtils.join(",", nodeLabelsForHeartBeat) + + "} were Accepted from RM "); + } else { + LOG.info("Node Labels {" + + StringUtils.join(",", nodeLabelsForHeartBeat) + + "} were rejected from RM. Please check RM logs for " + + "more details "); + } + } // Explicitly put this method after checking the resync response. We // don't want to remove the completed containers before resync @@ -622,6 +680,14 @@ public void run() { } } } + + private boolean areLabelsSameAsPrevOutPut(Set nodeLabelsNew, + Set nodeLabelsOld) { + if (nodeLabelsNew.size() != nodeLabelsOld.size()) { + return false; + } + return nodeLabelsOld.containsAll(nodeLabelsNew); + } private void updateMasterKeys(NodeHeartbeatResponse response) { // See if the master-key has rolled over diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..bc86a79 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +/** + * Provides Node's Labels by constantly monitoring the configuration. + */ +public class ConfigurationNodeLabelsProvider extends NodeLabelsProviderService { + + private static final Log LOG = LogFactory + .getLog(ConfigurationNodeLabelsProvider.class); + + /** Timer used to schedule node health monitoring script execution */ + private Timer configurationMonitorTimer; + + public ConfigurationNodeLabelsProvider() { + super("Configuration Based NodeLabels Provider Service"); + } + + private Set nodeLabels = new HashSet(); + private long intervalTime; + //for testing purpose + long startTime=0; + + @Override + public Set getNodeLabels() { + readLock.lock(); + try { + return nodeLabels; + } finally { + readLock.unlock(); + } + } + + protected Lock readLock; + protected Lock writeLock; + + private TimerTask timerTask; + + + public TimerTask getTimerTask() { + return timerTask; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.intervalTime = + conf.getLong(YarnConfiguration.NM_NODE_LABELS_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_INTERVAL_MS); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + checkForNodeLabelModification(conf); + } + + @Override + protected void serviceStart() { + configurationMonitorTimer = + new Timer("Node Labels Configuration Monitor", true); + timerTask = new ConfigurationMonitorTimerTask(); + configurationMonitorTimer.scheduleAtFixedRate(timerTask, startTime, + intervalTime); + } + + private void checkForNodeLabelModification(Configuration conf) { + String confLabelString = + conf.get(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, ""); + String[] nodeLabelsFromConfiguration = + (confLabelString == null || confLabelString.isEmpty()) ? new String[] {} + : StringUtils.getStrings(confLabelString); + boolean validLabels = true; + StringBuffer errorMsg = new StringBuffer(""); + for (int i = 0; i < nodeLabelsFromConfiguration.length; i++) { + try { + CommonNodeLabelsManager + .checkAndThrowLabelName(nodeLabelsFromConfiguration[i]); + } catch (IOException e) { + validLabels = false; + errorMsg.append(e.getMessage()); + errorMsg.append(" , "); + } + } + if (validLabels) { + writeLock.lock(); + try { + nodeLabels = + new HashSet(Arrays.asList(nodeLabelsFromConfiguration)); + } finally { + writeLock.unlock(); + } + } else { + if (LOG.isDebugEnabled()) + LOG.debug(errorMsg.toString()); + } + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + checkForNodeLabelModification(new YarnConfiguration()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java new file mode 100644 index 0000000..e6cc30d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProviderService.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.util.Set; + +import org.apache.hadoop.service.AbstractService; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public abstract class NodeLabelsProviderService extends AbstractService{ + + public NodeLabelsProviderService(String name) { + super(name); + } + + /** + * Provides the labels + * @return + */ + public abstract Set getNodeLabels(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java new file mode 100644 index 0000000..df7ba09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProviderService; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase { + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private NodeManager nm; + + @Before + public void setUp() { + } + + public static MasterKey createMasterKey() { + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123) + .byteValue() })); + return masterKey; + } + + @After + public void tearDown() { + if (null != nm) { + ServiceOperations.stop(nm); + } + } + + private class ResourceTrackerForLabels implements ResourceTracker { + int heartBeatID = 0; + Set labels; + boolean nodeLabelsUpdated; + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, IOException { + labels = request.getNodeLabels(); + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(NodeAction.NORMAL); + response.setContainerTokenMasterKey(createMasterKey()); + response.setNMTokenMasterKey(createMasterKey()); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnException, IOException { + labels = request.getNodeLabels(); + nodeLabelsUpdated = request.isNodeLabelsUpdated(); + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + NodeAction.NORMAL, null, null, null, null, 1000L); + // to ensure that heartbeats are sent only when required. + nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE); + nhResponse.setRMacceptNodeLabelsUpdate(nodeLabelsUpdated); + return nhResponse; + } + } + + public static class DummyNodeLabelsProvider extends NodeLabelsProviderService { + + @SuppressWarnings("unchecked") + private Set nodeLabels = Collections.EMPTY_SET; + + public DummyNodeLabelsProvider() { + super(DummyNodeLabelsProvider.class.getName()); + } + + @Override + public synchronized Set getNodeLabels() { + return nodeLabels; + } + + synchronized void setNodeLabels(Set nodeLabels) { + this.nodeLabels = nodeLabels; + } + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + conf.setClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CLASS, + DummyNodeLabelsProvider.class, NodeLabelsProviderService.class); + return conf; + } + + private YarnConfiguration createNMConfigWithDistLabelsDisabled() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, false); + return conf; + } + + private YarnConfiguration createNMConfigWithDistLabelsWithoutClass() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.ENABLE_DECENTRALIZED_NODELABEL_CONFIGURATION, true); + return conf; + } + + protected DummyNodeLabelsProvider labelsProviderRef; + + @Test + public void testCreationOfNodeLabelsProviderService() + throws InterruptedException { + try { + NodeLabelsProviderService labelsProviderService = + NodeManager + .createNodeLabelsProviderService(createNMConfigWithDistLabelsDisabled()); + Assert.assertNull( + "labelsProviderService should not be initialized in Centralized Node Labels configuration ", + labelsProviderService); + } catch (Exception e) { + Assert.fail("Exception caught"); + e.printStackTrace(); + } + } + + @Test + public void testCreationOfNodeLabelsProviderWithoutClassConfig() + throws InterruptedException { + try { + NodeLabelsProviderService labelsProviderService = + NodeManager + .createNodeLabelsProviderService(createNMConfigWithDistLabelsWithoutClass()); + Assert + .assertTrue( + "Distribution Node Labels should be disabled as per configuration", + (labelsProviderService != null && labelsProviderService instanceof ConfigurationNodeLabelsProvider)); + } catch (Exception e) { + Assert.fail("Exception caught"); + e.printStackTrace(); + } + } + + @Test + public void testNMRegistrationWithLabels() throws InterruptedException { + final ResourceTrackerForLabels resourceTracker = + new ResourceTrackerForLabels(); + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsProviderService labelsProvider) { + + Assert.assertTrue( + "NodeLabelsProviderService not set as per Configuration", + labelsProvider instanceof DummyNodeLabelsProvider); + + labelsProviderRef = (DummyNodeLabelsProvider) labelsProvider; + + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics, labelsProvider) { + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + @Override + protected void stopRMProxy() { + return; + } + }; + } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + new Thread() { + public void run() { + try { + nm.start(); + } catch (Throwable e) { + Assert.fail("Exception caught"); + throw new YarnRuntimeException(e); + } + } + }.start(); + Thread.sleep(5000l); + assertCollectionEquals(resourceTracker.labels, + labelsProviderRef.getNodeLabels()); + + // heartbeat with updated labels + labelsProviderRef.setNodeLabels(toSet("P", "X", "z")); + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + Thread.sleep(5000l); + Assert.assertTrue("Labels needs to be updated along with the heartbeat", + resourceTracker.nodeLabelsUpdated); + assertCollectionEquals(resourceTracker.labels, + labelsProviderRef.getNodeLabels()); + + // heartbeat without updating labels + nm.getNodeStatusUpdater().sendOutofBandHeartBeat(); + Thread.sleep(5000l); + Assert + .assertFalse( + "No change in Labels from the NM's heartbeat but heartbeat request had nodeLabelsUpdated set to true", + resourceTracker.nodeLabelsUpdated); + + nm.stop(); + labelsProviderRef = null; + } +}