diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 54cbf8b0fbd..83da58d5cbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3343,9 +3343,12 @@ public static boolean areNodeLabelsEnabled(
public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
NM_NODE_LABELS_PREFIX + "provider";
+ public static final String NM_NODE_ATTRIBUTES_PROVIDER_CONFIG =
+ NM_NODE_ATTRIBUTES_PREFIX + "provider";
+
// whitelist names for the yarn.nodemanager.node-labels.provider
- public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
- public static final String SCRIPT_NODE_LABELS_PROVIDER = "script";
+ public static final String CONFIG_NODE_DESCRIPTOR_PROVIDER = "config";
+ public static final String SCRIPT_NODE_DESCRIPTOR_PROVIDER = "script";
private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
NM_NODE_LABELS_PREFIX + "provider.";
@@ -3377,6 +3380,9 @@ public static boolean areNodeLabelsEnabled(
public static final String NM_PROVIDER_CONFIGURED_NODE_PARTITION =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-partition";
+ public static final String NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES =
+ NM_NODE_ATTRIBUTES_PROVIDER_PREFIX + "configured-node-attributes";
+
private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ "node-labels.";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 684674bb5c7..8c514ff6848 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2773,6 +2773,20 @@
+
+
+ This property determines which provider will be plugged by the
+ node manager to collect node-attributes. Administrators can
+ configure "config", "script" or the class name of the provider.
+ Configured class needs to extend
+ org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider.
+ If "config" is configured, then "ConfigurationNodeLabelsProvider" and if
+ "script" is configured, then "ScriptBasedNodeAttributesProvider"
+ will be used.
+
+ yarn.nodemanager.node-attributes.provider
+
+
The node attribute script NM runs to collect node attributes.
@@ -2810,6 +2824,16 @@
1200000
+
+
+ When "yarn.nodemanager.node-attributes.provider" is configured with
+ "config" then ConfigurationNodeAttributesProvider fetches node attributes
+ from this parameter.
+
+ yarn.nodemanager.node-attributes.provider.configured-node-attributes
+
+
+
Timeout in seconds for YARN node graceful decommission.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 5cacd20c61e..3ee3ea235ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -63,6 +63,9 @@
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeAttributesProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -120,6 +123,7 @@ public int getExitCode() {
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private NodeLabelsProvider nodeLabelsProvider;
+ private NodeAttributesProvider nodeAttributesProvider;
private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
@@ -157,14 +161,46 @@ public static long getNMStartupTime() {
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, nodeLabelsProvider);
+ metrics);
}
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider nodeLabelsProvider) {
- return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, nodeLabelsProvider);
+ protected NodeAttributesProvider createNodeAttributesProvider(
+ Configuration conf) throws IOException {
+ NodeAttributesProvider attributesProvider = null;
+ String providerString =
+ conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);
+ if (providerString == null || providerString.trim().length() == 0) {
+ // Seems like Distributed Node Labels configuration is not enabled
+ return attributesProvider;
+ }
+ switch (providerString.trim().toLowerCase()) {
+ case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
+ attributesProvider = new ConfigurationNodeAttributesProvider();
+ break;
+ case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
+ attributesProvider = new ScriptBasedNodeAttributesProvider();
+ break;
+ default:
+ try {
+ Class extends NodeAttributesProvider> labelsProviderClass =
+ conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,
+ null, NodeAttributesProvider.class);
+ attributesProvider = labelsProviderClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException
+ | RuntimeException e) {
+ LOG.error("Failed to create NodeAttributesProvider"
+ + " based on Configuration", e);
+ throw new IOException(
+ "Failed to create NodeAttributesProvider : "
+ + e.getMessage(), e);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Distributed Node Attributes is enabled"
+ + " with provider class as : "
+ + attributesProvider.getClass().toString());
+ }
+ return attributesProvider;
}
protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@@ -177,10 +213,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
return provider;
}
switch (providerString.trim().toLowerCase()) {
- case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
+ case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
provider = new ConfigurationNodeLabelsProvider();
break;
- case YarnConfiguration.SCRIPT_NODE_LABELS_PROVIDER:
+ case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
provider = new ScriptBasedNodeLabelsProvider();
break;
default:
@@ -402,16 +438,19 @@ protected void serviceInit(Configuration conf) throws Exception {
((NMContext)context).setContainerExecutor(exec);
((NMContext)context).setDeletionService(del);
- nodeLabelsProvider = createNodeLabelsProvider(conf);
+ nodeStatusUpdater =
+ createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
- if (null == nodeLabelsProvider) {
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
- } else {
+ nodeLabelsProvider = createNodeLabelsProvider(conf);
+ if (nodeLabelsProvider != null ) {
addIfService(nodeLabelsProvider);
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
- nodeLabelsProvider);
+ nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
+ }
+
+ nodeAttributesProvider = createNodeAttributesProvider(conf);
+ if (nodeAttributesProvider != null) {
+ addIfService(nodeAttributesProvider);
+ nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}
nodeResourceMonitor = createNodeResourceMonitor();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
index 08892d20799..142cbbc9cbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
@@ -20,6 +20,8 @@
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
public interface NodeStatusUpdater extends Service {
@@ -59,4 +61,16 @@
* @param ex exception that makes the node unhealthy
*/
void reportException(Exception ex);
+
+ /**
+ * Sets a node attributes provider to node manager.
+ * @param provider
+ */
+ void setNodeAttributesProvider(NodeAttributesProvider provider);
+
+ /**
+ * Sets a node labels provider to the node manager.
+ * @param provider
+ */
+ void setNodeLabelsProvider(NodeLabelsProvider provider);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index cb3bec3270e..17d9e2ecf26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -152,21 +154,16 @@
Set pendingContainersToRemove = new HashSet();
private NMNodeLabelsHandler nodeLabelsHandler;
- private final NodeLabelsProvider nodeLabelsProvider;
+ private NMNodeAttributesHandler nodeAttributesHandler;
+ private NodeLabelsProvider nodeLabelsProvider;
+ private NodeAttributesProvider nodeAttributesProvider;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
- this(context, dispatcher, healthChecker, metrics, null);
- }
-
- public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
- NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
this.context = context;
this.dispatcher = dispatcher;
- this.nodeLabelsProvider = nodeLabelsProvider;
this.metrics = metrics;
this.recentlyStoppedContainers = new LinkedHashMap();
this.pendingCompletedContainers =
@@ -175,6 +172,16 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
new ArrayList();
}
+ @Override
+ public void setNodeAttributesProvider(NodeAttributesProvider provider) {
+ this.nodeAttributesProvider = provider;
+ }
+
+ @Override
+ public void setNodeLabelsProvider(NodeLabelsProvider provider) {
+ this.nodeLabelsProvider = provider;
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.totalResource = NodeManagerHardwareUtils.getNodeResources(conf);
@@ -214,7 +221,11 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
- nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
+ nodeLabelsHandler =
+ createNMNodeLabelsHandler(nodeLabelsProvider);
+ nodeAttributesHandler =
+ createNMNodeAttributesHandler(nodeAttributesProvider);
+
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
@@ -842,6 +853,43 @@ private NMNodeLabelsHandler createNMNodeLabelsHandler(
}
}
+ /**
+ * Returns a handler based on the configured node attributes provider.
+ * returns null if no provider is configured.
+ * @param provider
+ * @return attributes handler
+ */
+ private NMNodeAttributesHandler createNMNodeAttributesHandler(
+ NodeAttributesProvider provider) {
+ return provider == null ? null :
+ new NMDistributedNodeAttributesHandler(nodeAttributesProvider);
+ }
+
+ private interface NMNodeAttributesHandler {
+
+ /**
+ * @return the node attributes of this node manager.
+ */
+ Set getNodeAttributesForHeartbeat();
+ }
+
+ private static class NMDistributedNodeAttributesHandler
+ implements NMNodeAttributesHandler {
+
+ private final NodeAttributesProvider attributesProvider;
+
+ protected NMDistributedNodeAttributesHandler(
+ NodeAttributesProvider provider) {
+ this.attributesProvider = provider;
+ }
+
+ @Override
+ public Set getNodeAttributesForHeartbeat() {
+ return attributesProvider.getDescriptors();
+ }
+ }
+
+
private static interface NMNodeLabelsHandler {
/**
* validates nodeLabels From Provider and returns it to the caller. Also
@@ -1057,6 +1105,10 @@ public void run() {
NodeHeartbeatResponse response = null;
Set nodeLabelsForHeartbeat =
nodeLabelsHandler.getNodeLabelsForHeartbeat();
+ Set nodeAttributesForHeartbeat =
+ nodeAttributesHandler == null ? null :
+ nodeAttributesHandler.getNodeAttributesForHeartbeat();
+
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
@@ -1065,6 +1117,7 @@ public void run() {
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat,
+ nodeAttributesForHeartbeat,
NodeStatusUpdaterImpl.this.context
.getRegisteringCollectors());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
new file mode 100644
index 00000000000..74341eb7d7c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeAttributesProvider.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TimerTask;
+import java.util.Set;
+
+/**
+ * Configuration based node attributes provider.
+ */
+public class ConfigurationNodeAttributesProvider
+ extends NodeAttributesProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConfigurationNodeAttributesProvider.class);
+
+ public ConfigurationNodeAttributesProvider() {
+ super("Configuration Based Node Attributes Provider");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ long taskInterval = conf.getLong(YarnConfiguration
+ .NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS,
+ YarnConfiguration
+ .DEFAULT_NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS);
+ this.setIntervalTime(taskInterval);
+ super.serviceInit(conf);
+ }
+
+ private void updateNodeAttributesFromConfig(Configuration conf)
+ throws IOException {
+ String configuredNodeAttributes = conf.get(
+ YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, null);
+ setDescriptors(parseAttributes(configuredNodeAttributes));
+ }
+
+ // TODO parse attributes from configuration
+ @VisibleForTesting
+ public Set parseAttributes(String config)
+ throws IOException {
+ return new HashSet<>();
+ }
+
+ private class ConfigurationMonitorTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ try {
+ updateNodeAttributesFromConfig(new YarnConfiguration());
+ } catch (Exception e) {
+ LOG.error("Failed to update node attributes from "
+ + YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, e);
+ }
+ }
+ }
+
+ @Override
+ protected void cleanUp() throws Exception {
+ // Nothing to cleanup
+ }
+
+ @Override
+ public TimerTask createTimerTask() {
+ return new ConfigurationMonitorTimerTask();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
index b31215b0f3d..b2c2f6ee9a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
@@ -160,7 +160,7 @@ public void testCreationOfNodeLabelsProviderService()
// With valid whitelisted configurations
conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
- YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
+ YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER);
labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
Assert.assertNotNull("LabelsProviderService should be initialized When "
+ "node labels provider class is configured", labelsProviderService);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 7ef23cbbd1c..3e2d963e898 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -225,11 +225,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider labelsProvider) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, labelsProvider) {
+ metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
@@ -325,11 +324,10 @@ protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
- NodeLabelsProvider labelsProvider) {
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics, labelsProvider) {
+ metrics) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
new file mode 100644
index 00000000000..07847583cc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeAttributesProvider.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test class for node configuration node attributes provider.
+ */
+public class TestConfigurationNodeAttributesProvider {
+
+ protected static File testRootDir = new File("target",
+ TestConfigurationNodeAttributesProvider.class.getName() + "-localDir")
+ .getAbsoluteFile();
+
+ private ConfigurationNodeAttributesProvider nodeAttributesProvider;
+
+ @BeforeClass
+ public static void create() {
+ testRootDir.mkdirs();
+ }
+
+ @Before
+ public void setup() {
+ nodeAttributesProvider = new ConfigurationNodeAttributesProvider();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (nodeAttributesProvider != null) {
+ nodeAttributesProvider.close();
+ nodeAttributesProvider.stop();
+ }
+ }
+
+ @AfterClass
+ public static void remove() throws Exception {
+ if (testRootDir.exists()) {
+ FileContext.getLocalFSFileContext()
+ .delete(new Path(testRootDir.getAbsolutePath()), true);
+ }
+ }
+
+ @Test(timeout=30000L)
+ public void testNodeAttributesFetchInterval()
+ throws IOException, InterruptedException {
+ Set expectedAttributes1 = new HashSet<>();
+ expectedAttributes1.add(NodeAttribute
+ .newInstance("test.io", "host",
+ NodeAttributeType.STRING, "host1"));
+
+ Configuration conf = new Configuration();
+ // Set fetch interval to 1s for testing
+ conf.setLong(
+ YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000);
+ ConfigurationNodeAttributesProvider spyProvider =
+ Mockito.spy(nodeAttributesProvider);
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes1);
+
+ spyProvider.init(conf);
+ spyProvider.start();
+
+ // Verify init value is honored.
+ Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors());
+
+ // Configuration provider provides a different set of attributes.
+ Set expectedAttributes2 = new HashSet<>();
+ expectedAttributes2.add(NodeAttribute
+ .newInstance("test.io", "os",
+ NodeAttributeType.STRING, "windows"));
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes2);
+
+ // Since we set fetch interval to 1s, it needs to wait for 1s until
+ // the updated attributes is updated to the provider. So we are expecting
+ // to see some old values for a short window.
+ ArrayList keysMet = new ArrayList<>();
+ int numOfOldValue = 0;
+ int numOfNewValue = 0;
+ // Run 5 times in 500ms interval
+ int times=5;
+ while(times>0) {
+ Set current = spyProvider.getDescriptors();
+ Assert.assertEquals(1, current.size());
+ String attributeName = current.iterator().next().getAttributeName();
+ if ("host".equals(attributeName)){
+ numOfOldValue++;
+ } else if ("os".equals(attributeName)) {
+ numOfNewValue++;
+ }
+ Thread.sleep(500);
+ times--;
+ }
+ // We should either see the old value or the new value.
+ Assert.assertEquals(5, numOfNewValue + numOfOldValue);
+ // Both values should be more than 0.
+ Assert.assertTrue(numOfOldValue > 0);
+ Assert.assertTrue(numOfNewValue > 0);
+ }
+
+ @Test
+ public void testDisableFetchNodeAttributes() throws IOException,
+ InterruptedException {
+ Set expectedAttributes1 = new HashSet<>();
+ expectedAttributes1.add(NodeAttribute
+ .newInstance("test.io", "host",
+ NodeAttributeType.STRING, "host1"));
+
+ Configuration conf = new Configuration();
+ // Set fetch interval to -1 to disable refresh.
+ conf.setLong(
+ YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
+ ConfigurationNodeAttributesProvider spyProvider =
+ Mockito.spy(nodeAttributesProvider);
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes1);
+ spyProvider.init(conf);
+ spyProvider.start();
+
+ Assert.assertEquals(expectedAttributes1,
+ spyProvider.getDescriptors());
+
+ // The configuration added another attribute,
+ // as we disabled the fetch interval, this value cannot be
+ // updated to the provider.
+ Set expectedAttributes2 = new HashSet<>();
+ expectedAttributes2.add(NodeAttribute
+ .newInstance("test.io", "os",
+ NodeAttributeType.STRING, "windows"));
+ Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
+ .thenReturn(expectedAttributes2);
+
+ // Wait a few seconds until we get the value update, expecting a failure.
+ try {
+ GenericTestUtils.waitFor(() -> {
+ Set attributes = spyProvider.getDescriptors();
+ return "os".equalsIgnoreCase(attributes
+ .iterator().next().getAttributeName());
+ }, 500, 1000);
+ } catch (Exception e) {
+ // Make sure we get the timeout exception.
+ Assert.assertTrue(e instanceof TimeoutException);
+ return;
+ }
+
+ Assert.fail("Expecting a failure in previous check!");
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index ec940304660..5dc75e158cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
@@ -166,4 +167,8 @@ void setRMDelegatedNodeLabelsUpdater(
void setResourceProfilesManager(ResourceProfilesManager mgr);
String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
+
+ RMNodeAttributesManager getNodeAttributesManager();
+
+ void setNodeAttributesManager(RMNodeAttributesManager mgr);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 80a91096271..dc55fab3479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
@@ -95,6 +96,8 @@
private String proxyHostAndPort = null;
+ private RMNodeAttributesManager attributesManager;
+
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@@ -503,6 +506,16 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
activeServiceContext.setNodeLabelManager(mgr);
}
+ @Override
+ public RMNodeAttributesManager getNodeAttributesManager() {
+ return this.attributesManager;
+ }
+
+ @Override
+ public void setNodeAttributesManager(RMNodeAttributesManager mgr) {
+ this.attributesManager = mgr;
+ }
+
@Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8641842e07e..a9e425c08c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -69,7 +69,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.InMemoryRMNodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -247,6 +249,10 @@ protected void serviceInit(Configuration conf) throws Exception {
resourceProfilesManager.init(conf);
rmContext.setResourceProfilesManager(resourceProfilesManager);
+ RMNodeAttributesManager attributesManager =
+ new InMemoryRMNodeAttributesManager();
+ rmContext.setNodeAttributesManager(attributesManager);
+
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
this.configurationProvider.init(this.conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index a42d0533c52..170995a7220 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -611,6 +612,20 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.rmContext.getNodeManagerQueueLimitCalculator()
.createContainerQueuingLimit());
}
+
+ // 8. Get node's attributes and update node-to-attributes mapping
+ // in RMNodeAttributeManager.
+ Set nodeAttributes = request.getNodeAttributes();
+ if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
+ nodeAttributes.forEach(nodeAttribute ->
+ LOG.debug(nodeId.toString() + " ATTRIBUTE : "
+ + nodeAttribute.toString()));
+
+ // Update attributes in RM
+ rmContext.getNodeAttributesManager()
+ .updateNodeAttributes(nodeId, nodeAttributes);
+ }
+
return nodeHeartBeatResponse;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java
new file mode 100644
index 00000000000..1fd0b0cf231
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/InMemoryRMNodeAttributesManager.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A in-memory implementation of RM node attributes manager,
+ * majorly for testing.
+ */
+public class InMemoryRMNodeAttributesManager
+ implements RMNodeAttributesManager {
+
+ private Map node2attributes;
+
+ @VisibleForTesting
+ public InMemoryRMNodeAttributesManager() {
+ this.node2attributes = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void updateNodeAttributes(NodeId nodeId,
+ Set attributes) {
+ node2attributes.put(nodeId, NodeToAttributes
+ .newInstance(nodeId.getHost(), new ArrayList<>(attributes)));
+ }
+
+ @Override
+ public NodeToAttributes getNodeAttributes(NodeId nodeId) {
+ return node2attributes.get(nodeId);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java
new file mode 100644
index 00000000000..c9471757751
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeAttributesManager.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+
+import java.util.Set;
+
+/**
+ * RM node attribute manager interface.
+ */
+public interface RMNodeAttributesManager {
+
+ /**
+ * Updates the attributes of a node, it is upon to the implementation if
+ * it updates by a simple replace-all by node id or only updates when there
+ * is a change from existing value (avoid write lock).
+ * @param nodeId node ID
+ * @param attributes node attributes
+ */
+ void updateNodeAttributes(NodeId nodeId, Set attributes);
+
+ /**
+ * Returns the node attributes by node ID.
+ * @param nodeId node ID
+ * @return node attributes
+ */
+ NodeToAttributes getNodeAttributes(NodeId nodeId);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index fc6326eb1e6..c2526911ca5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -64,6 +65,8 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -77,12 +80,14 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -818,6 +823,56 @@ protected RMNodeLabelsManager createNodeLabelManager() {
rm.stop();
}
+ @Test
+ public void testNodeHeartbeatWithNodeAttributes() throws Exception {
+ writeToHostsFile("host2");
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ rm = new MockRM(conf);
+ rm.start();
+
+ // Register to RM
+ ResourceTrackerService resourceTrackerService =
+ rm.getResourceTrackerService();
+ RegisterNodeManagerRequest registerReq =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ NodeId nodeId = NodeId.newInstance("host2", 1234);
+ Resource capability = BuilderUtils.newResource(1024, 1);
+ registerReq.setResource(capability);
+ registerReq.setNodeId(nodeId);
+ registerReq.setHttpPort(1234);
+ registerReq.setNMVersion(YarnVersionInfo.getVersion());
+ RegisterNodeManagerResponse registerResponse =
+ resourceTrackerService.registerNodeManager(registerReq);
+
+ Set nodeAttributes = new HashSet<>();
+ nodeAttributes.add(NodeAttribute.newInstance("host",
+ NodeAttributeType.STRING, "host2"));
+
+ // Set node attributes in HB.
+ NodeHeartbeatRequest heartbeatReq =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+ heartbeatReq.setNodeStatus(nodeStatusObject);
+ heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+ .getNMTokenMasterKey());
+ heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+ .getContainerTokenMasterKey());
+ heartbeatReq.setNodeAttributes(nodeAttributes);
+ resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+ // Ensure RM gets correct node attributes update.
+ RMNodeAttributesManager attributeManager =
+ rm.getRMContext().getNodeAttributesManager();
+ NodeToAttributes attrs = attributeManager.getNodeAttributes(nodeId);
+ Assert.assertEquals(1, attrs.getNodeAttributes().size());
+ NodeAttribute na = attrs.getNodeAttributes().get(0);
+ Assert.assertEquals("host", na.getAttributeName());
+ Assert.assertEquals("host2", na.getAttributeValue());
+ Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+ }
+
@Test
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
writeToHostsFile("host2");