From 6b8721a4c7b0e8e05b5728cba16a8181a97af5a5 Mon Sep 17 00:00:00 2001 From: Sunil Date: Thu, 27 Oct 2016 18:37:31 +0530 Subject: [PATCH] YARN-3933 --- .../scheduler/fair/FSAppAttempt.java | 11 ++- .../scheduler/fair/FairScheduler.java | 10 ++- .../yarn/server/resourcemanager/MockNodes.java | 28 +++++-- .../scheduler/fair/TestFairScheduler.java | 85 ++++++++++++++++++++++ 4 files changed, 124 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 3555faa..547249c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -123,13 +123,21 @@ public QueueMetrics getMetrics() { return queue.getMetrics(); } - public void containerCompleted(RMContainer rmContainer, + public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { try { writeLock.lock(); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); + if (!liveContainers.containsKey(container.getId())) { + LOG.info( + "Container " + container + " of application attempt " + this.getId() + + " is not alive, skip completedContainer operation on event " + + event); + return false; + } + // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); @@ -157,6 +165,7 @@ public void containerCompleted(RMContainer 
rmContainer, // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; + return true; } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 94fdb7c..85956a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -879,13 +879,17 @@ protected void completedContainerInternal( // Get the node on which the container was allocated FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); + boolean removed = false; if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(rmContainer.getReservedSchedulerKey(), node); - } else{ - application.containerCompleted(rmContainer, containerStatus, event); + } else { + removed = application.containerCompleted(rmContainer, containerStatus, + event); node.releaseContainer(container); - updateRootQueueMetrics(); + if (removed) { + updateRootQueueMetrics(); + } } if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 5a89e54..8ad79db 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -112,12 +112,15 @@ public static Resource newAvailResource(Resource total, Resource used) { private Set labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private List updatedContainerInfoList = + new ArrayList(); public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; this.httpAddress = httpAddress; @@ -131,6 +134,9 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, this.labels = labels; this.containersUtilization = containersUtilization; this.nodeUtilization = nodeUtilization; + if (updatedContainerInfoList != null) { + this.updatedContainerInfoList = updatedContainerInfoList; + } } @Override @@ -218,7 +224,11 @@ public String getNodeManagerVersion() { @Override public List pullContainerUpdates() { - return new ArrayList(); + return updatedContainerInfoList; + } + + public void setUpdatedContainerInfoList(List list) { + updatedContainerInfoList = list; } @Override @@ -287,19 +297,20 @@ private static RMNode buildRMNode(int rack, final Resource perNode, private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, Set labels) { return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123, - labels, null, null); + labels, null, null, null); } private 
static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port) { return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port, - null, null, null); + null, null, null, null); } private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr, int hostnum, String hostName, int port, Set labels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List updatedContainerInfoList) { final String rackName = "rack"+ rack; final int nid = hostnum; final String nodeAddr = hostName + ":" + nid; @@ -312,7 +323,7 @@ private static RMNode buildRMNode(int rack, final Resource perNode, String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName, healthReport, 0, nid, hostName, state, labels, - containersUtilization, nodeUtilization); + containersUtilization, nodeUtilization, updatedContainerInfoList); } public static RMNode nodeInfo(int rack, final Resource perNode, @@ -343,4 +354,9 @@ public static RMNode newNodeInfo(int rack, final Resource perNode, return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port); } + public static RMNode newNodeInfo(int rack, final Resource perNode, + int hostnum, String hostName, List list) { + return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, + 123, null, null, null, list); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9e0dd06..a6d3bd9 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -95,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -118,6 +121,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -4918,4 +4922,85 @@ public void testRefreshQueuesWhenRMHA() throws Exception { rm1.stop(); rm2.stop(); } + + @Test + public void testCompletedContainerReleaseSameContainerTwice() + throws IOException { + final Logger LOG = 
Logger.getLogger(TestFairScheduler.class); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + + int allocatedResMB = + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; + + // Create resource request + ApplicationAttemptId attemptId = createSchedulingRequest( + allocatedResMB, 2, + "queue1", "user1", 1); + + // update scheduler + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + + FSAppAttempt app = scheduler.getApplicationAttempt(attemptId); + + // Assert usedResource of this app + assertEquals(1, app.getLiveContainersMap().size()); + assertEquals(allocatedResMB, app.getResourceUsage().getMemorySize()); + assertEquals(2, app.getResourceUsage().getVirtualCores()); + + assertEquals(1, app.getLiveContainers().size()); + RMContainer rmContainer = app.getLiveContainers() + .toArray(new RMContainer[0])[0]; + ContainerId containerId = rmContainer.getContainerId(); + + // Call allocate with empty resource request to let the previously allocated container enter ACQUIRED state + scheduler.allocate(attemptId, new ArrayList(), new ArrayList(), + null, null, null, null); + + // Fire NodeUpdateSchedulerEvent with completedContainer of containerId + ContainerStatus status = ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, "", 0); + + List updatedContainerInfos + = new ArrayList(); + List completedContainers = + new ArrayList(); + completedContainers.add(status); + updatedContainerInfos.add(new UpdatedContainerInfo( + new ArrayList(), completedContainers)); + + RMNode node1Update = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1", updatedContainerInfos); + + NodeUpdateSchedulerEvent event = new NodeUpdateSchedulerEvent(node1Update); + scheduler.handle(event); +
assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemorySize()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + + LOG.info("release container"); + // call allocate with release request of containerId + List releasedContainers = new ArrayList(); + releasedContainers.add(containerId); + scheduler.allocate(attemptId, + new ArrayList(), + releasedContainers, + new ArrayList(), new ArrayList(), + null, null); + + // Assert usedResource of this app is still zero. + assertEquals(0, app.getLiveContainersMap().size()); + assertEquals(0, app.getResourceUsage().getMemorySize()); + assertEquals(0, app.getResourceUsage().getVirtualCores()); + } } -- 2.7.4 (Apple Git-66)