diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3d6eda2cf5c..5d9e48a6ac8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeAttributesUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -675,9 +676,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // in RMNodeAttributeManager. Set nodeAttributes = request.getNodeAttributes(); if (nodeAttributes != null && !nodeAttributes.isEmpty()) { - nodeAttributes.forEach(nodeAttribute -> - LOG.debug(nodeId.toString() + " ATTRIBUTE : " - + nodeAttribute.toString())); + if (LOG.isDebugEnabled()) { + nodeAttributes.forEach(nodeAttribute -> LOG.debug( + nodeId.toString() + " ATTRIBUTE : " + nodeAttribute.toString())); + } // Validate attributes if (!nodeAttributes.stream().allMatch( @@ -692,9 +694,15 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) } else { // Replace all distributed node attributes associated with this host // with the new reported attributes in node attribute manager. - this.rmContext.getNodeAttributesManager() - .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, - ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + Set currentNodeAttributes = + this.rmContext.getNodeAttributesManager() + .getAttributesForNode(nodeId.getHost()).keySet(); + if (!NodeAttributesUtils + .isNodeAttributesEquals(nodeAttributes, currentNodeAttributes)) { + this.rmContext.getNodeAttributesManager() + .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED, + ImmutableMap.of(nodeId.getHost(), nodeAttributes)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesUtils.java new file mode 100644 index 00000000000..13b1ec15b1f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesUtils.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +import java.util.Objects; +import java.util.Set; + +/** + * Node attributes utilities. + */ +public final class NodeAttributesUtils { + private static final Log LOG = LogFactory.getLog(NodeAttributesUtils.class); + + private NodeAttributesUtils() { /* Hidden constructor */ } + + public static boolean isNodeAttributesEquals( + Set checkNodeAttributes, + Set targetNodeAttributes) { + if (Objects.equals(checkNodeAttributes, targetNodeAttributes)) { + return checkNodeAttributes.stream() + .allMatch(e -> isNodeAttributeMatches(e, targetNodeAttributes)); + } + return false; + } + + private static boolean isNodeAttributeMatches( + NodeAttribute checkNodeAttribute, Set nodeAttributes) { + return nodeAttributes.stream().anyMatch( + e -> e.equals(checkNodeAttribute) && Objects + .equals(e.getAttributeValue(), + checkNodeAttribute.getAttributeValue())); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index b451db1a7d5..494e7dac963 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; import org.apache.hadoop.yarn.server.api.ResourceTracker; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerFactory; @@ -116,6 +118,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -908,6 +913,119 @@ public void testNodeHeartbeatWithNodeAttributes() throws Exception { Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); } + @Test + public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded() + throws Exception { + writeToHostsFile("host2"); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + hostFile.getAbsolutePath()); + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + FileSystemNodeAttributeStore.class, NodeAttributeStore.class); + File tempDir = File.createTempFile("nattr", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR, + tempDir.getAbsolutePath()); + rm = new MockRM(conf); + rm.start(); + + // spy node attributes manager + NodeAttributesManager tmpAttributeManager = + rm.getRMContext().getNodeAttributesManager(); + NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager); + rm.getRMContext().setNodeAttributesManager(spyAttributeManager); + AtomicInteger count = new AtomicInteger(0); + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + count.incrementAndGet(); + tmpAttributeManager + .replaceNodeAttributes((String) invocation.getArguments()[0], + (Map>) invocation.getArguments()[1]); + return null; + } + }).when(spyAttributeManager) + .replaceNodeAttributes(Mockito.any(String.class), + Mockito.any(Map.class)); + + // Register to RM + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest registerReq = + Records.newRecord(RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + registerReq.setResource(capability); + registerReq.setNodeId(nodeId); + registerReq.setHttpPort(1234); + registerReq.setNMVersion(YarnVersionInfo.getVersion()); + RegisterNodeManagerResponse registerResponse = + resourceTrackerService.registerNodeManager(registerReq); + + Set nodeAttributes = new HashSet<>(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host2")); + + // Set node attributes in HB. + NodeHeartbeatRequest heartbeatReq = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatusObject = getNodeStatusObject(nodeId); + int responseId = nodeStatusObject.getResponseId(); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse + .getNMTokenMasterKey()); + heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse + .getContainerTokenMasterKey()); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Ensure RM gets correct node attributes update. + Map attrs = spyAttributeManager + .getAttributesForNode(nodeId.getHost()); + spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost())); + Assert.assertEquals(1, attrs.size()); + NodeAttribute na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); + Assert.assertEquals("host2", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + Assert.assertEquals(1, count.get()); + + // Send HBs to RM with the same node attributes + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM updated node attributes once + Assert.assertEquals(1, count.get()); + + // Send another HB to RM with updated node attributes + nodeAttributes.clear(); + nodeAttributes.add(NodeAttribute.newInstance( + NodeAttribute.PREFIX_DISTRIBUTED, "host", + NodeAttributeType.STRING, "host3")); + nodeStatusObject.setResponseId(++responseId); + heartbeatReq.setNodeStatus(nodeStatusObject); + heartbeatReq.setNodeAttributes(nodeAttributes); + resourceTrackerService.nodeHeartbeat(heartbeatReq); + + // Make sure RM gets the updated attribute + attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost()); + Assert.assertEquals(1, attrs.size()); + na = attrs.keySet().iterator().next(); + Assert.assertEquals("host", na.getAttributeKey().getAttributeName()); + Assert.assertEquals("host3", na.getAttributeValue()); + Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType()); + + // Make sure RM updated node attributes twice + Assert.assertEquals(2, count.get()); + } + @Test public void testNodeHeartBeatWithInvalidLabels() throws Exception { writeToHostsFile("host2");