diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index a6907676f9d..8531277e4a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -34,8 +34,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.log4j.Logger; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -69,6 +71,9 @@ private TypeToCountedTags globalNodeMapping = new TypeToCountedTags(); // Global tags to Rack mapping private TypeToCountedTags globalRackMapping = new TypeToCountedTags(); + // Global node to containers mapping + private Map> containersToNode = + new HashMap<>(); /** * Generic store mapping type T to counted tags. @@ -343,9 +348,17 @@ public void addContainer(NodeId nodeId, ContainerId containerId, if (allocationTags == null || allocationTags.isEmpty()) { return; } + if (containersToNode.containsKey(nodeId) && + containersToNode.get(nodeId).contains(containerId)) { + LOG.warn("Container " + containerId.toString() + + " already exists. Skip adding it."); + return; + } ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); addTags(nodeId, applicationId, allocationTags); + containersToNode.putIfAbsent(nodeId, new ArrayList<>()); + containersToNode.get(nodeId).add(containerId); if (LOG.isDebugEnabled()) { LOG.debug("Added container=" + containerId + " with tags=[" + StringUtils.join(allocationTags, ",") + "]"); @@ -388,13 +401,19 @@ public void removeContainer(NodeId nodeId, if (allocationTags == null || allocationTags.isEmpty()) { return; } - ApplicationId applicationId = - containerId.getApplicationAttemptId().getApplicationId(); - - removeTags(nodeId, applicationId, allocationTags); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed container=" + containerId + " with tags=[" - + StringUtils.join(allocationTags, ",") + "]"); + if (containersToNode.containsKey(nodeId) + && containersToNode.get(nodeId).contains(containerId)) { + ApplicationId applicationId = containerId + .getApplicationAttemptId().getApplicationId(); + removeTags(nodeId, applicationId, allocationTags); + containersToNode.get(nodeId).remove(containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed container=" + containerId + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); + } + } else { + LOG.warn("Container " + containerId.toString() + + " doesn't exist in allocation tags manager."); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index 3f2aaed2a7c..2f4ae3bf531 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -38,6 +39,7 @@ import org.mockito.Mockito; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -60,6 +62,44 @@ public void setup() { rmContext = rm.getRMContext(); } + @Test + public void testMultipleAddRemoveContainer() { + AllocationTagsManager atm = new AllocationTagsManager(rmContext); + + NodeId nodeId = NodeId.fromString("host1:123"); + ContainerId cid1 = TestUtils.getMockContainerId(1, 1); + ContainerId cid2 = TestUtils.getMockContainerId(1, 2); + Set tags1 = ImmutableSet.of("mapper", "reducer"); + Set tags2 = ImmutableSet.of("mapper"); + + // node - mapper : 2 + // - reduce : 1 + atm.addContainer(nodeId, cid1, tags1); + atm.addContainer(nodeId, cid2, tags2); + Assert.assertEquals(2L, + (long) atm.getAllocationTagsWithCount(nodeId).get("mapper")); + Assert.assertEquals(1L, + (long) atm.getAllocationTagsWithCount(nodeId).get("reducer")); + + // remove container1 + atm.removeContainer(nodeId, cid1, tags1); + Assert.assertEquals(1L, + (long) atm.getAllocationTagsWithCount(nodeId).get("mapper")); + Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("reducer")); + + // remove the same container again, it should be skipped + atm.removeContainer(nodeId, cid1, tags1); + Assert.assertEquals(1L, + (long) atm.getAllocationTagsWithCount(nodeId).get("mapper")); + Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("reducer")); + + // add container2 again + atm.addContainer(nodeId, cid2, tags1); + Assert.assertEquals(1L, + (long) atm.getAllocationTagsWithCount(nodeId).get("mapper")); + Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("reducer")); + } + @Test public void testAllocationTagsManagerSimpleCases() throws InvalidAllocationTagsQueryException {