diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f1baf5c..6425c35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1953,6 +1953,22 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); } + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + + "node-labels."; + + public static final String RM_NODE_LABELS_FETCHER = + RM_NODE_LABELS_PREFIX + "fetcher"; + + private static final String RM_NODE_LABELS_FETCHER_PREFIX = + RM_NODE_LABELS_PREFIX + "fetcher."; + + public static final String RM_NODE_LABELS_FETCHER_FETCH_INTERVAL_MS = + RM_NODE_LABELS_FETCHER_PREFIX + "fetch-interval-ms"; + + //once in 10 mins + public static final long DEFAULT_RM_NODE_LABELS_FETCHER_FETCH_INTERVAL_MS = + 10 * 60 * 1000; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 75e6cee..06a529c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; @@ -371,7 +372,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new CustomedResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.nodeLabelsProvider); } return super.createResourceTrackerService(); } @@ -724,9 +726,11 @@ public CustomedResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + RMNodeLabelsProvider nodeLabelsProvider) { super(rmContext, nodesListManager, nmLivelinessMonitor, - containerTokenSecretManager, nmTokenSecretManager); + containerTokenSecretManager, nmTokenSecretManager, + nodeLabelsProvider); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/AbstractNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/AbstractNodeLabelsProvider.java new file mode 100644 index 0000000..9eb974b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/AbstractNodeLabelsProvider.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodelabels; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Provides base implementation of NodeLabelsProvider. + */ +public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider { + + private static final Log LOG = LogFactory + .getLog(AbstractNodeLabelsProvider.class); + + public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1; + + protected long updateInterval; + + // Thread which runs periodically to update node labels + private Thread nodeLabelsUpdaterThread; + + private volatile boolean stopped; + + public static final String NODE_LABELS_SEPRATOR = ","; + + private NodeLabelsFetcher fetcher; + + public AbstractNodeLabelsProvider(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.updateInterval = getNodeLabelFetchInterval(conf); + fetcher = getNodeLabelsFetcher(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (updateInterval != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) { + nodeLabelsUpdaterThread = new Thread(new NodeLabelsUpdater()); + nodeLabelsUpdaterThread.setName("NodeLabelsUpdater"); + nodeLabelsUpdaterThread.start(); + } + super.serviceStart(); + } + + /** + * terminate the timer + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + stopped = true; + if (nodeLabelsUpdaterThread != null) { + nodeLabelsUpdaterThread.interrupt(); + } + super.serviceStop(); + } + + private class NodeLabelsUpdater implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + Set nodes = getNodesToUpdateLabels(); + updateNodeLabels(new YarnConfiguration(), nodes); + } catch (IOException e) { + LOG.warn("Failed to update node Labels", e); + } + + try { + synchronized (this) { + wait(updateInterval); + } + } catch (InterruptedException e) { + LOG.info(getName() + " thread interrupted"); + break; + } + } + } + } + + private void updateNodeLabels(Configuration conf, Set nodes) + throws IOException { + if (nodes != null) { + Map> labelsUpdated = fetcher.getNodeLabels(conf, nodes); + if (labelsUpdated != null && labelsUpdated.size() != 0) { + nodeLabelsUpdated(labelsUpdated); + } + } + } + + public abstract long getNodeLabelFetchInterval(Configuration conf); + + public abstract NodeLabelsFetcher getNodeLabelsFetcher(Configuration conf) + throws IOException; + + public abstract Set getNodesToUpdateLabels(); + + public abstract void nodeLabelsUpdated(Map> labelsUpdate) + throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsFetcher.java new file mode 100644 index 0000000..43a3bed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsFetcher.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodelabels; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; + +public interface NodeLabelsFetcher { + public abstract Map> getNodeLabels(Configuration conf, + Set nodes); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsProvider.java new file mode 100644 index 0000000..47d0746 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsProvider.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodelabels; + +import java.util.Set; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeLabel; + +/** + * Interface which will be responsible for fetching the labels + * + */ +public abstract class NodeLabelsProvider extends AbstractService { + + public NodeLabelsProvider(String name) { + super(name); + } + + /** + * Provides the labels. LabelProvider is expected to give same Labels + * continuously until there is a change in labels. + * If null is returned then Empty label set is assumed by the caller. + * + * @return Set of node label strings applicable for a node + */ + public abstract Set getNodeLabels(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsUtils.java new file mode 100644 index 0000000..4cfd285 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/nodelabels/NodeLabelsUtils.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodelabels; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeLabel; + +public class NodeLabelsUtils { + public static Set convertToNodeLabelSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (String label : nodeLabels) { + labels.add(NodeLabel.newInstance(label)); + } + return labels; + } + + public static Set convertToStringSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (NodeLabel label : nodeLabels) { + labels.add(label.getName()); + } + return labels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a06293d..e1df121 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,11 +58,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 30a2bd5..d4af3ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -74,13 +74,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.YarnVersionInfo; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java deleted file mode 100644 index dab3709..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.nodelabels; - -import java.util.Set; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.NodeLabel; - -/** - * Interface which will be responsible for fetching the labels - * - */ -public abstract class NodeLabelsProvider extends AbstractService { - - public NodeLabelsProvider(String name) { - super(name); - } - - /** - * Provides the labels. LabelProvider is expected to give same Labels - * continuously until there is a change in labels. - * If null is returned then Empty label set is assumed by the caller. - * - * @return Set of node label strings applicable for a node - */ - public abstract Set getNodeLabels(); -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java index 7e1bbd8..c28237e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; -import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.After; import org.junit.Assert; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index e5bb6e5..3f4011a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -679,6 +679,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( final String msg = "set node to labels."; checkAndThrowIfDistributedNodeLabelConfEnabled(operation); + checkAndThrowIfNodeLabelsFetcherConfigured(operation); UserGroupInformation user = checkAcls(operation); checkRMStatus(user.getShortUserName(), operation, msg); @@ -724,6 +725,19 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) } } + private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation) + throws IOException { + Configuration conf = getConfig(); + String fetcherString = + conf.get(YarnConfiguration.RM_NODE_LABELS_FETCHER, null); + if (fetcherString != null && fetcherString.trim().length() != 0) { + String msg = String.format("Error when invoke method=%s because of " + + "node labels provider is configured.", operation); + LOG.error(msg); + throw new IOException(msg); + } + } + @Override public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 817565b..7910e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsProvider; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -159,6 +160,7 @@ private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; private JvmPauseMonitor pauseMonitor; + protected RMNodeLabelsProvider nodeLabelsProvider; @VisibleForTesting protected String webAppAddress; @@ -253,6 +255,11 @@ protected void serviceInit(Configuration conf) throws Exception { rmContext.setYarnConfiguration(conf); + nodeLabelsProvider = createNodeLabelsProvider(conf); + if (nodeLabelsProvider != null) { + addService(nodeLabelsProvider); + } + createAndInitActiveServices(); webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, @@ -1090,7 +1097,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()); + this.rmContext.getNMTokenSecretManager(), + this.nodeLabelsProvider); } protected ClientRMService createClientRMService() { @@ -1111,6 +1119,17 @@ protected RMSecretManagerService createRMSecretManagerService() { return new RMSecretManagerService(conf, rmContext); } + protected RMNodeLabelsProvider createNodeLabelsProvider(Configuration conf) + throws IOException { + String fetcherString = + conf.get(YarnConfiguration.RM_NODE_LABELS_FETCHER, null); + if (fetcherString == null || fetcherString.trim().length() == 0) { + return null; + } else { + return new RMNodeLabelsProvider(rmContext); + } + } + @Private public ClientRMService getClientRMService() { return this.clientRM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3c2c09b..8e11e08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -94,6 +95,7 @@ private final NMLivelinessMonitor nmLivelinessMonitor; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; + private final RMNodeLabelsProvider nodeLabelsProvider; private long nextHeartBeatInterval; private Server server; @@ -109,14 +111,15 @@ public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager) { + NMTokenSecretManagerInRM nmTokenSecretManager, + RMNodeLabelsProvider nodeLabelsProvider) { super(ResourceTrackerService.class.getName()); this.rmContext = rmContext; this.nodesListManager = nodesListManager; this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + this.nodeLabelsProvider = nodeLabelsProvider; } @Override @@ -360,6 +363,8 @@ public RegisterNodeManagerResponse registerNodeManager( response.setDiagnosticsMessage(ex.getMessage()); response.setAreNodeLabelsAcceptedByRM(false); } + } else if (!isDistributedNodeLabelsConf && nodeLabelsProvider != null) { + nodeLabelsProvider.register(nodeId); } StringBuilder message = new StringBuilder(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsProvider.java new file mode 100644 index 0000000..fe19b80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsProvider.java @@ -0,0 +1,122 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodelabels.AbstractNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodelabels.NodeLabelsFetcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +public class RMNodeLabelsProvider extends AbstractNodeLabelsProvider { + + private static final Log LOG = LogFactory.getLog(RMNodeLabelsProvider.class); + + private RMContext rmContext; + + private Set newlyRegisterNodes = new HashSet(); + + public RMNodeLabelsProvider(RMContext rmContext) { + super("ResourceManager NodeLabels Provider"); + this.rmContext = rmContext; + } + + @Override + public long getNodeLabelFetchInterval(Configuration conf) { + return conf.getLong(YarnConfiguration.RM_NODE_LABELS_FETCHER_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NODE_LABELS_FETCHER_FETCH_INTERVAL_MS); + } + + @Override + public NodeLabelsFetcher getNodeLabelsFetcher(Configuration conf) + throws IOException { + NodeLabelsFetcher fetcher = null; + String fetcherString = conf.get(YarnConfiguration.RM_NODE_LABELS_FETCHER, + null); + if (fetcherString == null || fetcherString.trim().length() == 0) { + return fetcher; + } + switch (fetcherString.trim().toLowerCase()) { + // leave for shortcut configuration for common node label fetchers + default: + try { + Class labelsFetcherClass = conf.getClass( + YarnConfiguration.RM_NODE_LABELS_FETCHER, null, + NodeLabelsFetcher.class); + fetcher = labelsFetcherClass.newInstance(); + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeLabelsFetcher based on Configuration", e); + throw new IOException("Failed to create NodeLabelsFetcher : " + + e.getMessage(), e); + } + } + + if (fetcher != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node Labels fetcher class is : " + + fetcher.getClass().toString()); + } + } + + return fetcher; + } + + @Override + public Set getNodesToUpdateLabels() { + if (!newlyRegisterNodes.isEmpty()) { + synchronized(this) { + if (!newlyRegisterNodes.isEmpty()) { + Set nodes = new HashSet(newlyRegisterNodes); + newlyRegisterNodes.clear(); + return nodes; + } + } + } + + return Collections.unmodifiableSet(rmContext.getRMNodes().keySet()); + } + + @Override + public void nodeLabelsUpdated(Map> labelsUpdate) + throws IOException { + rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate); + } + + @Override + public Set getNodeLabels() { + // No need to implement this method for Resource Manager + throw new UnsupportedOperationException(); + } + + public synchronized void register(NodeId node) { + newlyRegisterNodes.add(node); + notify(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index b1fa80a..d33f50f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -199,6 +199,18 @@ private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) } } + private void checkAndThrowIfNodeLabelsFetcherConfigured(String operation) + throws IOException { + String fetcherString = + conf.get(YarnConfiguration.RM_NODE_LABELS_FETCHER, null); + if (fetcherString != null && fetcherString.trim().length() != 0) { + String msg = String.format("Error when invoke method=%s because of " + + "node labels provider is configured.", operation); + LOG.error(msg); + throw new IOException(msg); + } + } + RMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response) { this(rm, conf); @@ -890,6 +902,7 @@ private Response replaceLabelsOnNode( init(); checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode"); + checkAndThrowIfNodeLabelsFetcherConfigured("replaceLabelsOnNode"); UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 5080355..bd42e5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -622,7 +622,7 @@ protected ResourceTrackerService createResourceTrackerService() { nmTokenSecretManager.rollMasterKey(); return new ResourceTrackerService(getRMContext(), nodesListManager, this.nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager) { + nmTokenSecretManager, this.nodeLabelsProvider) { @Override protected void serviceStart() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 0200e85..64f2c7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -550,7 +550,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.nodeLabelsProvider) { @Override protected void serviceStart() throws Exception { throw new Exception("ResourceTracker service failed"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index de17acd..2e20123 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1944,7 +1944,8 @@ protected ResourceTrackerService createResourceTrackerService() { return new ResourceTrackerService(this.rmContext, this.nodesListManager, this.nmLivelinessMonitor, this.rmContext.getContainerTokenSecretManager(), - this.rmContext.getNMTokenSecretManager()) { + this.rmContext.getNMTokenSecretManager(), + this.nodeLabelsProvider) { @Override protected void serviceStart() throws Exception { // send the container_finished event as soon as the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index c837450..1f8a45e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -91,7 +91,7 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, null); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index b525efc..85aedbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -102,7 +102,7 @@ public void setUp() { nmTokenSecretManager.start(); resourceTrackerService = new ResourceTrackerService(context, nodesListManager, nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); + nmTokenSecretManager, null); resourceTrackerService.init(conf); resourceTrackerService.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 4f94695..c8f6c4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -82,7 +82,7 @@ public void handle(Event event) { resourceTrackerService = new ResourceTrackerService(context, nodesListManager, new NMLivelinessMonitor(dispatcher), context.getContainerTokenSecretManager(), - context.getNMTokenSecretManager()); + context.getNMTokenSecretManager(), null); resourceTrackerService.init(conf); }