diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 634f667..48eddef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -122,12 +122,19 @@ public QueueMetrics getMetrics() { return queue.getMetrics(); } - synchronized public void containerCompleted(RMContainer rmContainer, + synchronized public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); - + + if(!liveContainers.containsKey(container.getId())){ + LOG.info("Container " + container + " of application attempt " + this.getId() + + " is not alive, skip completedContainer operation on event " + + event); + return false; + } + // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); @@ -160,6 +167,7 @@ synchronized public void containerCompleted(RMContainer rmContainer, // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; + return true; } private synchronized void unreserveInternal( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc953ba..87f4992 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -864,13 +864,17 @@ protected synchronized void completedContainerInternal( // Get the node on which the container was allocated FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); + boolean removed = false; if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(rmContainer.getReservedPriority(), node); } else { - application.containerCompleted(rmContainer, containerStatus, event); + removed = application.containerCompleted(rmContainer, containerStatus, + event); node.releaseContainer(container); - updateRootQueueMetrics(); + if (removed) { + updateRootQueueMetrics(); + } } if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a88abe7..6a522ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -113,12 +113,15 @@ public static Resource newAvailResource(Resource total, Resource used) { private Set labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private List updatedContainerInfoList + = new ArrayList(); public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -132,6 +135,9 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.labels = labels; this.containersUtilization = containersUtilization; this.nodeUtilization = nodeUtilization; + if(updatedContainerInfoList != null){ + this.updatedContainerInfoList = updatedContainerInfoList; + } } @Override @@ -219,9 +225,12 @@ public String getNodeManagerVersion() { @Override public List pullContainerUpdates() { - return new ArrayList(); + return updatedContainerInfoList; } + public void setUpdatedContainerInfoList(List list){ + updatedContainerInfoList = list; + } @Override public String getHealthReport() { return healthReport; @@ -283,19 +292,20 @@ private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels, null, null); + labels, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null, null, null); + null, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -308,7 +318,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, healthReport, 0, nid, hostName, state, labels, - containersUtilization, nodeUtilization); + containersUtilization, nodeUtilization,updatedContainerInfoList ); } public static RMNode nodeInfo(int rack, final Resource perNode, @@ -326,17 +336,27 @@ public static RMNode newNodeInfo(int rack, final Resource perNode) { } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123, + null, null, null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null, null, null); } public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum, String hostName, int port) { - return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + port,null, null, null, null); + } + + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, + List list) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null, null, list); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 43ebe53..b093eea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -107,6 +109,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -4308,6 +4311,87 @@ public void testPerfMetricsInited() { } @Test + public void testCompletedContainerReleaseSameContainerTwice() + throws IOException { + final Logger LOG = Logger.getLogger(TestFairScheduler.class); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + + int allocatedResMB = + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; + + // Create resource request + ApplicationAttemptId attemptId = createSchedulingRequest( + allocatedResMB, 2, + "queue1", "user1", 1); + + // update scheduler + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + + FSAppAttempt app = scheduler.getApplicationAttempt(attemptId); + + // Assert usedResource of this app + assertEquals(1, app.getLiveContainersMap().size()); + assertEquals(allocatedResMB, app.getResourceUsage().getMemorySize()); + assertEquals(2, app.getResourceUsage().getVirtualCores()); + + assertEquals(1, app.getLiveContainers().size()); + RMContainer rmContainer = app.getLiveContainers() + .toArray(new RMContainer[0])[0]; + ContainerId containerId = rmContainer.getContainerId(); + + // Call allocate with empty resource request to let the formal allocated container enter ACQUIRED state + scheduler.allocate(attemptId, new ArrayList(), new ArrayList(), + null, null, null, null); + + // Fire NodeUpdateSchedulerEvent with completedContainer of containerId + ContainerStatus status = ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, "", 0); + + List updatedContainerInfos + = new ArrayList(); + List completedContainers = + new ArrayList(); + completedContainers.add(status); + updatedContainerInfos.add(new UpdatedContainerInfo( + new ArrayList(), completedContainers)); + + RMNode node1Update = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1", updatedContainerInfos); + + NodeUpdateSchedulerEvent event = new NodeUpdateSchedulerEvent(node1Update); + scheduler.handle(event); + + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemorySize()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + + LOG.info("release container"); + // call allocate with release request of containerId + List releasedContainers = new ArrayList(); + releasedContainers.add(containerId); + scheduler.allocate(attemptId, + new ArrayList(), + releasedContainers, + new ArrayList(), new ArrayList(), + null, null); + + // Assert usedResource of this App be zero. + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemorySize()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + } + + @Test public void testQueueNameWithTrailingSpace() throws Exception { scheduler.init(conf); scheduler.start(); -- 2.7.4 (Apple Git-66)