diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index f238f79f172..da7a648cbd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -61,6 +62,18 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, return nodeHeartbeatRequest; } + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, + MasterKey lastKnownContainerTokenMasterKey, + MasterKey lastKnownNMTokenMasterKey, Set nodeLabels, + Set nodeAttributes, + Map registeringCollectors) { + NodeHeartbeatRequest request = NodeHeartbeatRequest + .newInstance(nodeStatus, lastKnownContainerTokenMasterKey, + lastKnownNMTokenMasterKey, nodeLabels, registeringCollectors); + request.setNodeAttributes(nodeAttributes); + return request; + } + public abstract NodeStatus getNodeStatus(); public abstract void setNodeStatus(NodeStatus status); @@ -73,6 +86,9 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); + public abstract Set getNodeAttributes(); + public abstract void setNodeAttributes(Set nodeAttributes); + public abstract List getLogAggregationReportsForApps(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 1ffd223f8a6..b57533b4297 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -34,10 +34,14 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; @@ -60,6 +64,7 @@ private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; private Set labels = null; + private Set attributes = null; private List logAggregationReportsForApps = null; private Map registeringCollectors = null; @@ -346,6 +351,35 @@ public void setNodeLabels(Set nodeLabels) { builder.clearNodeLabels(); this.labels = nodeLabels; } + + @Override + public Set getNodeAttributes() { + initNodeAttributes(); + return this.attributes; + } + + private void initNodeAttributes() { + if (this.attributes != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeAttributes()) { + return; + } + NodeAttributesProto nodeAttributes = p.getNodeAttributes(); + attributes = new HashSet<>(); + for (NodeAttributeProto attributeProto : + nodeAttributes.getNodeAttributesList()) { + attributes.add(new NodeAttributePBImpl(attributeProto)); + } + } + + @Override + public void setNodeAttributes(Set attributes) { + maybeInitBuilder(); + builder.clearNodeAttributes(); + this.attributes = attributes; + } private void initNodeLabels() { if (this.labels != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 8c4fc69b4f5..039abf1e66a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -57,6 +57,10 @@ message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; } +message NodeAttributesProto { + repeated NodeAttributeProto nodeAttributes = 1; +} + message RegisterNodeManagerRequestProto { optional NodeIdProto node_id = 1; optional int32 http_port = 3; @@ -93,6 +97,7 @@ message NodeHeartbeatRequestProto { optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; repeated AppCollectorDataProto registering_collectors = 6; + optional NodeAttributesProto nodeAttributes = 7; } message LogAggregationReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3d3f573769c..d8056a7fd84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -36,6 +36,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.nodeattributes.NMNodeAttributeManager; +import org.apache.hadoop.yarn.server.nodemanager.nodeattributes.NMNodeAttributeManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.nodeattributes.ScriptBasedNodeAttributeProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -154,6 +158,8 @@ private NMNodeLabelsHandler nodeLabelsHandler; private final NodeLabelsProvider nodeLabelsProvider; + private final NMNodeAttributeManager attributeManager; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { this(context, dispatcher, healthChecker, metrics, null); @@ -173,6 +179,10 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, new HashMap(); this.logAggregationReportForAppsTempList = new ArrayList(); + + ScriptBasedNodeAttributeProvider attributeProvider = + new ScriptBasedNodeAttributeProvider(); + this.attributeManager = new NMNodeAttributeManagerImpl(attributeProvider); } @Override @@ -1057,6 +1067,11 @@ public void run() { NodeHeartbeatResponse response = null; Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); + + // Fetch node attributes + Set nodeAttributesForHeartbeat = + attributeManager.getNodeAttributes(); + NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, @@ -1065,6 +1080,7 @@ public void run() { NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, + nodeAttributesForHeartbeat, NodeStatusUpdaterImpl.this.context .getRegisteringCollectors()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManager.java new file mode 100644 index 00000000000..32764f46107 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManager.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodeattributes; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +import java.util.Set; + +/** + * NMNodeAttributeManager manages all node attributes of current + * node manager. + */ +public interface NMNodeAttributeManager { + + /** + * Returns node attributes on current node manager. + * @return a set of node attributes. + */ + Set getNodeAttributes(); + + /** + * Replace node attributes with the given set of attributes, + * this method doesn't update deltas, it simply remove all existing + * attributes and then map the new set of attributes to this node. + * @param newAttributes all attributes of current node. + */ + void replaceNodeAttributes(Set newAttributes); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManagerImpl.java new file mode 100644 index 00000000000..3c1cec27007 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeManagerImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodeattributes; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +import java.util.HashSet; +import java.util.Set; + +/** + * NMNodeAttributeManager manages all node attributes of current + * node manager. It fetches node attributes from a given provider + * in sync manner. + */ +public class NMNodeAttributeManagerImpl implements NMNodeAttributeManager{ + + private NMNodeAttributeProvider provider; + private Set attributes; + + public NMNodeAttributeManagerImpl(NMNodeAttributeProvider provider) { + this.provider = provider; + this.attributes = new HashSet<>(); + } + + @Override + public Set getNodeAttributes() { + // retrieve node attributes sync-ly from the provider + // this operation is not trivial as it needs to launch a script + Set newAttributes = provider.getNodeAttributes(); + replaceNodeAttributes(newAttributes); + return attributes; + } + + @Override + public void replaceNodeAttributes(Set newAttributes){ + this.attributes = newAttributes; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeProvider.java new file mode 100644 index 00000000000..cedd84a3074 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/NMNodeAttributeProvider.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodeattributes; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +import java.util.Set; + +public interface NMNodeAttributeProvider { + + Set getNodeAttributes(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/ScriptBasedNodeAttributeProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/ScriptBasedNodeAttributeProvider.java new file mode 100644 index 00000000000..27956e2dd93 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/ScriptBasedNodeAttributeProvider.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodeattributes; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +import java.util.HashSet; +import java.util.Set; + +public class ScriptBasedNodeAttributeProvider + implements NMNodeAttributeProvider { + + @Override public Set getNodeAttributes() { + return new HashSet<>(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/TestNMNodeAttributeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/TestNMNodeAttributeManager.java new file mode 100644 index 00000000000..10a6b846792 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodeattributes/TestNMNodeAttributeManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodeattributes; + +import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class TestNMNodeAttributeManager { + + @Test + public void testReplaceNodeAttributes() { + NodeAttribute attribute0 = NodeAttribute.newInstance("host", + NodeAttributeType.STRING, "host0"); + NodeAttribute attribute1 = NodeAttribute.newInstance("host", + NodeAttributeType.STRING, "host1"); + NodeAttribute attribute2 = NodeAttribute.newInstance("host", + NodeAttributeType.STRING, "host2"); + + NMNodeAttributeManager nmAttrMgr = new NMNodeAttributeManagerImpl( + () -> new HashSet<>()); + Set attributeSet = Sets.newHashSet(); + attributeSet.add(attribute0); + attributeSet.add(attribute1); + attributeSet.add(attribute2); + nmAttrMgr.replaceNodeAttributes(attributeSet); + + Assert.assertEquals(3, nmAttrMgr.getNodeAttributes().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a42d0533c52..f525e1bc580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodeattributes.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -103,6 +105,7 @@ private final NMLivelinessMonitor nmLivelinessMonitor; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; + private final RMNodeAttributesManager nodeAttributesManager; private final ReadLock readLock; private final WriteLock writeLock; @@ -134,6 +137,7 @@ public ResourceTrackerService(RMContext rmContext, this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; + this.nodeAttributesManager = new RMNodeAttributesManager(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -611,6 +615,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) this.rmContext.getNodeManagerQueueLimitCalculator() .createContainerQueuingLimit()); } + + // 8. Get node's attributes and update node-to-attributes mapping + // in RMNodeAttributeManager. + Set nodeAttributes = request.getNodeAttributes(); + if (nodeAttributes != null && !nodeAttributes.isEmpty()) { + // Update attributes in RM + nodeAttributesManager.updateNodeAttributes(nodeId, nodeAttributes); + } + return nodeHeartBeatResponse; } @@ -801,4 +814,9 @@ void refreshServiceAcls(Configuration configuration, public Server getServer() { return this.server; } + + @VisibleForTesting + public RMNodeAttributesManager getRMNodeAttributesManager() { + return nodeAttributesManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodeattributes/RMNodeAttributesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodeattributes/RMNodeAttributesManager.java new file mode 100644 index 00000000000..619086f4a55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodeattributes/RMNodeAttributesManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.nodeattributes; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; + +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Maintains node-to-attributes mapping in memory, these mapping info + * gets updated when RM receives NM heartbeat. This class is thread-safe. + */ +public class RMNodeAttributesManager { + + //private Set node2attributes; + private Map node2attributes; + + public RMNodeAttributesManager() { + node2attributes = new ConcurrentHashMap<>(); + } + + public void updateNodeAttributes(NodeId nodeId, + Set attributes) { + node2attributes.put(nodeId, NodeToAttributes + .newInstance(nodeId.getHost(), new ArrayList<>(attributes))); + } + + public boolean hasNodeAttributes(NodeId nodeId) { + return node2attributes.containsKey(nodeId); + } + + public NodeToAttributes getNodeAttributes(NodeId nodeId) { + return node2attributes.get(nodeId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index fc6326eb1e6..bc3a12263d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.HashSet; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -64,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -77,10 +80,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodeattributes.RMNodeAttributesManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -733,6 +738,56 @@ private NodeStatus getNodeStatusObject(NodeId nodeId) { return status; } + @Test + public void testNodeHeartbeatWithNodeAttributes() throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + Set nodeAttributes = new HashSet<>(); + nodeAttributes.add(NodeAttribute.newInstance("host", + NodeAttributeType.STRING, "host2")); + + // Set node attributes in HB. + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Ensure RM gets correct node attributes update. + RMNodeAttributesManager attributeManager = + resourceTrackerService.getRMNodeAttributesManager(); + NodeToAttributes attrs = attributeManager.getNodeAttributes(nodeId); + Assert.assertEquals(1, attrs.getNodeAttributes().size()); + NodeAttribute na = attrs.getNodeAttributes().get(0); + Assert.assertEquals("host", na.getAttributeName()); + Assert.assertEquals("host2", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + } + @Test public void testNodeHeartBeatWithLabels() throws Exception { writeToHostsFile("host2");