diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 0c99139d547..b7a9ae4f819 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -218,6 +218,10 @@ public Integer getDecommissioningTimeout() { return null; } + @Override public void removeContainerAllocationTags( + ContainerId containerId, Set allocationTags) { + } + @Override public Resource getPhysicalResource() { return null; diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 78645e98d68..cabb563fa63 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -206,6 +206,12 @@ public Integer getDecommissioningTimeout() { return node.getAllocationTagsWithCount(); } + @Override + public void removeContainerAllocationTags(ContainerId containerId, + Set allocationTags) { + node.removeContainerAllocationTags(containerId, allocationTags); + } + @Override public Resource getPhysicalResource() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index b5c8e7cb8e9..efac6662b2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -701,11 +701,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Notify AllocationTagsManager - container.rmContext.getAllocationTagsManager().removeContainer( - container.getNodeId(), container.getContainerId(), - container.getAllocationTags()); - RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; container.finishTime = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 872f2a6ef8d..6a60e5e1250 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -189,4 +189,13 @@ * @return a map of each allocation tag and its count. */ Map getAllocationTagsWithCount(); + + /** + * Removes a set of allocation tags belong to a certain container + * on this node. + * @param containerId + * @param allocationTags + */ + void removeContainerAllocationTags(ContainerId containerId, + Set allocationTags); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b942afae564..c31e958069e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1541,4 +1541,11 @@ public Integer getDecommissioningTimeout() { return context.getAllocationTagsManager() .getAllocationTagsWithCount(getNodeID()); } + + @Override + public void removeContainerAllocationTags(ContainerId containerId, + Set allocationTags) { + context.getAllocationTagsManager() + .removeContainer(nodeId, containerId, allocationTags); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index d5bfc57e66d..74488591336 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -242,6 +242,15 @@ public synchronized void releaseContainer(ContainerId containerId, launchedContainers.remove(containerId); Container container = info.container.getContainer(); + + // We remove allocation tags when a container is actually + // released on NM. This is to avoid running into situation + // when AM releases a container and NM has some delay to + // actually release it, then the tag can still be visible + // at RM so that RM can respect it during scheduling new containers. + getRMNode().removeContainerAllocationTags( + container.getId(), container.getAllocationTags()); + updateResourceForReleasedContainer(container); if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 84105d93032..085dcb69d2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -285,6 +285,11 @@ public Integer getDecommissioningTimeout() { return null; } + @Override + public void removeContainerAllocationTags( + ContainerId containerId, Set allocationTags) { + } + @Override public Resource getPhysicalResource() { return this.physicalResource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index c0f8d39abe3..ba409b1386b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -27,9 +27,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -416,6 +423,103 @@ public void testReleasedContainerIfAppAttemptisNull() throws Exception { } } + @Test(timeout = 30000l) + public void testContainerReleaseWithAllocationTags() throws Exception { + // Currently only can be tested against capacity scheduler. + if (getSchedulerType().equals(SchedulerType.CAPACITY)) { + final String testTag1 = "some-tag"; + final String testTag2 = "some-other-tag"; + YarnConfiguration conf = getConf(); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler"); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", + 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = + rm1.submitApp(200, "name", "user", new HashMap<>(), false, "default", + -1, null, "Test", false, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // allocate 1 container with tag1 + SchedulingRequest sr = SchedulingRequest + .newInstance(1l, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + Sets.newHashSet(testTag1), + ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)), + null); + + // allocate 3 containers with tag2 + SchedulingRequest sr1 = SchedulingRequest + .newInstance(2l, Priority.newInstance(1), + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), + Sets.newHashSet(testTag2), + ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)), + null); + + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Lists.newArrayList(sr, sr1)).build(); + am1.allocate(ar); + nm1.nodeHeartbeat(true); + + List allocated = new ArrayList<>(); + while (allocated.size() < 4) { + AllocateResponse rsp = am1 + .allocate(new ArrayList<>(), new ArrayList<>()); + allocated.addAll(rsp.getAllocatedContainers()); + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + } + + Assert.assertEquals(4, allocated.size()); + + Set containers = allocated.stream() + .filter(container -> container.getAllocationRequestId() == 1l) + .collect(Collectors.toSet()); + Assert.assertNotNull(containers); + Assert.assertEquals(1, containers.size()); + ContainerId cid = containers.iterator().next().getId(); + + // mock container start + rm1.getRMContext().getScheduler() + .getSchedulerNode(nm1.getNodeId()).containerStarted(cid); + + // verifies the allocation is made with correct number of tags + Map nodeTags = rm1.getRMContext() + .getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + Assert.assertNotNull(nodeTags.get(testTag1)); + Assert.assertEquals(1, nodeTags.get(testTag1).intValue()); + + // release a container + am1.allocate(new ArrayList<>(), Lists.newArrayList(cid)); + + // before NM confirms, the tag should still exist + nodeTags = rm1.getRMContext().getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + Assert.assertNotNull(nodeTags); + Assert.assertNotNull(nodeTags.get(testTag1)); + Assert.assertEquals(1, nodeTags.get(testTag1).intValue()); + + // NM reports back that container is released + // RM should cleanup the tag + ContainerStatus cs = ContainerStatus.newInstance(cid, + ContainerState.COMPLETE, "", 0); + nm1.nodeHeartbeat(Lists.newArrayList(cs), true); + + // Wait on condition + // 1) tag1 doesn't exist anymore + // 2) num of tag2 is still 3 + GenericTestUtils.waitFor(() -> { + Map tags = rm1.getRMContext() + .getAllocationTagsManager() + .getAllocationTagsWithCount(nm1.getNodeId()); + return tags.get(testTag1) == null && + tags.get(testTag2).intValue() == 3; + }, 500, 3000); + } + } + @Test(timeout=60000) public void testContainerReleasedByNode() throws Exception { System.out.println("Starting testContainerReleasedByNode");