diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 33e8a1f..399086d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2029,6 +2029,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
+ private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ + "node-labels.";
+
+ public static final String RM_NODE_LABELS_PROVIDER_CONFIG =
+ RM_NODE_LABELS_PREFIX + "provider";
+
+ private static final String RM_NODE_LABELS_PROVIDER_PREFIX =
+ RM_NODE_LABELS_PREFIX + "provider.";
+
+ //If -1 is configured then no timer task should be created
+ public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+ RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
+
+ //once in 10 mins
+ public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+ 10 * 60 * 1000;
+
public static final String AM_BLACKLISTING_ENABLED =
YARN_PREFIX + "am.blacklisting.enabled";
public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index 75e6cee..fd636bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -118,6 +118,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
@@ -371,7 +372,8 @@ protected ResourceTrackerService createResourceTrackerService() {
return new CustomedResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
- this.rmContext.getNMTokenSecretManager());
+ this.rmContext.getNMTokenSecretManager(),
+ this.getNodeLabelsUpdater());
}
return super.createResourceTrackerService();
}
@@ -724,9 +726,11 @@ public CustomedResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor,
RMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInRM nmTokenSecretManager) {
+ NMTokenSecretManagerInRM nmTokenSecretManager,
+ RMNodeLabelsUpdater nodeLabelsProvider) {
super(rmContext, nodesListManager, nmLivelinessMonitor,
- containerTokenSecretManager, nmTokenSecretManager);
+ containerTokenSecretManager, nmTokenSecretManager,
+ nodeLabelsProvider);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bcd64c3..e4342d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2148,6 +2148,23 @@
yarn.nodemanager.node-labels.provider.fetch-timeout-ms
1200000
+
+
+
+
+ The class to use as the node labels fetcher by ResourceManager.
+
+ yarn.resourcemanager.node-labels.provider
+
+
+
+
+
+ The interval to use to update node labels by ResourceManager.
+
+ yarn.resourcemanager.node-labels.provider.fetch-interval-ms
+ 600000
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index ab46419..f77ed88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -746,6 +746,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
final String msg = "set node to labels.";
checkAndThrowIfDistributedNodeLabelConfEnabled(operation);
+ checkAndThrowIfNodeLabelsFetcherConfigured(operation);
UserGroupInformation user = checkAcls(operation);
checkRMStatus(user.getShortUserName(), operation, msg);
@@ -791,6 +792,23 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
}
}
+ /**
+ * Disable CLI requests to update node labels if NodeLabelsFetcher is
+ * configured.
+ */
+ private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation)
+ throws IOException {
+ Configuration conf = getConfig();
+ String providerString =
+ conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null);
+ if (providerString != null && providerString.trim().length() != 0) {
+ String msg = String.format("Error when invoke method=%s because of "
+ + "node labels provider is configured.", operation);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d6d9629..52c1c67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -159,6 +160,7 @@
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
+ private RMNodeLabelsUpdater nodeLabelsUpdater;
@VisibleForTesting
protected String webAppAddress;
@@ -253,6 +255,11 @@ protected void serviceInit(Configuration conf) throws Exception {
rmContext.setYarnConfiguration(conf);
+ nodeLabelsUpdater = createNodeLabelsUpdater(conf);
+ if (nodeLabelsUpdater != null) {
+ addService(nodeLabelsUpdater);
+ }
+
createAndInitActiveServices();
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
@@ -1090,7 +1097,8 @@ protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext, this.nodesListManager,
this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
- this.rmContext.getNMTokenSecretManager());
+ this.rmContext.getNMTokenSecretManager(),
+ this.nodeLabelsUpdater);
}
protected ClientRMService createClientRMService() {
@@ -1111,6 +1119,20 @@ protected RMSecretManagerService createRMSecretManagerService() {
return new RMSecretManagerService(conf, rmContext);
}
+ /**
+ * Create NodeLabelsProvider based on configuration.
+ */
+ protected RMNodeLabelsUpdater createNodeLabelsUpdater(Configuration conf)
+ throws IOException {
+ String providerString =
+ conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null);
+ if (providerString == null || providerString.trim().length() == 0) {
+ return null;
+ } else {
+ return new RMNodeLabelsUpdater(rmContext);
+ }
+ }
+
@Private
public ClientRMService getClientRMService() {
return this.clientRM;
@@ -1150,6 +1172,11 @@ public QueueACLsManager getQueueACLsManager() {
}
@Private
+ public RMNodeLabelsUpdater getNodeLabelsUpdater() {
+ return nodeLabelsUpdater;
+ }
+
+ @Private
WebApp getWebapp() {
return this.webApp;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 7e774c5..76beaaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -95,6 +96,7 @@
private final NMLivelinessMonitor nmLivelinessMonitor;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
+ private final RMNodeLabelsUpdater nodeLabelsProvider;
private long nextHeartBeatInterval;
private Server server;
@@ -110,14 +112,15 @@ public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor,
RMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInRM nmTokenSecretManager) {
+ NMTokenSecretManagerInRM nmTokenSecretManager,
+ RMNodeLabelsUpdater nodeLabelsProvider) {
super(ResourceTrackerService.class.getName());
this.rmContext = rmContext;
this.nodesListManager = nodesListManager;
this.nmLivelinessMonitor = nmLivelinessMonitor;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
-
+ this.nodeLabelsProvider = nodeLabelsProvider;
}
@Override
@@ -363,6 +366,8 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setDiagnosticsMessage(ex.getMessage());
response.setAreNodeLabelsAcceptedByRM(false);
}
+ } else if (!isDistributedNodeLabelsConf && nodeLabelsProvider != null) {
+ nodeLabelsProvider.register(nodeId);
}
StringBuilder message = new StringBuilder();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java
new file mode 100644
index 0000000..fe6ab2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Provides base implementation to update Nodelabels periodically.
+ */
+public abstract class AbstractNodeLabelsUpdater extends AbstractService {
+
+ private static final Log LOG = LogFactory
+ .getLog(AbstractNodeLabelsUpdater.class);
+
+ public static final long DISABLE_NODE_LABELS_UPDATER_TIMER = -1;
+
+ private long updateInterval;
+
+ // Thread which runs periodically to update node labels
+ private Thread nodeLabelsUpdaterThread;
+
+ private volatile boolean stopped;
+
+ public static final String NODE_LABELS_SEPRATOR = ",";
+
+ private NodeLabelsProvider nodeLabelsProvider;
+
+ public AbstractNodeLabelsUpdater(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ updateInterval = getNodeLabelFetchInterval(conf);
+ nodeLabelsProvider = getNodeLabelsProvider(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (updateInterval != DISABLE_NODE_LABELS_UPDATER_TIMER) {
+ nodeLabelsUpdaterThread = new Thread(new NodeLabelsUpdater());
+ nodeLabelsUpdaterThread.setName("NodeLabelsUpdater");
+ nodeLabelsUpdaterThread.start();
+ }
+ super.serviceStart();
+ }
+
+ /**
+ * Terminate the timer.
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ stopped = true;
+ if (nodeLabelsUpdaterThread != null) {
+ nodeLabelsUpdaterThread.interrupt();
+ }
+ super.serviceStop();
+ }
+
+ private class NodeLabelsUpdater implements Runnable {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ try {
+ Set nodes = getNodesToUpdateLabels();
+ updateNodeLabels(new YarnConfiguration(), nodes);
+ } catch (IOException e) {
+ LOG.warn("Failed to update node Labels", e);
+ }
+
+ try {
+ synchronized (this) {
+ wait(updateInterval);
+ }
+ } catch (InterruptedException e) {
+ LOG.info(getName() + " thread interrupted");
+ break;
+ }
+ }
+ }
+ }
+
+ private void updateNodeLabels(Configuration conf, Set nodes)
+ throws IOException {
+ if (nodes != null) {
+ Map> labelsUpdated =
+ nodeLabelsProvider.getNodeLabels(conf, nodes);
+ if (labelsUpdated != null && labelsUpdated.size() != 0) {
+ nodeLabelsUpdated(labelsUpdated);
+ }
+ }
+ }
+
+ /**
+ * Get the interval in milliseconds to update node labels.
+ */
+ public abstract long getNodeLabelFetchInterval(Configuration conf);
+
+ /**
+ * Get the NodeLabelsProvider which is used to provide node labels.
+ */
+ public abstract NodeLabelsProvider getNodeLabelsProvider(Configuration conf)
+ throws IOException;
+
+ /**
+ * Get the nodes to update labels.
+ */
+ public abstract Set getNodesToUpdateLabels();
+
+ /**
+ * Callback to notify the newly updated node -> labels map.
+ */
+ public abstract void nodeLabelsUpdated(Map> labelsUpdate)
+ throws IOException;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java
new file mode 100644
index 0000000..6e184a4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Interface which is responsible for providing the labels.
+ */
+public interface NodeLabelsProvider {
+
+ /**
+ * Provides the labels. LabelProvider is expected to give same Labels
+ * continuously until there is a change in labels.
+ *
+ * @param conf configuration
+ * @param nodes to fetch labels
+ * @return Set of node label strings applicable for a node
+ */
+ public Map> getNodeLabels(Configuration conf, Set nodes);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java
new file mode 100644
index 0000000..fcff6f2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java
@@ -0,0 +1,113 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Update NodeLabels for ResourceManager.
+ */
+public class RMNodeLabelsUpdater extends AbstractNodeLabelsUpdater {
+
+ private static final Log LOG = LogFactory.getLog(RMNodeLabelsUpdater.class);
+
+ private RMContext rmContext;
+
+ private Set newlyRegisterNodes = new HashSet();
+
+ public RMNodeLabelsUpdater(RMContext rmContext) {
+ super("ResourceManager NodeLabelsUpdater");
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ public long getNodeLabelFetchInterval(Configuration conf) {
+ return conf.getLong(
+ YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
+ }
+
+ @Override
+ public NodeLabelsProvider getNodeLabelsProvider(Configuration conf)
+ throws IOException {
+ NodeLabelsProvider nodeLabelsProvider = null;
+ try {
+ Class extends NodeLabelsProvider> labelsProviderClass = conf.getClass(
+ YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null,
+ NodeLabelsProvider.class);
+ if (labelsProviderClass != null) {
+ nodeLabelsProvider = labelsProviderClass.newInstance();
+ }
+ } catch (InstantiationException | IllegalAccessException
+ | RuntimeException e) {
+ LOG.error("Failed to create NodeLabelsProvider based on Configuration", e);
+ throw new IOException("Failed to create NodeLabelsProvider : "
+ + e.getMessage(), e);
+ }
+
+ if (nodeLabelsProvider != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node Labels provider class is : "
+ + nodeLabelsProvider.getClass().toString());
+ }
+ }
+
+ return nodeLabelsProvider;
+ }
+
+ @Override
+ public Set getNodesToUpdateLabels() {
+ if (!newlyRegisterNodes.isEmpty()) {
+ synchronized(this) {
+ if (!newlyRegisterNodes.isEmpty()) {
+ Set nodes = new HashSet(newlyRegisterNodes);
+ newlyRegisterNodes.clear();
+ return nodes;
+ }
+ }
+ }
+
+ return Collections.unmodifiableSet(rmContext.getRMNodes().keySet());
+ }
+
+ @Override
+ public void nodeLabelsUpdated(Map> labelsUpdate)
+ throws IOException {
+ rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate);
+ }
+
+ /**
+ * Register a newly added node.
+ */
+ public synchronized void register(NodeId node) {
+ newlyRegisterNodes.add(node);
+ notify();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 2410053..a628ab9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -201,6 +201,22 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
}
}
+ /**
+ * Disable REST requests to update node labels if NodeLabelsFetcher is
+ * configured.
+ */
+ private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation)
+ throws IOException {
+ String providerString =
+ conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null);
+ if (providerString != null && providerString.trim().length() != 0) {
+ String msg = String.format("Error when invoke method=%s because of "
+ + "node labels provider is configured.", operation);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
RMWebServices(ResourceManager rm, Configuration conf,
HttpServletResponse response) {
this(rm, conf);
@@ -893,6 +909,7 @@ private Response replaceLabelsOnNode(
init();
checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode");
+ checkAndThrowIfNodeLabelsFetcherConfigured("replaceLabelsOnNode");
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 249f093..be53ac9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -624,7 +624,7 @@ protected ResourceTrackerService createResourceTrackerService() {
nmTokenSecretManager.rollMasterKey();
return new ResourceTrackerService(getRMContext(), nodesListManager,
this.nmLivelinessMonitor, containerTokenSecretManager,
- nmTokenSecretManager) {
+ nmTokenSecretManager, this.getNodeLabelsUpdater()) {
@Override
protected void serviceStart() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 62cfe84..42c1b9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -553,7 +553,8 @@ protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
- this.rmContext.getNMTokenSecretManager()) {
+ this.rmContext.getNMTokenSecretManager(),
+ this.getNodeLabelsUpdater()) {
@Override
protected void serviceStart() throws Exception {
throw new Exception("ResourceTracker service failed");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index de17acd..250fcce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1944,7 +1944,8 @@ protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
- this.rmContext.getNMTokenSecretManager()) {
+ this.rmContext.getNMTokenSecretManager(),
+ this.getNodeLabelsUpdater()) {
@Override
protected void serviceStart() throws Exception {
// send the container_finished event as soon as the
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
index c837450..d05da1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
@@ -91,8 +91,8 @@ public void setUp() {
nmTokenSecretManager.start();
resourceTrackerService = new ResourceTrackerService(context,
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager,
- nmTokenSecretManager);
-
+ nmTokenSecretManager, null);
+
resourceTrackerService.init(conf);
resourceTrackerService.start();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
index dce3d06..a411ac3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
@@ -107,8 +107,8 @@ public void setUp() {
nmTokenSecretManager.start();
resourceTrackerService = new ResourceTrackerService(context,
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager,
- nmTokenSecretManager);
-
+ nmTokenSecretManager, null);
+
resourceTrackerService.init(conf);
resourceTrackerService.start();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
index 4f94695..c8f6c4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
@@ -82,7 +82,7 @@ public void handle(Event event) {
resourceTrackerService = new ResourceTrackerService(context,
nodesListManager, new NMLivelinessMonitor(dispatcher),
context.getContainerTokenSecretManager(),
- context.getNMTokenSecretManager());
+ context.getNMTokenSecretManager(), null);
resourceTrackerService.init(conf);
}