diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 7244b17..cd1dacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -504,7 +504,7 @@ public AllocateResponse allocate(AllocateRequest request)
       try {
         RMServerUtils.normalizeAndValidateRequests(ask,
             rScheduler.getMaximumResourceCapability(), app.getQueue(),
-            rScheduler);
+            rScheduler, rmContext);
       } catch (InvalidResourceRequestException e) {
         LOG.warn("Invalid resource ask by application " + appAttemptId, e);
         throw e;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index ca21f11..7990421 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -383,7 +383,7 @@ private ResourceRequest validateAndCreateResourceRequest(
     try {
       SchedulerUtils.normalizeAndValidateRequest(amReq,
           scheduler.getMaximumResourceCapability(),
-          submissionContext.getQueue(), scheduler, isRecovery);
+          submissionContext.getQueue(), scheduler, isRecovery, rmContext);
     } catch (InvalidResourceRequestException e) {
       LOG.warn("RM app submission failed in validating AM resource request"
           + " for application " + submissionContext.getApplicationId(), e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 083189a..4669a28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -91,11 +91,12 @@
    * requested memory/vcore is non-negative and not greater than max
    */
   public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
-      Resource maximumResource, String queueName, YarnScheduler scheduler)
+      Resource maximumResource, String queueName, YarnScheduler scheduler,
+      RMContext rmContext)
       throws InvalidResourceRequestException {
     for (ResourceRequest resReq : ask) {
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
-          queueName, scheduler);
+          queueName, scheduler, rmContext);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 5e9ea5d..7966303 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -210,7 +211,7 @@ private static void normalizeNodeLabelExpressionInRequest(
 
   public static void normalizeAndValidateRequest(ResourceRequest resReq,
       Resource maximumResource, String queueName, YarnScheduler scheduler,
-      boolean isRecovery)
+      boolean isRecovery, RMContext rmContext)
       throws InvalidResourceRequestException {
 
     QueueInfo queueInfo = null;
@@ -222,15 +223,16 @@ public static void normalizeAndValidateRequest(ResourceRequest resReq,
     }
     SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
     if (!isRecovery) {
-      validateResourceRequest(resReq, maximumResource, queueInfo);
+      validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
     }
   }
 
   public static void normalizeAndvalidateRequest(ResourceRequest resReq,
-      Resource maximumResource, String queueName, YarnScheduler scheduler)
+      Resource maximumResource, String queueName, YarnScheduler scheduler,
+      RMContext rmContext)
       throws InvalidResourceRequestException {
     normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
-        false);
+        false, rmContext);
   }
 
   /**
@@ -239,8 +241,8 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq,
    *
    * @throws InvalidResourceRequestException when there is invalid request
    */
