diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3eefb8f..92e1a36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -828,6 +828,13 @@ protected synchronized void completedContainer(RMContainer rmContainer, return; } + if(!application.getLiveContainersMap().containsKey(container.getId())){ + LOG.info("Container " + container + " of application attempt " + appId + + " is not alive, skip do completedContainer operation on event " + + event); + return; + } + // Get the node on which the container was allocated FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 53cb8d0..26edead 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -108,11 +108,14 @@ public static Resource 
newAvailResource(Resource total, Resource used) { private long lastHealthReportTime; private NodeState state; private Set labels; + private List updatedContainerInfoList + = new ArrayList(); public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, - Set labels) { + Set labels, + List updatedContainerInfoList) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -124,6 +127,9 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.hostName = hostName; this.state = state; this.labels = labels; + if(updatedContainerInfoList != null){ + this.updatedContainerInfoList = updatedContainerInfoList; + } } @Override @@ -211,9 +217,12 @@ public String getNodeManagerVersion() { @Override public List pullContainerUpdates() { - return new ArrayList(); + return updatedContainerInfoList; } + public void setUpdatedContainerInfoList(List list){ + updatedContainerInfoList = list; + } @Override public String getHealthReport() { return healthReport; @@ -241,18 +250,18 @@ private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels); + labels, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null); + null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, - Set labels) { + Set labels, List updatedContainerInfoList) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = 
hostName + ":" + nid; @@ -264,7 +273,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode, final String httpAddress = httpAddr; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, - rackName, healthReport, 0, nid, hostName, state, labels); + rackName, healthReport, 0, nid, hostName, state, labels, + updatedContainerInfoList); } public static RMNode nodeInfo(int rack, final Resource perNode, @@ -282,17 +292,27 @@ public static RMNode newNodeInfo(int rack, final Resource perNode) { } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123, + null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName, int port) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + port,null, null); + } + + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, + List list) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, list); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c352cc9..e347937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -52,18 +51,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; 
import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -83,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -4736,6 +4725,77 @@ public void testPerfMetricsInited() { } @Test + public void testCompletedContainerReleaseSameContainerTwice() + throws IOException { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + + int allocatedResMB = + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; + // Create resource request + ApplicationAttemptId attemptId = createSchedulingRequest( + allocatedResMB, 2, + "queue1", "user1", 1); + + // update scheduler + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + + FSAppAttempt app = scheduler.getApplicationAttempt(attemptId); + + // Assert usedResource of this app + assertEquals(1, app.getLiveContainersMap().size()); + assertEquals(allocatedResMB, app.getResourceUsage().getMemory()); + assertEquals(2, app.getResourceUsage().getVirtualCores()); + + assertEquals(1, app.getLiveContainers().size()); + ContainerId containerId = app.getLiveContainers() + .toArray(new RMContainer[0])[0].getContainerId(); + + // Fire NodeUpdateSchedulerEvent with completedContainer of containerId + ContainerStatus status = ContainerStatus.newInstance(containerId, +
ContainerState.COMPLETE, "", 0); + + List updatedContainerInfos + = new ArrayList(); + List completedContainers = + new ArrayList(); + completedContainers.add(status); + updatedContainerInfos.add(new UpdatedContainerInfo( + new ArrayList(), completedContainers)); + + RMNode node1Update = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1", updatedContainerInfos); + + NodeUpdateSchedulerEvent event = new NodeUpdateSchedulerEvent(node1Update); + scheduler.handle(event); + + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemory()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + + // call allocate with release request of containerId + List releasedContainers = new ArrayList(); + releasedContainers.add(containerId); + scheduler.allocate(attemptId, + new ArrayList(), + releasedContainers, + new ArrayList(), new ArrayList()); + + // Assert usedResource of this app is zero. + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemory()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + } + + @Test public void testQueueNameWithTrailingSpace() throws Exception { scheduler.init(conf); scheduler.start();