diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 9c16e49..cbbd427 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -865,6 +865,13 @@ protected synchronized void completedContainer(RMContainer rmContainer, return; } + if(!application.getLiveContainersMap().containsKey(container.getId())){ + LOG.info("Container " + container + " of application attempt " + appId + + " is not alive, skip do completedContainer operation on event " + + event); + return; + } + // Get the node on which the container was allocated FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..5b5ec0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -113,12 +113,15 @@ public static Resource newAvailResource(Resource total, Resource used) { private Set labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private List updatedContainerInfoList + = new ArrayList(); public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -132,6 +135,9 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.labels = labels; this.containersUtilization = containersUtilization; this.nodeUtilization = nodeUtilization; + if(updatedContainerInfoList != null){ + this.updatedContainerInfoList = updatedContainerInfoList; + } } @Override @@ -219,9 +225,12 @@ public String getNodeManagerVersion() { @Override public List pullContainerUpdates() { - return new ArrayList(); + return updatedContainerInfoList; } + public void setUpdatedContainerInfoList(List list){ + updatedContainerInfoList = list; + } @Override public String getHealthReport() { return healthReport; @@ -270,19 +279,20 @@ private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels, null, null); + labels, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null, null, null); + null, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -295,7 +305,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, healthReport, 0, nid, hostName, state, labels, - containersUtilization, nodeUtilization); + containersUtilization, nodeUtilization,updatedContainerInfoList ); } public static RMNode nodeInfo(int rack, final Resource perNode, @@ -313,17 +323,27 @@ public static RMNode newNodeInfo(int rack, final Resource perNode) { } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123, + null, null, null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null, null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName, int port) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + port,null, null, null, null); + } + + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, + List list) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null, null, list); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 430eba7..7b909df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; @@ -55,18 +54,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -86,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -5078,6 +5067,78 @@ public void testPerfMetricsInited() { } @Test + public void testCompletedContainerReleaseSameContainerTwice() + throws IOException { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + + int allocatedResMB = + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; + // Create resource request + ApplicationAttemptId attemptId = createSchedulingRequest( + allocatedResMB, 2, + "queue1", "user1", 1); + + // update scheduler + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + + FSAppAttempt app = scheduler.getApplicationAttempt(attemptId); + + // Assert usedResource of this app + assertEquals(1, app.getLiveContainersMap().size()); + assertEquals(allocatedResMB, app.getResourceUsage().getMemory()); + assertEquals(2, app.getResourceUsage().getVirtualCores()); + + assertEquals(1, app.getLiveContainers().size()); + ContainerId containerId = app.getLiveContainers() + .toArray(new RMContainer[0])[0].getContainerId(); + + // Fire NodeUpdateSchedulerEvent with completedContainer of containerId + ContainerStatus status = ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, "", 0); + + List updatedContainerInfos + = new ArrayList(); + List completedContainers = + new ArrayList(); + completedContainers.add(status); + updatedContainerInfos.add(new UpdatedContainerInfo( + new ArrayList(), completedContainers)); + + RMNode node1Update = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1", updatedContainerInfos); + + NodeUpdateSchedulerEvent event = new NodeUpdateSchedulerEvent(node1Update); + scheduler.handle(event); + + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemory()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + + // call allocate with release request of containerId + List releasedContainers = new ArrayList(); + releasedContainers.add(containerId); + scheduler.allocate(attemptId, + new ArrayList(), + releasedContainers, + new ArrayList(), new ArrayList(), + null, null); + + // Assert usedResource of this App be zero. + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemory()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + } + + @Test public void testQueueNameWithTrailingSpace() throws Exception { scheduler.init(conf); scheduler.start();