-  public static void validateResourceRequest(ResourceRequest resReq,
-      Resource maximumResource, QueueInfo queueInfo)
+  private static void validateResourceRequest(ResourceRequest resReq,
+      Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
       throws InvalidResourceRequestException {
     if (resReq.getCapability().getMemory() < 0 ||
         resReq.getCapability().getMemory() > maximumResource.getMemory()) {
@@ -282,7 +284,7 @@ public static void validateResourceRequest(ResourceRequest resReq,
 
     if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
       if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
-          labelExp)) {
+          labelExp, rmContext)) {
         throw new InvalidResourceRequestException("Invalid resource request"
             + ", queue=" + queueInfo.getQueueName()
@@ -317,18 +319,34 @@ public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
   }
 
   public static boolean checkQueueLabelExpression(Set<String> queueLabels,
-      String labelExpression) {
-    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
-      return true;
-    }
+      String labelExpression, RMContext rmContext) {
     // if label expression is empty, we can allocate container on any node
     if (labelExpression == null) {
       return true;
     }
     for (String str : labelExpression.split("&&")) {
-      if (!str.trim().isEmpty()
-          && (queueLabels == null || !queueLabels.contains(str.trim()))) {
-        return false;
+      str = str.trim();
+      if (!str.trim().isEmpty()) {
+        // check queue label
+        boolean queueCheck = true;
+        if (queueLabels == null) {
+          queueCheck = false;
+        } else {
+          if (!queueLabels.contains(str)
+              && !queueLabels.contains(RMNodeLabelsManager.ANY)) {
+            queueCheck = false;
+          }
+        }
+
+        if (!queueCheck) {
+          return false;
+        }
+
+        // check node label manager contains this label
+        RMNodeLabelsManager nlm = rmContext.getNodeLabelManager();
+        if (nlm != null && !nlm.containsNodeLabel(str)) {
+          return false;
+        }
       }
     }
     return true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 47cea19..2f294d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -121,8 +121,7 @@ protected void setupConfigurableCapacities() {
         accessibleLabels,
         csContext.getConfiguration(),
         queueCapacities,
-        parent == null ? null : parent.getQueueCapacities(),
-        csContext.getRMContext().getNodeLabelManager());
+        parent == null ? null : parent.getQueueCapacities());
   }
 
   @Override
@@ -248,8 +247,6 @@ synchronized void setupQueueConfigs(Resource clusterResource)
     if (this.accessibleLabels == null && parent != null) {
       this.accessibleLabels = parent.getAccessibleNodeLabels();
     }
-    SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
-        this.accessibleLabels);
 
     // inherit from parent if labels not set
     if (this.defaultLabelExpression == null && parent != null
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 0cc7589..256e831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -117,9 +117,8 @@ public static void updateAndCheckCapacitiesByLabel(String queuePath,
    */
   public static void loadUpdateAndCheckCapacities(String queuePath,
       Set<String> accessibleLabels, CapacitySchedulerConfiguration csConf,
-      QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities,
-      RMNodeLabelsManager nlm) {
-    loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm,
+      QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
+    loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels,
         queueCapacities, csConf);
 
     updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
@@ -129,13 +128,13 @@ public static void loadUpdateAndCheckCapacities(String queuePath,
 
   // Considered NO_LABEL, ANY and null cases
   private static Set<String> normalizeAccessibleNodeLabels(Set<String> labels,
-      RMNodeLabelsManager mgr) {
+      CapacitySchedulerConfiguration csConf) {
     Set<String> accessibleLabels = new HashSet<String>();
     if (labels != null) {
       accessibleLabels.addAll(labels);
     }
     if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) {
-      accessibleLabels.addAll(mgr.getClusterNodeLabelNames());
+      accessibleLabels.addAll(csConf.getKnownNodeLabels());
     }
     accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL);
 
@@ -143,10 +142,10 @@ public static void loadUpdateAndCheckCapacities(String queuePath,
   }
 
   private static void loadCapacitiesByLabelsFromConf(String queuePath,
-      Set<String> labels, RMNodeLabelsManager mgr,
+      Set<String> labels,
       QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
     queueCapacities.clearConfigurableFields();
-    labels = normalizeAccessibleNodeLabels(labels, mgr);
+    labels = normalizeAccessibleNodeLabels(labels, csConf);
 
     for (String label : labels) {
       if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b00f25c..bd2027a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -23,8 +23,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
@@ -44,13 +46,14 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
-
-
 import com.google.common.collect.ImmutableSet;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -909,4 +912,35 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
         defaultVal);
     return preemptionDisabled;
   }
+
+  /**
+   * Get known node labels appeared in the configuration
+   */
+  public Set<String> getKnownNodeLabels() {
+    Set<String> knownNodeLabels = new HashSet<String>();
+    Entry<String, String> e = null;
+
+    Iterator<Entry<String, String>> iter = iterator();
+    while (iter.hasNext()) {
+      e = iter.next();
+      String key = e.getKey();
+      String value = e.getValue();
+
+      if (key.endsWith(ACCESSIBLE_NODE_LABELS)) {
+        for (String label : value.split(",")) {
+          label = label.trim();
+
+          // Skip special node labels
+          if (label.equals(RMNodeLabelsManager.ANY)
+              || label.equals(RMNodeLabelsManager.NO_LABEL)) {
+            continue;
+          }
+
+          knownNodeLabels.add(label);
+        }
+      }
+    }
+
+    return knownNodeLabels;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 56ade84..9a7338d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -184,20 +184,6 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
 
     maxAMResourcePerQueuePercent =
         conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
-
-    if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
-        this.defaultLabelExpression)) {
-      throw new IOException("Invalid default label expression of "
-          + " queue="
-          + getQueueName()
-          + " doesn't have permission to access all labels "
-          + "in default label expression. labelExpression of resource request="
-          + (this.defaultLabelExpression == null ? ""
-              : this.defaultLabelExpression)
-          + ". Queue labels="
-          + (getAccessibleNodeLabels() == null ? "" : StringUtils.join(
-              getAccessibleNodeLabels().iterator(), ',')));
-    }
 
     nodeLocalityDelay = conf.getNodeLocalityDelay();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index 8abaeb6..29db94a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -64,8 +65,10 @@
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -83,12 +86,15 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 public class TestSchedulerUtils {
 
   private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
 
+  private RMContext rmContext = getMockRMContext();
+
   @Test (timeout = 30000)
   public void testNormalizeRequest() {
     ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@@ -206,6 +212,9 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       // set queue accessible node labesl to [x, y]
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x"),
+              NodeLabel.newInstance("y")));
       Resource resource = Resources.createResource(
           0,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@@ -213,22 +222,56 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       resReq.setNodeLabelExpression("x");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression("y");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression("");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression(" ");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       e.printStackTrace();
       fail("Should be valid when request labels is a subset of queue labels");
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x", "y"));
+    }
+
+    // same as above, but cluster node labels don't contains label being
+    // requested. should fail
+    try {
+      // set queue accessible node labesl to [x, y]
+      queueAccessibleNodeLabels.clear();
+      queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
+      Resource resource = Resources.createResource(
+          0,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      resReq.setNodeLabelExpression("x");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      resReq.setNodeLabelExpression("y");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      resReq.setNodeLabelExpression("");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      resReq.setNodeLabelExpression(" ");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      fail("Should fail");
+    } catch (InvalidResourceRequestException e) {
     }
 
     // queue has labels, failed cases (when ask a label not included by queue)
@@ -236,6 +279,9 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       // set queue accessible node labesl to [x, y]
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x"),
+              NodeLabel.newInstance("y")));
 
       Resource resource = Resources.createResource(
           0,
@@ -244,9 +290,12 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       resReq.setNodeLabelExpression("z");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
       fail("Should fail");
     } catch (InvalidResourceRequestException e) {
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x", "y"));
     }
 
     // we don't allow specify more than two node labels in a single expression
