diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7acde846de0..e5384150a82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1142,7 +1142,7 @@ private synchronized void updateNodeAndQueueResource(RMNode nm, root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); } - + /** * Process node labels update on a node. */ @@ -1185,7 +1185,7 @@ private synchronized void updateLabelsOnNode(NodeId nodeId, if (null != reservedContainer) { killReservedContainer(reservedContainer); } - + // Update node labels after we've done this node.updateLabels(newLabels); } @@ -1367,13 +1367,8 @@ public void handle(SchedulerEvent event) { { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - - for (Entry<NodeId, Set<String>> entry : labelUpdateEvent - .getUpdatedNodeToLabels().entrySet()) { - NodeId id = entry.getKey(); - Set<String> labels = entry.getValue(); - updateLabelsOnNode(id, labels); - } + + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; case NODE_UPDATE: @@ -1483,6 +1478,23 @@ public void handle(SchedulerEvent event) { } } + /** + * Process node labels update. 
+ */ + private synchronized void updateNodeLabelsAndQueueResource( + NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { + for (Entry<NodeId, Set<String>> entry : labelUpdateEvent + .getUpdatedNodeToLabels().entrySet()) { + NodeId id = entry.getKey(); + Set<String> labels = entry.getValue(); + updateLabelsOnNode(id, labels); + } + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + } + + private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName, nodeManager.getNodeLabels()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index af320e53ce8..f399530c48d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -413,7 +413,7 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } - @Test(timeout = 3000000) + @Test(timeout = 300000) public void testMoveApplicationWithLabel() throws Exception { // set node -> label mgr.addToCluserNodeLabelsWithDefaultExclusivity( @@ -589,7 +589,49 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } - @Test (timeout = 60000) + @Test + public void testAMResourceLimitNodeUpdatePartition() throws Exception { + 
conf.setInt("yarn.scheduler.minimum-allocation-mb", 64); + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + rm.registerNode("h1:1234", 6400); + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // .1 percentage of 6400 will be for am + checkAMResourceLimit(rm, "a", 640, ""); + checkAMResourceLimit(rm, "a", 0, "x"); + checkAMResourceLimit(rm, "a", 0, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + rm.drainEvents(); + + checkAMResourceLimit(rm, "a", 640, "x"); + checkAMResourceLimit(rm, "a", 0, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + checkAMResourceLimit(rm, "a", 0, ""); + + // Switch + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y"))); + rm.drainEvents(); + + checkAMResourceLimit(rm, "a", 0, "x"); + checkAMResourceLimit(rm, "a", 640, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + checkAMResourceLimit(rm, "a", 0, ""); + } + + @Test(timeout = 60000) public void testAMResourceUsageWhenNodeUpdatesPartition() throws Exception { // set node -> label @@ -638,8 +680,8 @@ public RMNodeLabelsManager createNodeLabelManager() { FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); // change h1's label to z - cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), - toSet("z")))); + cs.handle(new NodeLabelsUpdateSchedulerEvent( + ImmutableMap.of(nm1.getNodeId(), toSet("z")))); // Now the resources also should change from x to z. Verify AM and normal // used resource are successfully changed. 
@@ -677,4 +719,28 @@ public RMNodeLabelsManager createNodeLabelManager() { rm.close(); } + + private void checkAMResourceLimit(MockRM rm, String queuename, int memory, + String label) throws InterruptedException { + Assert.assertEquals(memory, + waitForResourceUpdate(rm, queuename, memory, label, 3000L)); + } + + private long waitForResourceUpdate(MockRM rm, String queuename, long memory, + String label, long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + long memorySize = 0; + while (System.currentTimeMillis() - start < timeout) { + CapacityScheduler scheduler = + (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queuename); + memorySize = + queue.getQueueResourceUsage().getAMLimit(label).getMemorySize(); + if (memory == memorySize) { + return memorySize; + } + Thread.sleep(100); + } + return memorySize; + } }