diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33e8a1f..c3ecf2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1979,14 +1979,17 @@ private static void addDeprecatedKeys() { public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; - public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE = + public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE = "centralized"; - + + public static final String DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE = + "delegated-centralized"; + public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = "distributed"; public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = - CENTALIZED_NODELABEL_CONFIGURATION_TYPE; + CENTRALIZED_NODELABEL_CONFIGURATION_TYPE; public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY = YARN_PREFIX + "cluster.max-application-priority"; @@ -1999,6 +2002,20 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); } + @Private + public static boolean isCentralizedNodeLabelConfiguration( + Configuration conf) { + return CENTRALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( + NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); + } + + @Private + public static boolean isDelegatedCentralizedNodeLabelConfiguration( + Configuration conf) { + return DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( + NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); + } + private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX + "node-labels."; @@ -2029,6 +2046,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + + "node-labels."; + + public static final String RM_NODE_LABELS_PROVIDER_CONFIG = + RM_NODE_LABELS_PREFIX + "provider"; + + private static final String RM_NODE_LABELS_PROVIDER_PREFIX = + RM_NODE_LABELS_PREFIX + "provider."; + + //If -1 is configured then no timer task should be created + public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; + + //once in 10 mins + public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + 10 * 60 * 1000; + public static final String AM_BLACKLISTING_ENABLED = YARN_PREFIX + "am.blacklisting.enabled"; public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index bcd64c3..ce1d7fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2095,7 +2095,7 @@ Set configuration type for node labels. Administrators can specify - "centralized" or "distributed". + "centralized", "delegated-centralized" or "distributed". yarn.node-labels.configuration-type centralized @@ -2148,6 +2148,23 @@ yarn.nodemanager.node-labels.provider.fetch-timeout-ms 1200000 + + + + + The class to use as the node labels fetcher by ResourceManager. + + yarn.resourcemanager.node-labels.provider + + + + + + The interval to use to update node labels by ResourceManager. + + yarn.resourcemanager.node-labels.provider.fetch-interval-ms + 600000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index ab46419..f071ea3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -120,7 +120,7 @@ private UserGroupInformation daemonUser; @VisibleForTesting - boolean isDistributedNodeLabelConfiguration = false; + boolean isCentralizedNodeLabelConfiguration = false; public AdminService(ResourceManager rm, RMContext rmContext) { super(AdminService.class.getName()); @@ -151,8 +151,8 @@ public void serviceInit(Configuration conf) throws Exception { .getCurrentUser()); rmId = conf.get(YarnConfiguration.RM_HA_ID); - isDistributedNodeLabelConfiguration = - YarnConfiguration.isDistributedNodeLabelConfiguration(conf); + isCentralizedNodeLabelConfiguration = + YarnConfiguration.isCentralizedNodeLabelConfiguration(conf); super.serviceInit(conf); } @@ -745,7 +745,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( String operation = "replaceLabelsOnNode"; final String msg = "set node to labels."; - checkAndThrowIfDistributedNodeLabelConfEnabled(operation); + checkAndThrowIfCentralizedNodeLabelConfNotEnabled(operation); UserGroupInformation user = checkAcls(operation); checkRMStatus(user.getShortUserName(), operation, msg); @@ -780,12 +780,13 @@ private YarnException logAndWrapException(Exception exception, String user, return RPCUtil.getRemoteException(exception); } - private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) - throws YarnException { - if (isDistributedNodeLabelConfiguration) { + private void checkAndThrowIfCentralizedNodeLabelConfNotEnabled( + String operation) throws YarnException { + if (!isCentralizedNodeLabelConfiguration) { String msg = - String.format("Error when invoke method=%s because of " - + "distributed node label configuration enabled.", operation); + String.format("Error when invoke method=%s because " + + "centralized node label configuration is not enabled.", + operation); LOG.error(msg); throw RPCUtil.getRemoteException(new IOException(msg)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index c71323f..ccc9643 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -94,6 +95,7 @@ private ResourceTrackerService resourceTrackerService; private ApplicationMasterService applicationMasterService; private RMNodeLabelsManager nodeLabelManager; + private RMNodeLabelsUpdater nodeLabelsUpdater; private long epoch; private Clock systemClock = new SystemClock(); private long schedulerRecoveryStartTime = 0; @@ -392,6 +394,18 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { @Private @Unstable + public RMNodeLabelsUpdater getNodeLabelsUpdater() { + return nodeLabelsUpdater; + } + + @Private + @Unstable + public void setNodeLabelsUpdater(RMNodeLabelsUpdater nodeLabelsUpdater) { + this.nodeLabelsUpdater = nodeLabelsUpdater; + } + + @Private + @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { this.schedulerRecoveryStartTime = systemClock.getTime(); this.schedulerRecoveryWaitTime = waitTime; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b64c834..de6b5f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -118,6 +119,10 @@ void setRMApplicationHistoryWriter( public void setNodeLabelManager(RMNodeLabelsManager mgr); + RMNodeLabelsUpdater getNodeLabelsUpdater(); + + public void setNodeLabelsUpdater(RMNodeLabelsUpdater nodeLabelsUpdater); + long getEpoch(); ReservationSystem getReservationSystem(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 840cea7..c40a620 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -400,6 +401,16 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { activeServiceContext.setNodeLabelManager(mgr); } + @Override + public RMNodeLabelsUpdater getNodeLabelsUpdater() { + return activeServiceContext.getNodeLabelsUpdater(); + } + + @Override + public void setNodeLabelsUpdater(RMNodeLabelsUpdater nodeLabelsUpdater) { + activeServiceContext.setNodeLabelsUpdater(nodeLabelsUpdater); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d6d9629..a63bcc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -440,6 +441,12 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(nlm); rmContext.setNodeLabelManager(nlm); + RMNodeLabelsUpdater nodeLabelsUpdater = createNodeLabelsUpdater(conf); + if (nodeLabelsUpdater != null) { + addService(nodeLabelsUpdater); + rmContext.setNodeLabelsUpdater(nodeLabelsUpdater); + } + boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); @@ -1111,6 +1118,17 @@ protected RMSecretManagerService createRMSecretManagerService() { return new RMSecretManagerService(conf, rmContext); } + /** + * Create NodeLabelsUpdater based on configuration. + */ + protected RMNodeLabelsUpdater createNodeLabelsUpdater(Configuration conf) { + if (YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf)) { + return new RMNodeLabelsUpdater(rmContext); + } else { + return null; + } + } + @Private public ClientRMService getClientRMService() { return this.clientRM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 7e774c5..b1084c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -105,6 +106,7 @@ private int minAllocVcores; private boolean isDistributedNodeLabelsConf; + private boolean isDelegatedCentralizedNodeLabelsConf; public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, @@ -151,6 +153,8 @@ protected void serviceInit(Configuration conf) throws Exception { isDistributedNodeLabelsConf = YarnConfiguration.isDistributedNodeLabelConfiguration(conf); + isDelegatedCentralizedNodeLabelsConf = YarnConfiguration + .isDelegatedCentralizedNodeLabelConfiguration(conf); super.serviceInit(conf); } @@ -241,17 +245,6 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { } } - static Set convertToStringSet(Set nodeLabels) { - if (null == nodeLabels) { - return null; - } - Set labels = new HashSet(); - for (NodeLabel label : nodeLabels) { - labels.add(label.getName()); - } - return labels; - } - @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( @@ -353,7 +346,8 @@ public RegisterNodeManagerResponse registerNodeManager( } // Update node's labels to RM's NodeLabelManager. - Set nodeLabels = convertToStringSet(request.getNodeLabels()); + Set nodeLabels = NodeLabelsUtils.convertToStringSet( + request.getNodeLabels()); if (isDistributedNodeLabelsConf && nodeLabels != null) { try { updateNodeLabelsFromNMReport(nodeLabels, nodeId); @@ -363,6 +357,8 @@ public RegisterNodeManagerResponse registerNodeManager( response.setDiagnosticsMessage(ex.getMessage()); response.setAreNodeLabelsAcceptedByRM(false); } + } else if (isDelegatedCentralizedNodeLabelsConf) { + this.rmContext.getNodeLabelsUpdater().updateNodeLabels(nodeId); } StringBuilder message = new StringBuilder(); @@ -477,7 +473,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) { try { updateNodeLabelsFromNMReport( - convertToStringSet(request.getNodeLabels()), nodeId); + NodeLabelsUtils.convertToStringSet(request.getNodeLabels()), nodeId); nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true); } catch (IOException ex) { //ensure the error message is captured and sent across in response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java new file mode 100644 index 0000000..8a6211c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; + +/** + * Interface which is responsible for providing the labels. + */ +public interface NodeLabelsProvider { + + /** + * Provides the labels. LabelProvider is expected to give same Labels + * continuously until there is a change in labels. + * + * @param nodes to fetch labels + * @return Set of node label strings applicable for a node + */ + public Map> getNodeLabels(Set nodes); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java new file mode 100644 index 0000000..0c256d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeLabel; + +/** + * Node labels utilities. + */ +public class NodeLabelsUtils { + + private NodeLabelsUtils() { /* Hidden constructor */ } + + public static Set convertToStringSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (NodeLabel label : nodeLabels) { + labels.add(label.getName()); + } + return labels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java new file mode 100644 index 0000000..8308eba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsUpdater.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +import com.google.common.collect.ImmutableSet; + +/** + * Update Nodelabels for Resource Manager periodically. + */ +public class RMNodeLabelsUpdater extends AbstractService { + + private static final Log LOG = LogFactory + .getLog(RMNodeLabelsUpdater.class); + + public static final long DISABLE_NODE_LABELS_UPDATER_TIMER = -1; + + private long updateInterval; + + // Thread which runs periodically to update node labels + private Thread nodeLabelsUpdaterThread; + + private volatile boolean stopped; + + private NodeLabelsProvider nodeLabelsProvider; + + private RMContext rmContext; + + public RMNodeLabelsUpdater(RMContext rmContext) { + super("RMNodeLabelsUpdater"); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + updateInterval = conf.getLong( + YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS); + nodeLabelsProvider = createNodeLabelsProvider(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (updateInterval != DISABLE_NODE_LABELS_UPDATER_TIMER) { + nodeLabelsUpdaterThread = new Thread(new NodeLabelsUpdaterThread()); + nodeLabelsUpdaterThread.setName("NodeLabelsUpdaterThread"); + nodeLabelsUpdaterThread.start(); + } + super.serviceStart(); + } + + /** + * Terminate the timer. + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + stopped = true; + if (nodeLabelsUpdaterThread != null) { + nodeLabelsUpdaterThread.interrupt(); + } + super.serviceStop(); + } + + private class NodeLabelsUpdaterThread implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + Set nodes = Collections.unmodifiableSet( + rmContext.getRMNodes().keySet()); + updateNodeLabelsInternal(nodes); + } catch (IOException e) { + LOG.warn("Failed to update node Labels", e); + } + + try { + Thread.sleep(updateInterval); + } catch (InterruptedException e) { + LOG.info(getName() + " thread interrupted"); + break; + } + } + } + } + + private void updateNodeLabelsInternal(Set nodes) + throws IOException { + if (nodes != null && nodes.size() != 0) { + Map> labelsUpdated = + nodeLabelsProvider.getNodeLabels(nodes); + if (labelsUpdated != null && labelsUpdated.size() != 0) { + nodeLabelsUpdated(labelsUpdated); + } + } + } + + /** + * Update the node -> labels map. + */ + private void nodeLabelsUpdated(Map> labelsUpdated) + throws IOException { + Map> nodeToLabels = + new HashMap>(labelsUpdated.size()); + for (Map.Entry> entry : labelsUpdated.entrySet()) { + nodeToLabels.put(entry.getKey(), + NodeLabelsUtils.convertToStringSet(entry.getValue())); + } + rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels); + } + + /** + * Get the NodeLabelsProvider which is used to provide node labels. + */ + private NodeLabelsProvider createNodeLabelsProvider(Configuration conf) + throws IOException { + NodeLabelsProvider nodeLabelsProvider = null; + try { + Class labelsProviderClass = conf.getClass( + YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, null, + NodeLabelsProvider.class); + if (labelsProviderClass != null) { + nodeLabelsProvider = labelsProviderClass.newInstance(); + } + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create NodeLabelsProvider based on Configuration", e); + throw new IOException("Failed to create NodeLabelsProvider : " + + e.getMessage(), e); + } + + if (nodeLabelsProvider == null) { + String msg = "NodeLabelsProvider should be configured when " + + "delegated-centralized node label configuration is enabled"; + LOG.error(msg); + throw new IOException(msg); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Node Labels provider class is : " + + nodeLabelsProvider.getClass().toString()); + } + + return nodeLabelsProvider; + } + + /** + * Update node labels for a specified node. + * @param node the node to update node labels + */ + public void updateNodeLabels(NodeId node) { + try { + updateNodeLabelsInternal(ImmutableSet.of(node)); + } catch (IOException e) { + LOG.warn("Failed to update node Labels for node: " + node, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 2410053..928216b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -177,7 +177,7 @@ private @Context HttpServletResponse response; @VisibleForTesting - boolean isDistributedNodeLabelConfiguration = false; + boolean isCentralizedNodeLabelConfiguration = false; public final static String DELEGATION_TOKEN_HEADER = "Hadoop-YARN-RM-Delegation-Token"; @@ -186,16 +186,17 @@ public RMWebServices(final ResourceManager rm, Configuration conf) { this.rm = rm; this.conf = conf; - isDistributedNodeLabelConfiguration = - YarnConfiguration.isDistributedNodeLabelConfiguration(conf); + isCentralizedNodeLabelConfiguration = + YarnConfiguration.isCentralizedNodeLabelConfiguration(conf); } - private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) - throws IOException { - if (isDistributedNodeLabelConfiguration) { + private void checkAndThrowIfCentralizedNodeLabelConfNotEnabled( + String operation) throws IOException { + if (!isCentralizedNodeLabelConfiguration) { String msg = - String.format("Error when invoke method=%s because of " - + "distributed node label configuration enabled.", operation); + String.format("Error when invoke method=%s because " + + "centralized node label configuration is not enabled.", + operation); LOG.error(msg); throw new IOException(msg); } @@ -892,7 +893,7 @@ private Response replaceLabelsOnNode( String operation) throws IOException { init(); - checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode"); + checkAndThrowIfCentralizedNodeLabelConfNotEnabled("replaceLabelsOnNode"); UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 7e75cfa..e61d9fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -872,12 +872,12 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled() } @Test(expected = YarnException.class) - public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled() + public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled() throws IOException, YarnException { // create RM and set it's ACTIVE, and set distributed node label // configuration to true MockRM rm = new MockRM(); - rm.adminService.isDistributedNodeLabelConfiguration = true; + rm.adminService.isCentralizedNodeLabelConfiguration = false; ((RMContextImpl) rm.getRMContext()) .setHAServiceState(HAServiceState.ACTIVE); @@ -893,14 +893,14 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled() } @Test - public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled() + public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled() throws IOException, YarnException { // create RM and set it's ACTIVE MockRM rm = new MockRM(); ((RMContextImpl) rm.getRMContext()) .setHAServiceState(HAServiceState.ACTIVE); RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); - rm.adminService.isDistributedNodeLabelConfiguration = true; + rm.adminService.isCentralizedNodeLabelConfiguration = false; // by default, distributed configuration for node label is disabled, this // should pass diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 94a0e4c..e42ed91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -362,7 +363,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals("Action should be normal on valid Node Labels", NodeAction.NORMAL, response.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels())); + NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", response.getAreNodeLabelsAcceptedByRM()); rm.stop(); @@ -590,7 +591,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals("InValid Node Labels were not accepted by RM", NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels())); + NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsUpdater.java new file mode 100644 index 0000000..6acf17a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsUpdater.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +public class TestRMNodeLabelsUpdater extends NodeLabelTestBase { + private YarnConfiguration conf; + private static Map> nodeLabelsMap = Maps.newHashMap(); + + @Before + public void setup() { + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, + YarnConfiguration.DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE); + conf.setClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, + DummyNodeLabelsProvider.class, NodeLabelsProvider.class); + } + + @Test + public void testNodeLabelsProviderConfiguration() { + conf.unset(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG); + try { + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + Assert.fail("Expected an exception"); + } catch (Exception e) { + // expected + } + } + + @Test + public void testNodeLabelsUpdaterTimer() throws Exception { + conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + 1000); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager(); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + + NodeId nodeId = toNodeId("h1:1234"); + assertEquals(0, mgr.getLabelsOnNode(nodeId).size()); + updateNodeLabels(nodeId, "x"); + registerNode(rm, nodeId); + assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId)); + + // Ensure that node labels are updated if NodeLabelsProvider + // gives different labels + updateNodeLabels(nodeId, "y"); + Thread.sleep(1500); + assertCollectionEquals(ImmutableSet.of("y"), mgr.getLabelsOnNode(nodeId)); + + rm.stop(); + } + + @Test + public void testNodeLabelsUpdaterForNoTimer() throws Exception { + conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + RMNodeLabelsUpdater.DISABLE_NODE_LABELS_UPDATER_TIMER); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager(); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x")); + + NodeId nodeId = toNodeId("h1:1234"); + updateNodeLabels(nodeId, "x"); + registerNode(rm, nodeId); + // Ensure that even though timer is not run, node labels are fetched + // when node is registered + assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId)); + + rm.stop(); + } + + private void registerNode(ResourceManager rm, NodeId nodeId) + throws YarnException, IOException { + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + resourceTrackerService.registerNodeManager(req); + } + + private void updateNodeLabels(NodeId nodeId, String... nodeLabelsStr) { + nodeLabelsMap.put(nodeId, toNodeLabelSet(nodeLabelsStr)); + } + + public static class DummyNodeLabelsProvider implements NodeLabelsProvider { + @Override + public Map> getNodeLabels(Set nodes) { + Map> nodeLabels = Maps.newHashMap(); + for(NodeId node : nodes) { + nodeLabels.put(node, nodeLabelsMap.get(node)); + } + return nodeLabels; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java index 53a9902..472dab6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java @@ -445,8 +445,8 @@ public void testNodeLabels() throws JSONException, Exception { .post(ClientResponse.class); LOG.info("posted node nodelabel"); - //setting rmWebService for Distributed NodeLabel Configuration - rmWebService.isDistributedNodeLabelConfiguration = true; + //setting rmWebService for non Centralized NodeLabel Configuration + rmWebService.isCentralizedNodeLabelConfiguration = false; // Case1 : Replace labels using node-to-labels ntli = new NodeToLabelsEntryList();