@@ -255,6 +304,9 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       // set queue accessible node labesl to [x, y]
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x"),
+              NodeLabel.newInstance("y")));
 
       Resource resource = Resources.createResource(
           0,
@@ -263,9 +315,12 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       resReq.setNodeLabelExpression("x && y");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
       fail("Should fail");
     } catch (InvalidResourceRequestException e) {
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x", "y"));
     }
 
     // queue doesn't have label, succeed (when request no label)
@@ -280,15 +335,15 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       ResourceRequest resReq = BuilderUtils.newResourceRequest(
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression("");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression(" ");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       e.printStackTrace();
       fail("Should be valid when request labels is empty");
@@ -299,6 +354,9 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
 
       // set queue accessible node labels to empty
       queueAccessibleNodeLabels.clear();
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x")));
+
       Resource resource = Resources.createResource(
           0,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@@ -306,9 +364,12 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       resReq.setNodeLabelExpression("x");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
       fail("Should fail");
     } catch (InvalidResourceRequestException e) {
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x"));
     }
 
     // queue is "*", always succeeded
@@ -317,6 +378,10 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x"),
+              NodeLabel.newInstance("y"), NodeLabel.newInstance("z")));
+
       Resource resource = Resources.createResource(
           0,
           YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
@@ -324,18 +389,47 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), ResourceRequest.ANY, resource, 1);
       resReq.setNodeLabelExpression("x");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression("y");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
 
       resReq.setNodeLabelExpression("z");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       e.printStackTrace();
       fail("Should be valid when queue can access any labels");
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x", "y", "z"));
+    }
+
+    // same as above, but cluster node labels don't contains label, should fail
+    try {
+      // set queue accessible node labels to empty
+      queueAccessibleNodeLabels.clear();
+      queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
+
+      Resource resource = Resources.createResource(
+          0,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+      ResourceRequest resReq = BuilderUtils.newResourceRequest(
+          mock(Priority.class), ResourceRequest.ANY, resource, 1);
+      resReq.setNodeLabelExpression("x");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      resReq.setNodeLabelExpression("y");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+
+      resReq.setNodeLabelExpression("z");
+      SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
+          scheduler, rmContext);
+      fail("Should fail");
+    } catch (InvalidResourceRequestException e) {
     }
 
     // we don't allow resource name other than ANY and specify label
@@ -343,6 +437,9 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       // set queue accessible node labesl to [x, y]
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x"),
+              NodeLabel.newInstance("y")));
 
       Resource resource = Resources.createResource(
           0,
@@ -351,9 +448,12 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), "rack", resource, 1);
       resReq.setNodeLabelExpression("x");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
       fail("Should fail");
     } catch (InvalidResourceRequestException e) {
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x", "y"));
     }
 
     // we don't allow resource name other than ANY and specify label even if
