diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e318d47..5485d39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -136,7 +138,8 @@ synchronized public boolean updateResourceRequests( QueueMetrics metrics = queue.getMetrics(); boolean anyResourcesUpdated = false; - + // Flag for any label request with Label + boolean anyRequestWithLabel = false; // Update resource requests for (ResourceRequest request : requests) { Priority priority = request.getPriority(); @@ -161,6 +164,28 @@ synchronized public boolean updateResourceRequests( if (request.getNumContainers() > 0) { activeUsersManager.activateApplication(user, applicationId); } + if ((null != request.getNodeLabelExpression()) + && (!request.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL))) { + // Flag set to true when any with NodeLabel request is received + anyRequestWithLabel = true; + ResourceRequest previousAnyRequest = + getResourceRequest(priority, resourceName); + // For any resource when there is any change in label Expression + // should change the label for all requests already added. + if ((null != previousAnyRequest) + && !(previousAnyRequest.getNodeLabelExpression().equals(request + .getNodeLabelExpression()))) { + Map preResourceRequests = + getResourceRequests(priority); + Set> preResReqEntrySet = + preResourceRequests.entrySet(); + for (Entry entry : preResReqEntrySet) { + ResourceRequest preReq = entry.getValue(); + preReq.setNodeLabelExpression(request.getNodeLabelExpression()); + } + } + } } Map asks = this.requests.get(priority); @@ -214,6 +239,28 @@ synchronized public boolean updateResourceRequests( } } } + // Only for label request the request set will be updated + // All received request with same priority as ANY should be changed to + // ANYs label expression. + if (anyRequestWithLabel) { + for (ResourceRequest request : requests) { + String resourceName = request.getResourceName(); + if (resourceName.equals(ResourceRequest.ANY)) { + continue; + } + Priority priority = request.getPriority(); + ResourceRequest anyResourceRequest = + getResourceRequest(priority, ResourceRequest.ANY); + if ((null != anyResourceRequest)) { + request.setNodeLabelExpression(anyResourceRequest + .getNodeLabelExpression()); + if (LOG.isDebugEnabled()) { + LOG.debug("Updated label Expression for :" + " application= " + + applicationId + " request= " + request); + } + } + } + } return anyResourcesUpdated; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 48d6602..9a7f186 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -81,6 +82,7 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) { conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); final String A = CapacitySchedulerConfiguration.ROOT + ".a"; conf.setCapacity(A, 10); @@ -90,8 +92,9 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) { final String B = CapacitySchedulerConfiguration.ROOT + ".b"; conf.setCapacity(B, 20); - conf.setAccessibleNodeLabels(B, toSet("y")); + conf.setAccessibleNodeLabels(B, toSet("y", "z")); conf.setCapacityByLabel(B, "y", 100); + conf.setCapacityByLabel(B, "z", 100); final String C = CapacitySchedulerConfiguration.ROOT + ".c"; conf.setCapacity(C, 70); @@ -110,6 +113,7 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) { conf.setCapacity(B1, 100); conf.setMaximumCapacity(B1, 100); conf.setCapacityByLabel(B1, "y", 100); + conf.setCapacityByLabel(B1, "z", 100); final String C1 = C + ".c1"; conf.setQueues(C, new String[] {"c1"}); @@ -309,7 +313,7 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } - @Test (timeout = 120000) + @Test(timeout = 120000) public void testContainerAllocateWithLabels() throws Exception { // set node -> label mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); @@ -474,7 +478,63 @@ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId, SchedulerNode node = cs.getSchedulerNode(nodeId); Assert.assertEquals(numContainers, node.getNumContainers()); } - + + @Test + public void testResourceRequestUpdateNodePartitions() throws Exception { + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of(NodeLabel.newInstance("x"), + NodeLabel.newInstance("y", false), NodeLabel.newInstance("z", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y"))); + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // no label + MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = y + // launch an app to queue b1 (label = y), AM container should be launched in + // nm2 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + List resourceRequest = new ArrayList(); + resourceRequest.add(am1.createResourceReq("*", 1024, 3, 5, "y")); + resourceRequest.add(am1.createResourceReq("h1:1234", 1024, 3, 2, + RMNodeLabelsManager.NO_LABEL)); + resourceRequest.add(am1.createResourceReq("*", 1024, 2, 3, "y")); + resourceRequest.add(am1.createResourceReq("h2:1234", 1024, 2, 4, + RMNodeLabelsManager.NO_LABEL)); + am1.allocate(resourceRequest, new ArrayList()); + CapacityScheduler cs = + (CapacityScheduler) rm1.getRMContext().getScheduler(); + FiCaSchedulerApp app = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + List allResourceRequests = + app.getAppSchedulingInfo().getAllResourceRequests(); + for (ResourceRequest changeReq : allResourceRequests) { + if (changeReq.getPriority().getPriority() == 2 + || changeReq.getPriority().getPriority() == 3) { + Assert.assertEquals("Expected label y", "y", + changeReq.getNodeLabelExpression()); + } + } + List newReq = new ArrayList(); + newReq.add(am1.createResourceReq("*", 1024, 3, 5, "z")); + am1.allocate(newReq, new ArrayList()); + for (ResourceRequest changeReq : allResourceRequests) { + if (changeReq.getPriority().getPriority() == 3) { + Assert.assertEquals("Expected label z", "z", + changeReq.getNodeLabelExpression()); + } else if (changeReq.getPriority().getPriority() == 2) { + Assert.assertEquals("Expected label y", "y", + changeReq.getNodeLabelExpression()); + } + } + } + @Test public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception { /**