diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index f9a72191a67..07587b33db9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1190,8 +1190,7 @@ public void recordContainerAllocationTime(long value) { } @Private - public boolean hasPendingResourceRequest(ResourceCalculator rc, - String nodePartition, Resource cluster, + public boolean hasPendingResourceRequest(String nodePartition, SchedulingMode schedulingMode) { // We need to consider unconfirmed allocations if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { @@ -1204,16 +1203,12 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, // To avoid too many allocation-proposals rejected for non-default // partition allocation if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) { - pending = Resources.subtract(pending, Resources + Resources.subtractFromNonNegative(pending, Resources .createResource(unconfirmedAllocatedMem.get(), unconfirmedAllocatedVcores.get())); } - if (Resources.greaterThan(rc, cluster, pending, Resources.none())) { - return true; - } - - return false; + return !Resources.isNone(pending); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index f753d31fdbf..b3eca936fe3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -838,8 +838,8 @@ public CSAssignment assignContainers(Resource clusterResource, if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. - if (!application.hasPendingResourceRequest(rc, - ps.getPartition(), clusterResource, schedulingMode)) { + if (!application.hasPendingResourceRequest(ps.getPartition(), + schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 71e6f7fd7df..1ba102fc343 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -96,6 +96,8 @@ private final SchedulingPolicy defaultSchedulingPolicy; + private final Map> accessibleNodeLabels; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -128,7 +130,8 @@ public AllocationConfiguration(Map minQueueResources, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, Set reservableQueues, - Set nonPreemptableQueues) { + Set nonPreemptableQueues, + Map> accessibleNodeLabels) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.maxChildQueueResources = maxChildQueueResources; @@ -152,6 +155,7 @@ public AllocationConfiguration(Map minQueueResources, this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; this.nonPreemptableQueues = nonPreemptableQueues; + this.accessibleNodeLabels = accessibleNodeLabels; } public AllocationConfiguration(Configuration conf) { @@ -181,6 +185,7 @@ public AllocationConfiguration(Configuration conf) { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); + accessibleNodeLabels = new HashMap<>(); } /** @@ -329,6 +334,10 @@ SchedulingPolicy getSchedulingPolicy(String queueName) { return (policy == null) ? defaultSchedulingPolicy : policy; } + public Set getAccessibleNodeLabel(String queueName) { + return accessibleNodeLabels.get(queueName); + } + public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } @@ -416,6 +425,7 @@ public void initFSQueue(FSQueue queue){ queue.setMaxAMShare(getQueueMaxAMShare(name)); queue.setMaxChildQueueResource(getMaxChildResources(name)); + // Set queue metrics. queue.getMetrics().setMinShare(getMinResources(name)); queue.getMetrics().setMaxShare(getMaxResources(name)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 313a27ae378..656e6400ebc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -243,6 +243,7 @@ public synchronized void reloadAllocations() throws IOException, new HashMap<>(); Set reservableQueues = new HashSet<>(); Set nonPreemptableQueues = new HashSet<>(); + Map> accessibleNodeLabels = new HashMap<>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -386,7 +387,7 @@ public synchronized void reloadAllocations() throws IOException, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); } // Load placement policy and pass it configured queues @@ -436,7 +437,8 @@ public synchronized void reloadAllocations() throws IOException, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, newPlacementPolicy, configuredQueues, - globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); + globalReservationQueueConfig, reservableQueues, nonPreemptableQueues, + accessibleNodeLabels); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -463,7 +465,8 @@ private void loadQueue(String parentName, Element element, Map> resAcls, Map> configuredQueues, Set reservableQueues, - Set nonPreemptableQueues) + Set nonPreemptableQueues, + Map> accessibleNodeLabels) throws AllocationConfigurationException { String queueName = FairSchedulerUtilities.trimQueueName( element.getAttribute("name")); @@ -568,6 +571,21 @@ private void loadQueue(String parentName, Element element, if (!Boolean.parseBoolean(text)) { nonPreemptableQueues.add(queueName); } + } else if ("accessibleNodeLabel".equals(field.getTagName())) { + String[] nodeLabel = + ((Text)field.getFirstChild()).getData().trim().split(","); + + if (nodeLabel != null && nodeLabel.length > 0) { + Set nodeLabelSet = new HashSet<>(); + + for (String label : nodeLabel) { + if (label != null && label.length() > 0) { + nodeLabelSet.add(label); + } + } + + accessibleNodeLabels.put(queueName, nodeLabelSet); + } } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, @@ -575,7 +593,7 @@ private void loadQueue(String parentName, Element element, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, resAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); + nonPreemptableQueues, accessibleNodeLabels); isLeaf = false; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5dfef731e20..dc61085503f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; @@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -1320,18 +1322,44 @@ public void updateDemand() { @Override public Resource assignContainer(FSSchedulerNode node) { + Resource res = Resources.none(); + if (isOverAMShareLimit()) { - PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); - updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), - " exceeds maximum AM resource allowed)."); - if (LOG.isDebugEnabled()) { - LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() - + " exceeds maximum AM resource allowed, " - + getQueue().dumpState()); + logOverAMShareLimit(); + } else if (nodeLabelsMatchContainerLabel(node.getNodeID())) { + res = assignContainer(node, false); + } + + return res; + } + + private void logOverAMShareLimit() { + PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); + updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), + " exceeds maximum AM resource allowed)."); + if (LOG.isDebugEnabled()) { + LOG.debug("AM resource request: " + amAsk.getPerAllocationResource() + + " exceeds maximum AM resource allowed, " + + getQueue().dumpState()); + } + } + + private boolean nodeLabelsMatchContainerLabel(NodeId nodeId) { + RMNodeLabelsManager labelManager = scheduler.getLabelManager(); + Set labelsOnNode = labelManager.getLabelsOnNode(nodeId); + + boolean appHasLabel = false; + + for (String label : labelsOnNode) { + appHasLabel = hasPendingResourceRequest(label, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + + if (appHasLabel) { + break; } - return Resources.none(); } - return assignContainer(node, false); + + return appHasLabel; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 5b4e4dc9e5e..c2caa28a150 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -219,7 +219,7 @@ public Resource assignContainer(FSSchedulerNode node) { try { for (FSQueue child : childQueues) { assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { + if (!Resources.isNone(assigned)) { break; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 10168232a94..1e349e53a79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @Private @Unstable @@ -75,6 +77,7 @@ protected Resource maxShare; protected int maxRunningApps; protected Resource maxChildQueueResource; + private Set accessibleLabels = null; // maxAMShare is a value between 0 and 1. protected float maxAMShare; @@ -110,6 +113,7 @@ public void reinit(boolean recursive) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); allocConf.initFSQueue(this); updatePreemptionVariables(); + updateNodeLabels(); if (recursive) { for (FSQueue child : getChildQueues()) { @@ -386,6 +390,17 @@ private void updatePreemptionVariables() { } } + /** ++ * Update node label for this queue ++ */ + public void updateNodeLabels() { + accessibleLabels = scheduler.getAllocationConfiguration() + .getAccessibleNodeLabel(getName()); + if (accessibleLabels == null && parent != null) { + accessibleLabels = parent.getAccessibleNodeLabels(); + } + } + /** * Gets the children of this queue, if any. */ @@ -416,6 +431,8 @@ boolean assignContainerPreCheck(FSSchedulerNode node) { + " because it has reserved containers."); } return false; + } else if (!nodeLabelCheck(node.getNodeID())) { + return false; } else if (!Resources.fitsIn(getResourceUsage(), maxShare)) { if (LOG.isDebugEnabled()) { LOG.debug("Assigning container failed on node '" + node.getNodeName() @@ -423,9 +440,39 @@ boolean assignContainerPreCheck(FSSchedulerNode node) { + dumpState()); } return false; - } else { + } + + return true; + } + + /** + * Check if the queue's labels allow it to assign containers on this node. + * + * @param nodeId the ID of the node to check + * @return true if the queue is allowed to assign containers on this node + */ + protected boolean nodeLabelCheck(NodeId nodeId) { + boolean queueHasLabels = (accessibleLabels != null) && + !accessibleLabels.isEmpty(); + + // A queue with no label will accept any node + if (!queueHasLabels) { return true; } + + RMNodeLabelsManager labelManager = scheduler.getLabelManager(); + Set labelsOnNode = labelManager.getLabelsOnNode(nodeId); + + if (queueHasLabels && (labelsOnNode != null) && + !labelsOnNode.isEmpty()) { + for (String queueLabel : accessibleLabels) { + if (labelsOnNode.contains(queueLabel)) { + return true; + } + } + } + + return false; } /** @@ -444,10 +491,9 @@ public String toString() { @Override public Set getAccessibleNodeLabels() { - // TODO, add implementation for FS - return null; + return accessibleLabels; } - + @Override public String getDefaultNodeLabelExpression() { // TODO, add implementation for FS diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c5212501b34..0d44f3c5646 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -105,6 +105,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; /** * A scheduler that schedules resources between a set of queues. The scheduler @@ -198,6 +199,8 @@ @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; + private RMNodeLabelsManager labelManager; + private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; @@ -463,6 +466,10 @@ public FairSchedulerEventLog getEventLog() { return eventLog; } + public RMNodeLabelsManager getLabelManager() { + return labelManager; + } + /** * Add a new application to the scheduler, with a given id, queue name, and * user. This will accept a new app even if the user or queue is above @@ -1333,6 +1340,8 @@ private void initScheduler(Configuration conf) throws IOException { eventLog.init(this.conf); allocConf = new AllocationConfiguration(conf); + labelManager = rmContext.getNodeLabelManager(); + try { queueMgr.initialize(conf); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index c08d13e9c48..2b0ef73a749 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -276,10 +276,14 @@ private FSQueue createNewQueues(FSQueueType queueType, String queueName = i.next(); // Check if child policy is allowed - SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration(). - getSchedulingPolicy(queueName); + SchedulingPolicy childPolicy = queueConf.getSchedulingPolicy(queueName); + if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) { - LOG.error("Can't create queue '" + queueName + "'."); + LOG.error("Can't create queue '" + queueName + "' because the " + + "configured policy (" + childPolicy.getName() + ") is not " + + "allowed by the parent queue's policy (" + + parent.getPolicy().getName() + ")."); + return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd32a0..1028999b990 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -118,7 +118,15 @@ protected ResourceRequest createResourceRequest( protected ResourceRequest createResourceRequest( int memory, int vcores, String host, int priority, int numContainers, boolean relaxLocality) { - ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); + return createResourceRequest(memory, vcores, host, priority, numContainers, + relaxLocality, RMNodeLabelsManager.NO_LABEL); + } + + protected ResourceRequest createResourceRequest( + int memory, int vcores, String host, int priority, int numContainers, + boolean relaxLocality, String label) { + ResourceRequest request = + recordFactory.newRecordInstance(ResourceRequest.class); request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); request.setNumContainers(numContainers); @@ -126,7 +134,8 @@ protected ResourceRequest createResourceRequest( prio.setPriority(priority); request.setPriority(prio); request.setRelaxLocality(relaxLocality); - request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + request.setNodeLabelExpression(label); + return request; } @@ -163,6 +172,13 @@ protected ApplicationAttemptId createSchedulingRequest( protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, vcores, queueId, userId, + numContainers, priority, RMNodeLabelsManager.NO_LABEL); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority, String label) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected @@ -170,9 +186,9 @@ protected ApplicationAttemptId createSchedulingRequest( if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false, false); } - List ask = new ArrayList(); + List ask = new ArrayList<>(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + priority, numContainers, true, label); ask.add(request); RMApp rmApp = mock(RMApp.class); @@ -188,8 +204,8 @@ protected ApplicationAttemptId createSchedulingRequest( resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList(), - null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(id, ask, new ArrayList<>(), null, null, + NULL_UPDATE_REQUESTS); scheduler.update(); return id; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java new file mode 100644 index 00000000000..0e088f0da6c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSNodeLabel.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +@SuppressWarnings("unchecked") +public class TestFSNodeLabel extends FairSchedulerTestBase { + private final static String ALLOC_FILE = + new File(TEST_DIR, "test-queues").getAbsolutePath(); + + @Before + public void setUp() throws IOException { + scheduler = new FairScheduler(); + conf = createConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + resourceManager = new ResourceManager(); + resourceManager.init(conf); + + // TODO: This test should really be using MockRM. For now starting stuff + // that is needed at a bare minimum. + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + + // to initialize the master key + resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); + } + + @After + public void tearDown() { + if (scheduler != null) { + scheduler.stop(); + scheduler = null; + } + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); + } + + @Test + public void testAssignmentWithNodeLabel() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println("1024mb,0vcores"); + out.println("label2"); + out.println(""); + out.println(""); + out.println("label3,label4"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create 3 nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + // add node label + Set clusterLabel = new HashSet<>(); + clusterLabel.add(NodeLabel.newInstance("label1")); + clusterLabel.add(NodeLabel.newInstance("label2")); + clusterLabel.add(NodeLabel.newInstance("label3")); + scheduler.getLabelManager().addToCluserNodeLabels(clusterLabel); + Map> nodeLabelMap = new HashMap<>(); + // node1: label1 + // node2: label2, label3 + // node3: + Set nodeLabel1 = new HashSet<>(); + nodeLabel1.add(RMNodeLabelsManager.NO_LABEL); + Set nodeLabel2 = new HashSet<>(); + nodeLabel2.add("label2"); + Set nodeLabel3 = new HashSet<>(); + nodeLabel3.add("label3"); + nodeLabelMap.put(node1.getNodeID(), nodeLabel1); + nodeLabelMap.put(node2.getNodeID(), nodeLabel2); + nodeLabelMap.put(node3.getNodeID(), nodeLabel3); + scheduler.getLabelManager().addLabelsToNode(nodeLabelMap); + + //case1 : app submitted into queue without node label could allocated on each node + ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "root.queueA", + "user1", 3 , 3); + scheduler.update(); + NodeUpdateSchedulerEvent nodeUpdate11 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate12 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate13 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate11); + scheduler.handle(nodeUpdate12); + scheduler.handle(nodeUpdate13); + // app2 will allocated on node1 + for (RMContainer container: scheduler.getSchedulerApp(appAttId1).getLiveContainers()) { + assertTrue(container.getAllocatedNode().equals(node1.getNodeID())); + } + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + appAttId1, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent1); + scheduler.update(); + + //case2 : app submitted into queueB could only be allocated on node1 + // now super app finished + ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 1, + "root.queueB", "user1", 3 , 3, "label2"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeUpdate21 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate22 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate23 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate21); + scheduler.handle(nodeUpdate22); + scheduler.handle(nodeUpdate23); + // app2 will allocated on node2 + for (RMContainer container: scheduler.getSchedulerApp(appAttId2).getLiveContainers()) { + assertTrue(container.getAllocatedNode().equals(node2.getNodeID())); + } + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = new AppAttemptRemovedSchedulerEvent( + appAttId2, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent2); + scheduler.update(); + + //case3 : app submitted into queueC could be allocated on node3 + // now super app finished + ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 1, + "root.queueC", "user1", 3, 3, "label3"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeUpdate31 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate32 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent nodeUpdate33 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate31); + scheduler.handle(nodeUpdate32); + scheduler.handle(nodeUpdate33); + // app3 will allocated on node2 + for (RMContainer container: scheduler.getSchedulerApp(appAttId3).getLiveContainers()) { + assertTrue(container.getAllocatedNode().equals(node3.getNodeID())); + } + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = new AppAttemptRemovedSchedulerEvent( + appAttId3, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent3); + scheduler.update(); + } +}