diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 54cbf8b0fbd..83da58d5cbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3343,9 +3343,12 @@ public static boolean areNodeLabelsEnabled( public static final String NM_NODE_LABELS_PROVIDER_CONFIG = NM_NODE_LABELS_PREFIX + "provider"; + public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG = + NM_NODE_ATTRIBUTES_PREFIX + "provider"; + // whitelist names for the yarn.nodemanager.node-labels.provider - public static final String CONFIG_NODE_LABELS_PROVIDER = "config"; - public static final String SCRIPT_NODE_LABELS_PROVIDER = "script"; + public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config"; + public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script"; private static final String NM_NODE_LABELS_PROVIDER_PREFIX = NM_NODE_LABELS_PREFIX + "provider."; @@ -3377,6 +3380,9 @@ public static boolean areNodeLabelsEnabled( public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition"; + public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES = + NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes"; + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + "node-labels."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 684674bb5c7..8c514ff6848 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2773,6 +2773,20 @@ + + + This property determines which provider will be plugged by the + node manager to collect node-attributes. Administrators can + configure "config", "script" or the class name of the provider. + Configured class needs to extend + org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider. + If "config" is configured, then "ConfigurationNodeLabelsProvider" and if + "script" is configured, then "ScriptBasedNodeAttributesProvider" + will be used. + + yarn.nodemanager.node-attributes.provider + + The node attribute script NM runs to collect node attributes. @@ -2810,6 +2824,16 @@ 1200000 + + + When "yarn.nodemanager.node-attributes.provider" is configured with + "config" then ConfigurationNodeAttributesProvider fetches node attributes + from this parameter. + + yarn.nodemanager.node-attributes.provider.configured-node-attributes + + + Timeout in seconds for YARN node graceful decommission. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5cacd20c61e..3ee3ea235ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -63,6 +63,9 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -120,6 +123,7 @@ public int getExitCode() { private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; private NodeLabelsProvider nodeLabelsProvider; + private NodeAttributesProvider nodeAttributesProvider; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -157,14 +161,46 @@ public static long getNMStartupTime() { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, nodeLabelsProvider); + metrics); } - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider nodeLabelsProvider) { - return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, nodeLabelsProvider); + protected NodeAttributesProvider createNodeAttributesProvider( + Configuration conf) throws IOException { + NodeAttributesProvider attributesProvider = null; + String providerString = + conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null); + if (providerString == null || providerString.trim().length() == 0) { + // Seems like Distributed Node Labels configuration is not enabled + return attributesProvider; + } + switch (providerString.trim().toLowerCase()) { + case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER: + attributesProvider = new ConfigurationNodeAttributesProvider(); + break; + case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER: + attributesProvider = new ScriptBasedNodeAttributesProvider(); + break; + default: + try { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, + null, NodeAttributesProvider.class); + attributesProvider = labelsProviderClass.newInstance(); + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeAttributesProvider" + + " based on Configuration", e); + throw new IOException( + "Failed to create NodeAttributesProvider : " + + e.getMessage(), e); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Distributed Node Attributes is enabled" + + " with provider class as : " + + attributesProvider.getClass().toString()); + } + return attributesProvider; } protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) @@ -177,10 +213,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) return provider; } switch (providerString.trim().toLowerCase()) { - case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER: + case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER: provider = new ConfigurationNodeLabelsProvider(); break; - case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER: + case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER: provider = new ScriptBasedNodeLabelsProvider(); break; default: @@ -402,16 +438,19 @@ protected void serviceInit(Configuration conf) throws Exception { ((NMContext)context).setContainerExecutor(exec); ((NMContext)context).setDeletionService(del); - nodeLabelsProvider = createNodeLabelsProvider(conf); + nodeStatusUpdater = + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); - if (null == nodeLabelsProvider) { - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); - } else { + nodeLabelsProvider = createNodeLabelsProvider(conf); + if (nodeLabelsProvider != null ) { addIfService(nodeLabelsProvider); - nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, - nodeLabelsProvider); + nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider); + } + + nodeAttributesProvider = createNodeAttributesProvider(conf); + if (nodeAttributesProvider != null) { + addIfService(nodeAttributesProvider); + nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider); } nodeResourceMonitor = createNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 08892d20799..142cbbc9cbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -20,6 +20,8 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; public interface NodeStatusUpdater extends Service { @@ -59,4 +61,16 @@ * @param ex exception that makes the node unhealthy */ void reportException(Exception ex); + + /** + * Sets a node attributes provider to node manager. + * @param provider + */ + void setNodeAttributesProvider(NodeAttributesProvider provider); + + /** + * Sets a node labels provider to the node manager. + * @param provider + */ + void setNodeLabelsProvider(NodeLabelsProvider provider); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index cb3bec3270e..17d9e2ecf26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -152,21 +154,16 @@ Set pendingContainersToRemove = new HashSet(); private NMNodeLabelsHandler nodeLabelsHandler; - private final NodeLabelsProvider nodeLabelsProvider; + private NMNodeAttributesHandler nodeAttributesHandler; + private NodeLabelsProvider nodeLabelsProvider; + private NodeAttributesProvider nodeAttributesProvider; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - this(context, dispatcher, healthChecker, metrics, null); - } - - public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - NodeLabelsProvider nodeLabelsProvider) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; this.context = context; this.dispatcher = dispatcher; - this.nodeLabelsProvider = nodeLabelsProvider; this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = @@ -175,6 +172,16 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, new ArrayList(); } + @Override + public void setNodeAttributesProvider(NodeAttributesProvider provider) { + this.nodeAttributesProvider = provider; + } + + @Override + public void setNodeLabelsProvider(NodeLabelsProvider provider) { + this.nodeLabelsProvider = provider; + } + @Override protected void serviceInit(Configuration conf) throws Exception { this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf); @@ -214,7 +221,11 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION); - nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider); + nodeLabelsHandler = + createNMNodeLabelsHandler(nodeLabelsProvider); + nodeAttributesHandler = + createNMNodeAttributesHandler(nodeAttributesProvider); + // Default duration to track stopped containers on nodemanager is 10Min. // This should not be assigned very large value as it will remember all the // containers stopped during that time. @@ -842,6 +853,43 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler( } } + /** + * Returns a handler based on the configured node attributes provider. + * returns null if no provider is configured. + * @param provider + * @return attributes handler + */ + private NMNodeAttributesHandler createNMNodeAttributesHandler( + NodeAttributesProvider provider) { + return provider == null ? null : + new NMDistributedNodeAttributesHandler(nodeAttributesProvider); + } + + private interface NMNodeAttributesHandler { + + /** + * @return the node attributes of this node manager. + */ + Set getNodeAttributesForHeartbeat(); + } + + private static class NMDistributedNodeAttributesHandler + implements NMNodeAttributesHandler { + + private final NodeAttributesProvider attributesProvider; + + protected NMDistributedNodeAttributesHandler( + NodeAttributesProvider provider) { + this.attributesProvider = provider; + } + + @Override + public Set getNodeAttributesForHeartbeat() { + return attributesProvider.getDescriptors(); + } + } + + private static interface NMNodeLabelsHandler { /** * validates nodeLabels From Provider and returns it to the caller. Also @@ -1057,6 +1105,10 @@ public void run() { NodeHeartbeatResponse response = null; Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); + Set nodeAttributesForHeartbeat = + nodeAttributesHandler == null ? null : + nodeAttributesHandler.getNodeAttributesForHeartbeat(); + NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, @@ -1065,6 +1117,7 @@ public void run() { NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, + nodeAttributesForHeartbeat, NodeStatusUpdaterImpl.this.context .getRegisteringCollectors()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java new file mode 100644 index 00000000000..74341eb7d7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.TimerTask; +import java.util.Set; + +/** + * Configuration based node attributes provider. + */ +public class ConfigurationNodeAttributesProvider + extends NodeAttributesProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class); + + public ConfigurationNodeAttributesProvider() { + super("Configuration Based Node Attributes Provider"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + long taskInterval = conf.getLong(YarnConfiguration + .NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration + .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS); + this.setIntervalTime(taskInterval); + super.serviceInit(conf); + } + + private void updateNodeAttributesFromConfig(Configuration conf) + throws IOException { + String configuredNodeAttributes = conf.get( + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null); + setDescriptors(parseAttributes(configuredNodeAttributes)); + } + + // TODO parse attributes from configuration + @VisibleForTesting + public Set parseAttributes(String config) + throws IOException { + return new HashSet<>(); + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + updateNodeAttributesFromConfig(new YarnConfiguration()); + } catch (Exception e) { + LOG.error("Failed to update node attributes from " + + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e); + } + } + } + + @Override + protected void cleanUp() throws Exception { + // Nothing to cleanup + } + + @Override + public TimerTask createTimerTask() { + return new ConfigurationMonitorTimerTask(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java index b31215b0f3d..b2c2f6ee9a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java @@ -160,7 +160,7 @@ public void testCreationOfNodeLabelsProviderService() // With valid whitelisted configurations conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, - YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER); + YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER); labelsProviderService = nodeManager.createNodeLabelsProvider(conf); Assert.assertNotNull("LabelsProviderService should be initialized When " + "node labels provider class is configured", labelsProviderService); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 7ef23cbbd1c..3e2d963e898 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -225,11 +225,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider labelsProvider) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, labelsProvider) { + metrics) { @Override protected ResourceTracker getRMClient() { return resourceTracker; @@ -325,11 +324,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf) @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker, - NodeLabelsProvider labelsProvider) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics, labelsProvider) { + metrics) { @Override protected ResourceTracker getRMClient() { return resourceTracker; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java new file mode 100644 index 00000000000..07847583cc5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.AfterClass; +import org.junit.After; +import org.junit.Test; +import org.junit.Assert; + +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +/** + * Test class for node configuration node attributes provider. + */ +public class TestConfigurationNodeAttributesProvider { + + protected static File testRootDir = new File("target", + TestConfigurationNodeAttributesProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + private ConfigurationNodeAttributesProvider nodeAttributesProvider; + + @BeforeClass + public static void create() { + testRootDir.mkdirs(); + } + + @Before + public void setup() { + nodeAttributesProvider = new ConfigurationNodeAttributesProvider(); + } + + @After + public void tearDown() throws Exception { + if (nodeAttributesProvider != null) { + nodeAttributesProvider.close(); + nodeAttributesProvider.stop(); + } + } + + @AfterClass + public static void remove() throws Exception { + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext() + .delete(new Path(testRootDir.getAbsolutePath()), true); + } + } + + @Test(timeout=30000L) + public void testNodeAttributesFetchInterval() + throws IOException, InterruptedException { + Set expectedAttributes1 = new HashSet<>(); + expectedAttributes1.add(NodeAttribute + .newInstance("test.io", "host", + NodeAttributeType.STRING, "host1")); + + Configuration conf = new Configuration(); + // Set fetch interval to 1s for testing + conf.setLong( + YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000); + ConfigurationNodeAttributesProvider spyProvider = + Mockito.spy(nodeAttributesProvider); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes1); + + spyProvider.init(conf); + spyProvider.start(); + + // Verify init value is honored. + Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors()); + + // Configuration provider provides a different set of attributes. + Set expectedAttributes2 = new HashSet<>(); + expectedAttributes2.add(NodeAttribute + .newInstance("test.io", "os", + NodeAttributeType.STRING, "windows")); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes2); + + // Since we set fetch interval to 1s, it needs to wait for 1s until + // the updated attributes is updated to the provider. So we are expecting + // to see some old values for a short window. + ArrayList keysMet = new ArrayList<>(); + int numOfOldValue = 0; + int numOfNewValue = 0; + // Run 5 times in 500ms interval + int times=5; + while(times>0) { + Set current = spyProvider.getDescriptors(); + Assert.assertEquals(1, current.size()); + String attributeName = current.iterator().next().getAttributeName(); + if ("host".equals(attributeName)){ + numOfOldValue++; + } else if ("os".equals(attributeName)) { + numOfNewValue++; + } + Thread.sleep(500); + times--; + } + // We should either see the old value or the new value. + Assert.assertEquals(5, numOfNewValue + numOfOldValue); + // Both values should be more than 0. + Assert.assertTrue(numOfOldValue > 0); + Assert.assertTrue(numOfNewValue > 0); + } + + @Test + public void testDisableFetchNodeAttributes() throws IOException, + InterruptedException { + Set expectedAttributes1 = new HashSet<>(); + expectedAttributes1.add(NodeAttribute + .newInstance("test.io", "host", + NodeAttributeType.STRING, "host1")); + + Configuration conf = new Configuration(); + // Set fetch interval to -1 to disable refresh. + conf.setLong( + YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1); + ConfigurationNodeAttributesProvider spyProvider = + Mockito.spy(nodeAttributesProvider); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes1); + spyProvider.init(conf); + spyProvider.start(); + + Assert.assertEquals(expectedAttributes1, + spyProvider.getDescriptors()); + + // The configuration added another attribute, + // as we disabled the fetch interval, this value cannot be + // updated to the provider. + Set expectedAttributes2 = new HashSet<>(); + expectedAttributes2.add(NodeAttribute + .newInstance("test.io", "os", + NodeAttributeType.STRING, "windows")); + Mockito.when(spyProvider.parseAttributes(Mockito.anyString())) + .thenReturn(expectedAttributes2); + + // Wait a few seconds until we get the value update, expecting a failure. + try { + GenericTestUtils.waitFor(() -> { + Set attributes = spyProvider.getDescriptors(); + return "os".equalsIgnoreCase(attributes + .iterator().next().getAttributeName()); + }, 500, 1000); + } catch (Exception e) { + // Make sure we get the timeout exception. + Assert.assertTrue(e instanceof TimeoutException); + return; + } + + Assert.fail("Expecting a failure in previous check!"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ec940304660..5dc75e158cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -166,4 +167,8 @@ void setRMDelegatedNodeLabelsUpdater( void setResourceProfilesManager(ResourceProfilesManager mgr); String getAppProxyUrl(Configuration conf, ApplicationId applicationId); + + RMNodeAttributesManager getNodeAttributesManager(); + + void setNodeAttributesManager(RMNodeAttributesManager mgr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 80a91096271..dc55fab3479 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -95,6 +96,8 @@ private String proxyHostAndPort = null; + private RMNodeAttributesManager attributesManager; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -503,6 +506,16 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { activeServiceContext.setNodeLabelManager(mgr); } + @Override + public RMNodeAttributesManager getNodeAttributesManager() { + return this.attributesManager; + } + + @Override + public void setNodeAttributesManager(RMNodeAttributesManager mgr) { + this.attributesManager = mgr; + } + @Override public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return activeServiceContext.getRMDelegatedNodeLabelsUpdater(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8641842e07e..a9e425c08c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -69,7 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.InMemoryRMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -247,6 +249,10 @@ protected void serviceInit(Configuration conf) throws Exception { resourceProfilesManager.init(conf); rmContext.setResourceProfilesManager(resourceProfilesManager); + RMNodeAttributesManager attributesManager = + new InMemoryRMNodeAttributesManager(); + rmContext.setNodeAttributesManager(attributesManager); + this.configurationProvider = ConfigurationProviderFactory.getConfigurationProvider(conf); this.configurationProvider.init(this.conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a42d0533c52..170995a7220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -611,6 +612,20 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.rmContext.getNodeManagerQueueLimitCalculator() .createContainerQueuingLimit()); } + + // 8. Get node's attributes and update node-to-attributes mapping + // in RMNodeAttributeManager. + Set nodeAttributes = request.getNodeAttributes(); + if (nodeAttributes != null && !nodeAttributes.isEmpty()) { + nodeAttributes.forEach(nodeAttribute -> + LOG.debug(nodeId.toString() + " ATTRIBUTE : " + + nodeAttribute.toString())); + + // Update attributes in RM + rmContext.getNodeAttributesManager() + .updateNodeAttributes(nodeId, nodeAttributes); + } + return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java new file mode 100644 index 00000000000..1fd0b0cf231 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; + +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A in-memory implementation of RM node attributes manager, + * majorly for testing. + */ +public class InMemoryRMNodeAttributesManager + implements RMNodeAttributesManager { + + private Map node2attributes; + + @VisibleForTesting + public InMemoryRMNodeAttributesManager() { + this.node2attributes = new ConcurrentHashMap<>(); + } + + @Override + public void updateNodeAttributes(NodeId nodeId, + Set attributes) { + node2attributes.put(nodeId, NodeToAttributes + .newInstance(nodeId.getHost(), new ArrayList<>(attributes))); + } + + @Override + public NodeToAttributes getNodeAttributes(NodeId nodeId) { + return node2attributes.get(nodeId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java new file mode 100644 index 00000000000..c9471757751 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; + +import java.util.Set; + +/** + * RM node attribute manager interface. + */ +public interface RMNodeAttributesManager { + + /** + * Updates the attributes of a node, it is upon to the implementation if + * it updates by a simple replace-all by node id or only updates when there + * is a change from existing value (avoid write lock). + * @param nodeId node ID + * @param attributes node attributes + */ + void updateNodeAttributes(NodeId nodeId, Set attributes); + + /** + * Returns the node attributes by node ID. + * @param nodeId node ID + * @return node attributes + */ + NodeToAttributes getNodeAttributes(NodeId nodeId); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fc6326eb1e6..c2526911ca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.HashSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.xml.parsers.DocumentBuilderFactory; @@ -64,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -77,12 +80,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; @@ -818,6 +823,56 @@ protected RMNodeLabelsManager createNodeLabelManager() { rm.stop(); } + @Test + public void testNodeHeartbeatWithNodeAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + Set nodeAttributes = new HashSet<>(); + nodeAttributes.add(NodeAttribute.newInstance("host", + NodeAttributeType.STRING, "host2")); + + // Set node attributes in HB. + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Ensure RM gets correct node attributes update. + RMNodeAttributesManager attributeManager = + rm.getRMContext().getNodeAttributesManager(); + NodeToAttributes attrs = attributeManager.getNodeAttributes(nodeId); + Assert.assertEquals(1, attrs.getNodeAttributes().size()); + NodeAttribute na = attrs.getNodeAttributes().get(0); + Assert.assertEquals("host", na.getAttributeName()); + Assert.assertEquals("host2", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + } + @Test public void testNodeHeartBeatWithInvalidLabels() throws Exception { writeToHostsFile("host2");