diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index d7aa413..61b9401 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.records; import java.io.Serializable; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @@ -239,6 +240,30 @@ public static boolean isAnyLocation(String hostName) { @Stable public abstract void setRelaxLocality(boolean relaxLocality); + /** + * Get labels specified by the application for this + * ResourceRequest. + * + * An application can use labels to specify a subset of cluster nodes + * on which it wants to narrow container-allocations. + * + * @return labels specified by the application for this + * ResourceRequest + */ + @Public + @Stable + public abstract List getLabels(); + + /** + * Set labels to narrow container-allocations to a subset of cluster + * nodes. + * + * @param labels node labels to narrow container allocations + */ + @Public + @Stable + public abstract void setLabels(List labels); + @Override public int hashCode() { final int prime = 2153; @@ -299,7 +324,20 @@ public int compareTo(ResourceRequest other) { int numContainersComparison = this.getNumContainers() - other.getNumContainers(); if (numContainersComparison == 0) { - return 0; + List thisLabels = this.getLabels(); + List otherLabels = other.getLabels(); + if (thisLabels.size() == otherLabels.size() && + !thisLabels.isEmpty()) { + for (int i=0; i getContainerStatuses(); + public abstract List getLabels(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses(List containerStatuses); + public abstract void setLabels(List labels); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 8a2c539..5adc23a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -281,8 +281,11 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), - nodeManagerVersion); + resolve(host), + ResourceOption.newInstance(capability, + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), + nodeManagerVersion, + request.getLabels()); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 24793e8..920b299 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; +import java.util.Collection; import java.util.List; import org.apache.hadoop.net.Node; @@ -147,4 +148,10 @@ * @return containerUpdates accumulated across NM heartbeats. */ public List pullContainerUpdates(); + + /** + * Get the list of labels associated with the node. + * @return the list of labels associated with the node + */ + public Collection getLabels(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index dc53a5d..12239f0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -115,6 +116,8 @@ private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); + private final Collection labels; + private static final StateMachineFactory stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, ResourceOption resourceOption, + String nodeManagerVersion, + Collection labels) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -191,6 +196,8 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.latestNodeHeartBeatResponse.setResponseId(0); + this.labels = labels; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -717,4 +724,9 @@ public void setNextHeartBeat(boolean nextHeartBeat) { public int getQueueSize() { return nodeUpdateQueue.size(); } + + @Override + public Collection getLabels() { + return labels; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..a39c2cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.resource.Resources; @@ -69,14 +71,20 @@ /* Allocated by scheduler */ boolean pending = true; // for app metrics + private Set labels = new ConcurrentSkipListSet(); + private Set invalidLabels = new ConcurrentSkipListSet(); + private final SchedulerLabelsManager labelsManager; + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + SchedulerLabelsManager labelsManager) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.labelsManager = labelsManager; } public ApplicationId getApplicationId() { @@ -120,6 +128,7 @@ public int getNewContainerId() { * @param requests resources to be acquired */ synchronized public void updateResourceRequests( + SchedulerApplicationAttempt applicationAttempt, List requests) { QueueMetrics metrics = queue.getMetrics(); @@ -176,9 +185,33 @@ synchronized public void updateResourceRequests( metrics.decrPendingResources(user, lastRequestContainers, lastRequestCapability); } + + // Process labels + if (labelsManager == null) { + LOG.warn("Labels are not supported!"); + } else { + for (String label : request.getLabels()) { + addLabel(label); + labelsManager.addLabelToApplication(label, applicationAttempt); + } + } } } + public synchronized void addInvalidLabel(String label) { + invalidLabels.add(label); + } + + public synchronized Collection getAndResetInvalidLabels() { + List current = new ArrayList(invalidLabels); + invalidLabels.clear(); + return current; + } + + public synchronized void addLabel(String label) { + labels.add(label); + } + /** * The ApplicationMaster is updating the blacklist * @@ -379,7 +412,8 @@ synchronized public void move(Queue newQueue) { this.queue = newQueue; } - synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { + synchronized public void stop(SchedulerApplicationAttempt applicationAttempt, + RMAppAttemptState rmAppAttemptFinalState) { // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); for (Map asks : requests.values()) { @@ -393,6 +427,14 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { // Clear requests themselves clearRequests(); + + // Clear lables and inform labelsManager + if (labelsManager != null) { + for (String label : labels) { + labelsManager.removeLabelFromApplication(label, applicationAttempt); + } + labels.clear(); + } } public synchronized void setQueue(Queue queue) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 919d561..f317d40 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -59,7 +59,8 @@ */ @Private @Unstable -public class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt +implements Comparable { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -99,12 +100,12 @@ protected final RMContext rmContext; public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + SchedulerLabelsManager labelsManager, RMContext rmContext) { this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, labelsManager); this.queue = queue; } @@ -202,14 +203,14 @@ public Queue getQueue() { public synchronized void updateResourceRequests( List requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests); + appSchedulingInfo.updateResourceRequests(this, requests); } } public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { // Cleanup all scheduling information isStopped = true; - appSchedulingInfo.stop(rmAppAttemptFinalState); + appSchedulingInfo.stop(this, rmAppAttemptFinalState); } public synchronized boolean isStopped() { @@ -497,5 +498,19 @@ public synchronized void move(Queue newQueue) { appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void addInvalidLabel(String label) { + appSchedulingInfo.addInvalidLabel(label); + } + + public synchronized Collection getAndResetInvalidLabels() { + return appSchedulingInfo.getAndResetInvalidLabels(); + } + + @Override + public int compareTo(SchedulerApplicationAttempt o) { + return getApplicationAttemptId().compareTo(o.getApplicationAttemptId()); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java new file mode 100644 index 0000000..a85ae92 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java @@ -0,0 +1,134 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +@Private +public class SchedulerLabelsManager { + private static final Log LOG = LogFactory.getLog(SchedulerLabelsManager.class); + + Map labels = new ConcurrentHashMap(); + + public synchronized void addNode(RMNode node) { + for (String label : node.getLabels()) { + SchedulerLabel schedulerLabel = labels.get(label); + if (schedulerLabel == null) { + schedulerLabel = new SchedulerLabel(); + labels.put(label, schedulerLabel); + } + schedulerLabel.addNode(); + + LOG.info("Added label " + label + " for node " + node.getNodeID()); + } + } + + public synchronized void removeNode(RMNode node) { + for (String label : node.getLabels()) { + SchedulerLabel schedulerLabel = labels.get(label); + schedulerLabel.removeNode(); // This cannot be null, ever + + // Remove label if it's no longer valid, also inform applications + if (!schedulerLabel.isValid()) { + for (SchedulerApplicationAttempt application : schedulerLabel.getApplications()) { + LOG.info("Label: " + label + " is invalid after removing node " + + node.getNodeID() + ", informing application: " + + application.getApplicationAttemptId()); + application.addInvalidLabel(label); + } + labels.remove(label); + } + + LOG.info("Removed label " + label + " emanating from node " + + node.getNodeID()); + } + } + + public void addLabelToApplication( + String label, SchedulerApplicationAttempt application) { + SchedulerLabel schedulerLabel = labels.get(label); + + // Sanity check: Is the label valid? + if (schedulerLabel == null || !schedulerLabel.isValid()) { + LOG.info("Application " + + application.getApplicationAttemptId() + + " asking for invalid label: " + label); + application.addInvalidLabel(label); + } + + schedulerLabel.addLabelToApplication(application); + } + + public void removeLabelFromApplication( + String label, SchedulerApplicationAttempt application) { + SchedulerLabel schedulerLabel = labels.get(label); + if (schedulerLabel != null) { + schedulerLabel.removeLabelFromApplication(application); + } + } + + /** + * Information about a label in the system. + * + * SchedulerLabel tracks a validity of the label and + * also the applications which are using this label. + */ + @Private + static class SchedulerLabel { + + int refCount; + Set applications = + new ConcurrentSkipListSet(); + + public synchronized void addNode() { + ++refCount; + } + + public synchronized void removeNode() { + --refCount; + } + + public synchronized boolean isValid() { + return refCount > 0; + } + + public synchronized void addLabelToApplication( + SchedulerApplicationAttempt application) { + applications.add(application); + } + + public synchronized void removeLabelFromApplication( + SchedulerApplicationAttempt application) { + applications.remove(application); + } + + public synchronized Collection getApplications() { + return applications; + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6c392b5..4a418ed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -194,6 +195,8 @@ public Configuration getConf() { private ResourceCalculator calculator; private boolean usePortForNodeName; + private SchedulerLabelsManager labelsManager = new SchedulerLabelsManager(); + public CapacityScheduler() {} @Override @@ -465,7 +468,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), - queue, queue.getActiveUsersManager(), rmContext); + queue, queue.getActiveUsersManager(), labelsManager, rmContext); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt(application .getCurrentAppAttempt()); @@ -606,14 +609,16 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, LOG.debug("allocate: pre-update" + " applicationAttemptId=" + applicationAttemptId + " application=" + application); + application.showRequests(); } - application.showRequests(); // Update application requests application.updateResourceRequests(ask); - LOG.debug("allocate: post-update"); - application.showRequests(); + if(LOG.isDebugEnabled()) { + LOG.debug("allocate: post-update"); + application.showRequests(); + } } if(LOG.isDebugEnabled()) { @@ -826,6 +831,7 @@ public void handle(SchedulerEvent event) { private synchronized void addNode(RMNode nodeManager) { this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, usePortForNodeName)); + labelsManager.addNode(nodeManager); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); ++numNodeManagers; @@ -863,6 +869,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + labelsManager.removeNode(nodeInfo); LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 470cb10..09a908c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerLabelsManager; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -66,8 +67,9 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext) { - super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + SchedulerLabelsManager labelsManager, RMContext rmContext) { + super(applicationAttemptId, user, queue, activeUsersManager, labelsManager, + rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index adabfef..587a4de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -63,7 +63,7 @@ public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { - super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + super(applicationAttemptId, user, queue, activeUsersManager, null, rmContext); } public void setAppSchedulable(AppSchedulable appSchedulable) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 4ff38f0..31f972f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -134,7 +134,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { new TestNodeListManagerEventDispatcher()); NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); - node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null); nodesListManagerEvent = null; } @@ -191,7 +191,8 @@ public void testContainerUpdate() throws InterruptedException{ node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); - RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + RMNodeImpl node2 = + new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null); node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( @@ -459,7 +460,7 @@ private RMNodeImpl getRunningNode() { Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null); + RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null, null); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; @@ -478,7 +479,8 @@ private RMNodeImpl getUnhealthyNode() { private RMNodeImpl getNewNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); - RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); + RMNodeImpl node = + new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null); return node; }