diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index d7aa413..61b9401 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.records;
import java.io.Serializable;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@@ -239,6 +240,30 @@ public static boolean isAnyLocation(String hostName) {
@Stable
public abstract void setRelaxLocality(boolean relaxLocality);
+ /**
+ * Get labels specified by the application for this
+ * ResourceRequest.
+ *
+ * An application can use labels to specify a subset of cluster nodes
+ * on which it wants to narrow container-allocations.
+ *
+ * @return labels specified by the application for this
+ * ResourceRequest
+ */
+ @Public
+ @Stable
+ public abstract List getLabels();
+
+ /**
+ * Set labels to narrow container-allocations to a subset of cluster
+ * nodes.
+ *
+ * @param labels node labels to narrow container allocations
+ */
+ @Public
+ @Stable
+ public abstract void setLabels(List labels);
+
@Override
public int hashCode() {
final int prime = 2153;
@@ -299,7 +324,20 @@ public int compareTo(ResourceRequest other) {
int numContainersComparison =
this.getNumContainers() - other.getNumContainers();
if (numContainersComparison == 0) {
- return 0;
+ List thisLabels = this.getLabels();
+ List otherLabels = other.getLabels();
+ if (thisLabels.size() == otherLabels.size() &&
+ !thisLabels.isEmpty()) {
+ for (int i=0; i getContainerStatuses();
+ public abstract List getLabels();
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
public abstract void setContainerStatuses(List containerStatuses);
+ public abstract void setLabels(List labels);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 8a2c539..5adc23a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -281,8 +281,11 @@ public RegisterNodeManagerResponse registerNodeManager(
.getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
- resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
- nodeManagerVersion);
+ resolve(host),
+ ResourceOption.newInstance(capability,
+ RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
+ nodeManagerVersion,
+ request.getLabels());
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 24793e8..920b299 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.net.Node;
@@ -147,4 +148,10 @@
* @return containerUpdates accumulated across NM heartbeats.
*/
public List pullContainerUpdates();
+
+ /**
+ * Get the list of labels associated with the node.
+ * @return the list of labels associated with the node
+ */
+ public Collection getLabels();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index dc53a5d..12239f0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -115,6 +116,8 @@
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
+ private final Collection labels;
+
private static final StateMachineFactory stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
- int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
+ int cmPort, int httpPort, Node node, ResourceOption resourceOption,
+ String nodeManagerVersion,
+ Collection labels) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
@@ -191,6 +196,8 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.latestNodeHeartBeatResponse.setResponseId(0);
+ this.labels = labels;
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -717,4 +724,9 @@ public void setNextHeartBeat(boolean nextHeartBeat) {
public int getQueueSize() {
return nodeUpdateQueue.size();
}
+
+ @Override
+ public Collection getLabels() {
+ return labels;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index de71f71..a39c2cf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -69,14 +71,20 @@
/* Allocated by scheduler */
boolean pending = true; // for app metrics
+ private Set labels = new ConcurrentSkipListSet();
+ private Set invalidLabels = new ConcurrentSkipListSet();
+ private final SchedulerLabelsManager labelsManager;
+
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager) {
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ SchedulerLabelsManager labelsManager) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.queueName = queue.getQueueName();
this.user = user;
this.activeUsersManager = activeUsersManager;
+ this.labelsManager = labelsManager;
}
public ApplicationId getApplicationId() {
@@ -120,6 +128,7 @@ public int getNewContainerId() {
* @param requests resources to be acquired
*/
synchronized public void updateResourceRequests(
+ SchedulerApplicationAttempt applicationAttempt,
List requests) {
QueueMetrics metrics = queue.getMetrics();
@@ -176,9 +185,33 @@ synchronized public void updateResourceRequests(
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
}
+
+ // Process labels
+ if (labelsManager == null) {
+ LOG.warn("Labels are not supported!");
+ } else {
+ for (String label : request.getLabels()) {
+ addLabel(label);
+ labelsManager.addLabelToApplication(label, applicationAttempt);
+ }
+ }
}
}
+ public synchronized void addInvalidLabel(String label) {
+ invalidLabels.add(label);
+ }
+
+ public synchronized Collection getAndResetInvalidLabels() {
+ List current = new ArrayList(invalidLabels);
+ invalidLabels.clear();
+ return current;
+ }
+
+ public synchronized void addLabel(String label) {
+ labels.add(label);
+ }
+
/**
* The ApplicationMaster is updating the blacklist
*
@@ -379,7 +412,8 @@ synchronized public void move(Queue newQueue) {
this.queue = newQueue;
}
- synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ synchronized public void stop(SchedulerApplicationAttempt applicationAttempt,
+ RMAppAttemptState rmAppAttemptFinalState) {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
for (Map asks : requests.values()) {
@@ -393,6 +427,14 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Clear requests themselves
clearRequests();
+
+ // Clear lables and inform labelsManager
+ if (labelsManager != null) {
+ for (String label : labels) {
+ labelsManager.removeLabelFromApplication(label, applicationAttempt);
+ }
+ labels.clear();
+ }
}
public synchronized void setQueue(Queue queue) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 919d561..f317d40 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -59,7 +59,8 @@
*/
@Private
@Unstable
-public class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt
+implements Comparable {
private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class);
@@ -99,12 +100,12 @@
protected final RMContext rmContext;
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext) {
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ SchedulerLabelsManager labelsManager, RMContext rmContext) {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
+ activeUsersManager, labelsManager);
this.queue = queue;
}
@@ -202,14 +203,14 @@ public Queue getQueue() {
public synchronized void updateResourceRequests(
List requests) {
if (!isStopped) {
- appSchedulingInfo.updateResourceRequests(requests);
+ appSchedulingInfo.updateResourceRequests(this, requests);
}
}
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Cleanup all scheduling information
isStopped = true;
- appSchedulingInfo.stop(rmAppAttemptFinalState);
+ appSchedulingInfo.stop(this, rmAppAttemptFinalState);
}
public synchronized boolean isStopped() {
@@ -497,5 +498,19 @@ public synchronized void move(Queue newQueue) {
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
- }
+ }
+
+ public synchronized void addInvalidLabel(String label) {
+ appSchedulingInfo.addInvalidLabel(label);
+ }
+
+ public synchronized Collection getAndResetInvalidLabels() {
+ return appSchedulingInfo.getAndResetInvalidLabels();
+ }
+
+ @Override
+ public int compareTo(SchedulerApplicationAttempt o) {
+ return getApplicationAttemptId().compareTo(o.getApplicationAttemptId());
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java
new file mode 100644
index 0000000..a85ae92
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerLabelsManager.java
@@ -0,0 +1,134 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+@Private
+public class SchedulerLabelsManager {
+ private static final Log LOG = LogFactory.getLog(SchedulerLabelsManager.class);
+
+ Map labels = new ConcurrentHashMap();
+
+ public synchronized void addNode(RMNode node) {
+ for (String label : node.getLabels()) {
+ SchedulerLabel schedulerLabel = labels.get(label);
+ if (schedulerLabel == null) {
+ schedulerLabel = new SchedulerLabel();
+ labels.put(label, schedulerLabel);
+ }
+ schedulerLabel.addNode();
+
+ LOG.info("Added label " + label + " for node " + node.getNodeID());
+ }
+ }
+
+ public synchronized void removeNode(RMNode node) {
+ for (String label : node.getLabels()) {
+ SchedulerLabel schedulerLabel = labels.get(label);
+ schedulerLabel.removeNode(); // This cannot be null, ever
+
+ // Remove label if it's no longer valid, also inform applications
+ if (!schedulerLabel.isValid()) {
+ for (SchedulerApplicationAttempt application : schedulerLabel.getApplications()) {
+ LOG.info("Label: " + label + " is invalid after removing node " +
+ node.getNodeID() + ", informing application: " +
+ application.getApplicationAttemptId());
+ application.addInvalidLabel(label);
+ }
+ labels.remove(label);
+ }
+
+ LOG.info("Removed label " + label + " emanating from node " +
+ node.getNodeID());
+ }
+ }
+
+ public void addLabelToApplication(
+ String label, SchedulerApplicationAttempt application) {
+ SchedulerLabel schedulerLabel = labels.get(label);
+
+ // Sanity check: Is the label valid?
+ if (schedulerLabel == null || !schedulerLabel.isValid()) {
+ LOG.info("Application " +
+ application.getApplicationAttemptId() +
+ " asking for invalid label: " + label);
+ application.addInvalidLabel(label);
+ }
+
+ schedulerLabel.addLabelToApplication(application);
+ }
+
+ public void removeLabelFromApplication(
+ String label, SchedulerApplicationAttempt application) {
+ SchedulerLabel schedulerLabel = labels.get(label);
+ if (schedulerLabel != null) {
+ schedulerLabel.removeLabelFromApplication(application);
+ }
+ }
+
+ /**
+ * Information about a label in the system.
+ *
+ * SchedulerLabel tracks a validity of the label and
+ * also the applications which are using this label.
+ */
+ @Private
+ static class SchedulerLabel {
+
+ int refCount;
+ Set applications =
+ new ConcurrentSkipListSet();
+
+ public synchronized void addNode() {
+ ++refCount;
+ }
+
+ public synchronized void removeNode() {
+ --refCount;
+ }
+
+ public synchronized boolean isValid() {
+ return refCount > 0;
+ }
+
+ public synchronized void addLabelToApplication(
+ SchedulerApplicationAttempt application) {
+ applications.add(application);
+ }
+
+ public synchronized void removeLabelFromApplication(
+ SchedulerApplicationAttempt application) {
+ applications.remove(application);
+ }
+
+ public synchronized Collection getApplications() {
+ return applications;
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6c392b5..4a418ed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -194,6 +195,8 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
+ private SchedulerLabelsManager labelsManager = new SchedulerLabelsManager();
+
public CapacityScheduler() {}
@Override
@@ -465,7 +468,7 @@ private synchronized void addApplicationAttempt(
FiCaSchedulerApp attempt =
new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
- queue, queue.getActiveUsersManager(), rmContext);
+ queue, queue.getActiveUsersManager(), labelsManager, rmContext);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
@@ -606,14 +609,16 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + applicationAttemptId +
" application=" + application);
+ application.showRequests();
}
- application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
- LOG.debug("allocate: post-update");
- application.showRequests();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("allocate: post-update");
+ application.showRequests();
+ }
}
if(LOG.isDebugEnabled()) {
@@ -826,6 +831,7 @@ public void handle(SchedulerEvent event) {
private synchronized void addNode(RMNode nodeManager) {
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
+ labelsManager.addNode(nodeManager);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
++numNodeManagers;
@@ -863,6 +869,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
}
this.nodes.remove(nodeInfo.getNodeID());
+ labelsManager.removeNode(nodeInfo);
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 470cb10..09a908c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerLabelsManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -66,8 +67,9 @@
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext) {
- super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+ SchedulerLabelsManager labelsManager, RMContext rmContext) {
+ super(applicationAttemptId, user, queue, activeUsersManager, labelsManager,
+ rmContext);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
index adabfef..587a4de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
@@ -63,7 +63,7 @@
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
- super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+ super(applicationAttemptId, user, queue, activeUsersManager, null, rmContext);
}
public void setAppSchedulable(AppSchedulable appSchedulable) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 4ff38f0..31f972f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -134,7 +134,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
new TestNodeListManagerEventDispatcher());
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
- node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
+ node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null);
nodesListManagerEvent = null;
}
@@ -191,7 +191,8 @@ public void testContainerUpdate() throws InterruptedException{
node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
- RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
+ RMNodeImpl node2 =
+ new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null);
node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
@@ -459,7 +460,7 @@ private RMNodeImpl getRunningNode() {
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
- RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null);
+ RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null, null);
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
@@ -478,7 +479,8 @@ private RMNodeImpl getUnhealthyNode() {
private RMNodeImpl getNewNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
- RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
+ RMNodeImpl node =
+ new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null, null);
return node;
}