@@ -363,6 +463,8 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
       queueAccessibleNodeLabels.clear();
       queueAccessibleNodeLabels.addAll(Arrays
           .asList(CommonNodeLabelsManager.ANY));
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(
+          ImmutableSet.of(NodeLabel.newInstance("x")));
 
       Resource resource = Resources.createResource(
           0,
@@ -371,9 +473,12 @@ public void testValidateResourceRequestWithErrorLabelsPermission()
           mock(Priority.class), "rack", resource, 1);
       resReq.setNodeLabelExpression("x");
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
-          scheduler);
+          scheduler, rmContext);
       fail("Should fail");
     } catch (InvalidResourceRequestException e) {
+    } finally {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
+          Arrays.asList("x"));
     }
   }
@@ -395,7 +500,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       fail("Zero memory should be accepted");
     }
@@ -409,7 +514,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       fail("Zero vcores should be accepted");
     }
@@ -424,7 +529,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       fail("Max memory should be accepted");
     }
@@ -439,7 +544,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
     } catch (InvalidResourceRequestException e) {
       fail("Max vcores should not be accepted");
     }
@@ -453,7 +558,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
       fail("Negative memory should not be accepted");
     } catch (InvalidResourceRequestException e) {
       // expected
@@ -468,7 +573,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
       fail("Negative vcores should not be accepted");
     } catch (InvalidResourceRequestException e) {
       // expected
@@ -484,7 +589,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
       fail("More than max memory should not be accepted");
     } catch (InvalidResourceRequestException e) {
       // expected
@@ -501,7 +606,7 @@ public void testValidateResourceRequest() {
           BuilderUtils.newResourceRequest(mock(Priority.class),
               ResourceRequest.ANY, resource, 1);
       SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
-          mockScheduler);
+          mockScheduler, rmContext);
       fail("More than max vcores should not be accepted");
     } catch (InvalidResourceRequestException e) {
       // expected
@@ -632,4 +737,12 @@ public void testCreatePreemptedContainerStatus() {
     Assert.assertNull(applications.get(appId));
     return app;
   }
+
+  private static RMContext getMockRMContext() {
+    RMContext rmContext = mock(RMContext.class);
+    RMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
+    nlm.init(new Configuration(false));
+    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+    return rmContext;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index 0206772..8d04700 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -551,7 +551,7 @@ public void testQueueParsingWithLabelsInherit() throws IOException {
     ServiceOperations.stopQuietly(capacityScheduler);
   }
 
-  @Test(expected = Exception.class)
+  @Test
   public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
       throws IOException {
     YarnConfiguration conf = new YarnConfiguration();
@@ -579,7 +579,7 @@ public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
     ServiceOperations.stopQuietly(nodeLabelsManager);
   }
 
-  @Test(expected = Exception.class)
+  @Test
   public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
       throws IOException {
     YarnConfiguration conf = new YarnConfiguration();
@@ -607,7 +607,7 @@ public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
     ServiceOperations.stopQuietly(nodeLabelsManager);
   }
 
-  @Test(expected = Exception.class)
+  @Test
   public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
       throws IOException {
     YarnConfiguration conf = new YarnConfiguration();
@@ -635,7 +635,7 @@ public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
     ServiceOperations.stopQuietly(nodeLabelsManager);
   }
 
-  @Test(expected = Exception.class)
+  @Test
   public void testQueueParsingWhenLabelsNotExist() throws IOException {
     YarnConfiguration conf = new YarnConfiguration();
     CapacitySchedulerConfiguration csConf =
@@ -769,4 +769,100 @@ public void testNestedQueueParsingShouldTrimSpaces() throws Exception {
     Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
     Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
   }
+
+  /**
+   * Test init a queue configuration, children's capacity for a given label
+   * doesn't equals to 100%. This expect IllegalArgumentException thrown.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
+      throws IOException {
+    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
+        .of("red", "blue"));
+
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfiguration(csConf);
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 5);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+  }
+
+  /**
+   * Test init a queue configuration, children's capacity for a given label
+   * doesn't equals to 100%. This expect IllegalArgumentException thrown.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
+      throws IOException {
+    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
+        .of("red", "blue"));
+
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithLabels(csConf);
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
+        "red", 24);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+  }
+
+  /**
+   * Test init a queue configuration, children's capacity for a given label
+   * doesn't equals to 100%. This expect IllegalArgumentException thrown.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
+      throws IOException {
+    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet
+        .of("red", "blue"));
+
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfigurationWithLabels(csConf);
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT + ".b.b3",
+        "red", 24);
+    csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT,
+        ImmutableSet.of(RMNodeLabelsManager.ANY));
+    csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT + ".b",
+        ImmutableSet.of(RMNodeLabelsManager.ANY));
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setConf(csConf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(csConf);
+    capacityScheduler.start();
+    ServiceOperations.stopQuietly(capacityScheduler);
+  }
 }