diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f..399086d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2029,6 +2029,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + + "node-labels."; + + public static final String RM_NODE_LABELS_PROVIDER_CONFIG = + RM_NODE_LABELS_PREFIX + "provider"; + + private static final String RM_NODE_LABELS_PROVIDER_PREFIX = + RM_NODE_LABELS_PREFIX + "provider."; + + //If -1 is configured then no timer task should be created + public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; + + //once in 10 mins + public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + 10 * 60 * 1000; + public static final String AM_BLACKLISTING_ENABLED = YARN_PREFIX + "am.blacklisting.enabled"; public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 75e6cee..fd636bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; @@ -371,7 +372,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new CustomedResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.getNodeLabelsUpdater()); } return super.createResourceTrackerService(); } @@ -724,9 +726,11 @@ public CustomedResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + RMNodeLabelsUpdater nodeLabelsProvider) { super(rmContext, nodesListManager, nmLivelinessMonitor, - containerTokenSecretManager, nmTokenSecretManager); + containerTokenSecretManager, nmTokenSecretManager, + nodeLabelsProvider); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3..e4342d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2148,6 +2148,23 @@ yarn.nodemanager.node-labels.provider.fetch-timeout-ms 1200000 + + + + + The class to use as the node labels fetcher by ResourceManager. + + yarn.resourcemanager.node-labels.provider + + + + + + The interval to use to update node labels by ResourceManager. + + yarn.resourcemanager.node-labels.provider.fetch-interval-ms + 600000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index ab46419..f77ed88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -746,6 +746,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( final String msg = "set node to labels."; checkAndThrowIfDistributedNodeLabelConfEnabled(operation); + checkAndThrowIfNodeLabelsFetcherConfigured(operation); UserGroupInformation user = checkAcls(operation); checkRMStatus(user.getShortUserName(), operation, msg); @@ -791,6 +792,23 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) } } + /** + * Disable CLI requests to update node labels if NodeLabelsFetcher is + * configured. + */ + private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation) + throws IOException { + Configuration conf = getConfig(); + String providerString = + conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null); + if (providerString != null && providerString.trim().length() != 0) { + String msg = String.format("Error when invoke method=%s because of " + + "node labels provider is configured.", operation); + LOG.error(msg); + throw new IOException(msg); + } + } + @Override public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d6d9629..52c1c67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -159,6 +160,7 @@ private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; private JvmPauseMonitor pauseMonitor; + private RMNodeLabelsUpdater nodeLabelsUpdater; @VisibleForTesting protected String webAppAddress; @@ -253,6 +255,11 @@ protected void serviceInit(Configuration conf) throws Exception { rmContext.setYarnConfiguration(conf); + nodeLabelsUpdater = createNodeLabelsUpdater(conf); + if (nodeLabelsUpdater != null) { + addService(nodeLabelsUpdater); + } + createAndInitActiveServices(); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, @@ -1090,7 +1097,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.nodeLabelsUpdater); } protected ClientRMService createClientRMService() { @@ -1111,6 +1119,20 @@ protected RMSecretManagerService createRMSecretManagerService() { return new RMSecretManagerService(conf, rmContext); } + /** + * Create NodeLabelsProvider based on configuration. + */ + protected RMNodeLabelsUpdater createNodeLabelsUpdater(Configuration conf) + throws IOException { + String providerString = + conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null); + if (providerString == null || providerString.trim().length() == 0) { + return null; + } else { + return new RMNodeLabelsUpdater(rmContext); + } + } + @Private public ClientRMService getClientRMService() { return this.clientRM; @@ -1150,6 +1172,11 @@ public QueueACLsManager getQueueACLsManager() { } @Private + public RMNodeLabelsUpdater getNodeLabelsUpdater() { + return nodeLabelsUpdater; + } + + @Private WebApp getWebapp() { return this.webApp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 7e774c5..76beaaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -95,6 +96,7 @@ private final NMLivelinessMonitor nmLivelinessMonitor; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; + private final RMNodeLabelsUpdater nodeLabelsProvider; private long nextHeartBeatInterval; private Server server; @@ -110,14 +112,15 @@ public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + RMNodeLabelsUpdater nodeLabelsProvider) { super(ResourceTrackerService.class.getName()); this.rmContext = rmContext; this.nodesListManager = nodesListManager; this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + this.nodeLabelsProvider = nodeLabelsProvider; } @Override @@ -363,6 +366,8 @@ public RegisterNodeManagerResponse registerNodeManager( response.setDiagnosticsMessage(ex.getMessage()); response.setAreNodeLabelsAcceptedByRM(false); } + } else if (!isDistributedNodeLabelsConf && nodeLabelsProvider != null) { + nodeLabelsProvider.register(nodeId); } StringBuilder message = new StringBuilder(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java new file mode 100644 index 0000000..fe6ab2c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/AbstractNodeLabelsUpdater.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Provides base implementation to update Nodelabels periodically. + */ +public abstract class AbstractNodeLabelsUpdater extends AbstractService { + + private static final Log LOG = LogFactory + .getLog(AbstractNodeLabelsUpdater.class); + + public static final long DISABLE_NODE_LABELS_UPDATER_TIMER = -1; + + private long updateInterval; + + // Thread which runs periodically to update node labels + private Thread nodeLabelsUpdaterThread; + + private volatile boolean stopped; + + public static final String NODE_LABELS_SEPRATOR = ","; + + private NodeLabelsProvider nodeLabelsProvider; + + public AbstractNodeLabelsUpdater(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + updateInterval = getNodeLabelFetchInterval(conf); + nodeLabelsProvider = getNodeLabelsProvider(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (updateInterval != DISABLE_NODE_LABELS_UPDATER_TIMER) { + nodeLabelsUpdaterThread = new Thread(new NodeLabelsUpdater()); + nodeLabelsUpdaterThread.setName("NodeLabelsUpdater"); + nodeLabelsUpdaterThread.start(); + } + super.serviceStart(); + } + + /** + * Terminate the timer. + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + stopped = true; + if (nodeLabelsUpdaterThread != null) { + nodeLabelsUpdaterThread.interrupt(); + } + super.serviceStop(); + } + + private class NodeLabelsUpdater implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + Set nodes = getNodesToUpdateLabels(); + updateNodeLabels(new YarnConfiguration(), nodes); + } catch (IOException e) { + LOG.warn("Failed to update node Labels", e); + } + + try { + synchronized (this) { + wait(updateInterval); + } + } catch (InterruptedException e) { + LOG.info(getName() + " thread interrupted"); + break; + } + } + } + } + + private void updateNodeLabels(Configuration conf, Set nodes) + throws IOException { + if (nodes != null) { + Map> labelsUpdated = + nodeLabelsProvider.getNodeLabels(conf, nodes); + if (labelsUpdated != null && labelsUpdated.size() != 0) { + nodeLabelsUpdated(labelsUpdated); + } + } + } + + /** + * Get the interval in milliseconds to update node labels. + */ + public abstract long getNodeLabelFetchInterval(Configuration conf); + + /** + * Get the NodeLabelsProvider which is used to provide node labels. + */ + public abstract NodeLabelsProvider getNodeLabelsProvider(Configuration conf) + throws IOException; + + /** + * Get the nodes to update labels. + */ + public abstract Set getNodesToUpdateLabels(); + + /** + * Callback to notify the newly updated node -> labels map. + */ + public abstract void nodeLabelsUpdated(Map> labelsUpdate) + throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java new file mode 100644 index 0000000..6e184a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; + +/** + * Interface which is responsible for providing the labels. + */ +public interface NodeLabelsProvider { + + /** + * Provides the labels. LabelProvider is expected to give same Labels + * continuously until there is a change in labels. + * + * @param conf configuration + * @param nodes to fetch labels + * @return Set of node label strings applicable for a node + */ + public Map> getNodeLabels(Configuration conf, Set nodes); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java new file mode 100644 index 0000000..fcff6f2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java @@ -0,0 +1,113 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +/** + * Update NodeLabels for ResourceManager. + */ +public class RMNodeLabelsUpdater extends AbstractNodeLabelsUpdater { + + private static final Log LOG = LogFactory.getLog(RMNodeLabelsUpdater.class); + + private RMContext rmContext; + + private Set newlyRegisterNodes = new HashSet(); + + public RMNodeLabelsUpdater(RMContext rmContext) { + super("ResourceManager NodeLabelsUpdater"); + this.rmContext = rmContext; + } + + @Override + public long getNodeLabelFetchInterval(Configuration conf) { + return conf.getLong( + YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS); + } + + @Override + public NodeLabelsProvider getNodeLabelsProvider(Configuration conf) + throws IOException { + NodeLabelsProvider nodeLabelsProvider = null; + try { + Class labelsProviderClass = conf.getClass( + YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null, + NodeLabelsProvider.class); + if (labelsProviderClass != null) { + nodeLabelsProvider = labelsProviderClass.newInstance(); + } + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeLabelsProvider based on Configuration", e); + throw new IOException("Failed to create NodeLabelsProvider : " + + e.getMessage(), e); + } + + if (nodeLabelsProvider != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node Labels provider class is : " + + nodeLabelsProvider.getClass().toString()); + } + } + + return nodeLabelsProvider; + } + + @Override + public Set getNodesToUpdateLabels() { + if (!newlyRegisterNodes.isEmpty()) { + synchronized(this) { + if (!newlyRegisterNodes.isEmpty()) { + Set nodes = new HashSet(newlyRegisterNodes); + newlyRegisterNodes.clear(); + return nodes; + } + } + } + + return Collections.unmodifiableSet(rmContext.getRMNodes().keySet()); + } + + @Override + public void nodeLabelsUpdated(Map> labelsUpdate) + throws IOException { + rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate); + } + + /** + * Register a newly added node. + */ + public synchronized void register(NodeId node) { + newlyRegisterNodes.add(node); + notify(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 2410053..a628ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -201,6 +201,22 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) } } + /** + * Disable REST requests to update node labels if NodeLabelsFetcher is + * configured. + */ + private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation) + throws IOException { + String providerString = + conf.get(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null); + if (providerString != null && providerString.trim().length() != 0) { + String msg = String.format("Error when invoke method=%s because of " + + "node labels provider is configured.", operation); + LOG.error(msg); + throw new IOException(msg); + } + } + RMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response) { this(rm, conf); @@ -893,6 +909,7 @@ private Response replaceLabelsOnNode( init(); checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode"); + checkAndThrowIfNodeLabelsFetcherConfigured("replaceLabelsOnNode"); UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 249f093..be53ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -624,7 +624,7 @@ protected ResourceTrackerService createResourceTrackerService() { nmTokenSecretManager.rollMasterKey(); return new ResourceTrackerService(getRMContext(), nodesListManager, this.nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager) { + nmTokenSecretManager, this.getNodeLabelsUpdater()) { @Override protected void serviceStart() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 62cfe84..42c1b9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -553,7 +553,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.getNodeLabelsUpdater()) { @Override protected void serviceStart() throws Exception { throw new Exception("ResourceTracker service failed"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index de17acd..250fcce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1944,7 +1944,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.getNodeLabelsUpdater()) { @Override protected void serviceStart() throws Exception { // send the container_finished event as soon as the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index c837450..d05da1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -91,8 +91,8 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); - + nmTokenSecretManager, null); + resourceTrackerService.init(conf); resourceTrackerService.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index dce3d06..a411ac3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -107,8 +107,8 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); - + nmTokenSecretManager, null); + resourceTrackerService.init(conf); resourceTrackerService.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 4f94695..c8f6c4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -82,7 +82,7 @@ public void handle(Event event) { resourceTrackerService = new ResourceTrackerService(context, nodesListManager, new NMLivelinessMonitor(dispatcher), context.getContainerTokenSecretManager(), - context.getNMTokenSecretManager()); + context.getNMTokenSecretManager(), null); resourceTrackerService.init(conf); }