diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fc020f73f00..b9a3f843a20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2453,6 +2453,12 @@ public String moveApplication(ApplicationId appId, // attach the Container to another queue dest.attachContainer(getClusterResource(), app, rmContainer); } + // Move all reserved containers, sync ResourceUsageByLabel ResourceUsageByUser and numContainer + for (RMContainer rmContainer : app.getReservedContainers()) { + source.detachContainer(getClusterResource(), app, rmContainer); + // attach the Container to another queue + dest.attachContainer(getClusterResource(), app, rmContainer); + } if (!app.isStopped()) { source.finishApplicationAttempt(app, sourceQueueName); // Submit to a new queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6368ee50789..c6f42946d66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2016,7 +2016,8 @@ public void attachContainer(Resource clusterResource, allocateResource(clusterResource, application, rmContainer.getContainer() .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() - + " resource=" + rmContainer.getContainer().getResource() + + " containerState="+ rmContainer.getState() + + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); @@ -2035,7 +2036,8 @@ public void detachContainer(Resource clusterResource, releaseResource(clusterResource, application, rmContainer.getContainer() .getResource(), node.getPartition(), rmContainer); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() - + " resource=" + rmContainer.getContainer().getResource() + + " containerState="+ rmContainer.getState() + + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); @@ -2043,7 +2045,6 @@ public void detachContainer(Resource clusterResource, getParent().detachContainer(clusterResource, application, rmContainer); } } - /** * @return all ignored partition exclusivity RMContainers in the LeafQueue, * this will be used by preemption policy. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fb323c51167..a80256c473b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -5537,4 +5537,105 @@ public void testCSQueueMetrics() throws Exception { assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB()); assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB()); } + + //YARN-9838 + @Test + public void testReservedContainerLeakWhenMoveApplication() throws Exception { + CapacitySchedulerConfiguration csConf + = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + csConf.setCapacity("root.a",50); + csConf.setMaximumCapacity("root.a",100); + csConf.setUserLimitFactor("root.a",100); + csConf.setCapacity("root.b",50); + csConf.setMaximumCapacity("root.b",100); + csConf.setUserLimitFactor("root.b",100); + + YarnConfiguration conf=new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + RMNodeLabelsManager mgr=new NullRMNodeLabelsManager(); + mgr.init(conf); + MockRM rm1 = new MockRM(csConf); + CapacityScheduler scheduler=(CapacityScheduler) rm1.getResourceScheduler(); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("127.0.0.2:1234", 8 * GB); + /** + * simulation + * app1: (1 AM,1 running container) + * app2: (1 AM,1 reserved container) + */ + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app_1", "user_1", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // launch another app to queue, AM container should be launched in nm1 + RMApp app2 = rm1.submitApp(1 * GB, "app_2", "user_1", null, "a"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + am1.allocate("*", 4 * GB, 1, new ArrayList()); + //this containerRequest should be reserved + am2.allocate("*", 4 * GB, 1, new ArrayList()); + + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + // Do node heartbeats 2 times + // First time will allocate container for app1, second time will reserve + // container for app2 + scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1)); + scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + FiCaSchedulerApp schedulerApp1 = + scheduler.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = + scheduler.getApplicationAttempt(am2.getApplicationAttemptId()); + // APP1: 1 AM, 1 allocatedContainer + Assert.assertEquals(2, schedulerApp1.getLiveContainers().size()); + // APP2: 1 AM,1 reservedContainer + Assert.assertEquals(1,schedulerApp2.getLiveContainers().size()); + Assert.assertEquals(1,schedulerApp2.getReservedContainers().size()); + /** + * before,move app2 which has one reservedContainer + */ + LeafQueue srcQueue = (LeafQueue) scheduler.getQueue("a"); + LeafQueue desQueue = (LeafQueue) scheduler.getQueue("b"); + Assert.assertEquals(4,srcQueue.getNumContainers()); + Assert.assertEquals(10*GB,srcQueue.getUsedResources().getMemorySize());// AM: 2*1GB container: 4GB running,4GB reserved + Assert.assertEquals(0,desQueue.getNumContainers()); + Assert.assertEquals(0,desQueue.getUsedResources().getMemorySize()); + //app1 ResourceUsage (0 reserved) + Assert.assertEquals(5*GB,schedulerApp1.getAppAttemptResourceUsage().getAllUsed().getMemorySize()); + Assert.assertEquals(0,schedulerApp1.getCurrentReservation().getMemorySize()); + //app2 ResourceUsage (4GB reserved) + Assert.assertEquals(1*GB,schedulerApp2.getAppAttemptResourceUsage().getAllUsed().getMemorySize()); + Assert.assertEquals(4*GB,schedulerApp2.getCurrentReservation().getMemorySize()); + /** + * move app2 which has one reservedContainer + */ + scheduler.moveApplication(app2.getApplicationId(),"b"); + // finish.keep the order,if killing app1 first,the reservedContainer of app2 will be allocated + rm1.killApp(app2.getApplicationId()); + rm1.killApp(app1.getApplicationId()); + /** + * after,moved app2 which has one reservedContainer + */ + { + // after fixed + Assert.assertEquals(0, srcQueue.getNumContainers()); + Assert.assertEquals(0, desQueue.getNumContainers()); + Assert.assertEquals(0, srcQueue.getUsedResources().getMemorySize()); + Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize()); + } + /*{ + // before fixed + // the reserved container borrowed from srcQueue and returned to desQueue, + // but the numContainer and UsedResource did not sync when moving app to another queue + Assert.assertEquals(+1,srcQueue.getNumContainers()); //true + Assert.assertEquals(-1,desQueue.getNumContainers()); //true + Assert.assertEquals(+4*GB, srcQueue.getUsedResources().getMemorySize()); //true + Assert.assertEquals(-4*GB, desQueue.getUsedResources().getMemorySize()); //true + }*/ + rm1.close(); + } }