diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java new file mode 100644 index 0000000..fdcb1ab --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeLabelsFetcherService.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.nodelabel.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.nodelabel.ScriptBasedNodeLabelsProvider; + +/** + * The class which provides functionality of fetching the labels of the node and + * providing them back to the service requesting it + * + */ +public class NodeLabelsFetcherService extends CompositeService { + private static Log LOG = LogFactory.getLog(NodeLabelsFetcherService.class); + + private NodeLabelsProvider labelsProvider; + + public NodeLabelsFetcherService() { + super("Node Label Fetcher Service"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.labelsProvider = getNodeLabelsProvider(conf); + super.serviceInit(conf); + } + + public Set getNodeLabels() { + return labelsProvider.getNodeLabels(); + } + + private NodeLabelsProvider getNodeLabelsProvider(Configuration conf) { + String labelsFetchScript = + conf.get(YarnConfiguration.NM_LABELS_FETCH_SCRIPT_PATH); + if (null == labelsFetchScript) { + LOG.info("Node Labels will be picked from Configuration"); + return new ConfigurationNodeLabelsProvider(conf); + } else { + LOG.info("Script is configured forfetching Node Labels"); + ScriptBasedNodeLabelsProvider nodeLabelsProvider = + new ScriptBasedNodeLabelsProvider(); + addService(nodeLabelsProvider); + return nodeLabelsProvider; + } + } + + /** + * If script is not configured then get the labels from the configuration file + */ + private class ConfigurationNodeLabelsProvider implements NodeLabelsProvider { + private Set nodeLabels; + + public ConfigurationNodeLabelsProvider(Configuration conf) { + String[] nodeLabelsArr = + StringUtils + .getStrings(conf.get(YarnConfiguration.NM_NODE_LABELS, "")); + nodeLabels = new HashSet(Arrays.asList(nodeLabelsArr)); + } + + @Override + public Set getNodeLabels() { + return nodeLabels; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java new file mode 100644 index 0000000..a5c4125 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/NodeLabelsProvider.java @@ -0,0 +1,30 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabel; + +import java.util.Set; + +/** + * Interace which will be responsible for fetching the labels + * + */ +public interface NodeLabelsProvider { + public Set getNodeLabels(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java new file mode 100644 index 0000000..84e01ff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabel; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.nodelabel.ScriptExecutor.ScriptOutputReceiver; + +/** + * + * The class which provides functionality of fetching the labels of the node + * using the configured node labels script. + */ +public class ScriptBasedNodeLabelsProvider extends AbstractService implements + ScriptOutputReceiver, NodeLabelsProvider { + + private static Log LOG = LogFactory.getLog(ScriptBasedNodeLabelsProvider.class); + + /** Absolute path to the health script. */ + private String nodeLabelScript; + /** Delay after which node health script to be executed */ + private long intervalTime; + /** Time after which the script should be timedout */ + private long scriptTimeout; + + /** Pattern used for searching in the output of the node health script */ + static private final String LABEL_PATTERN = "LABEL"; + + private ScriptExecutor scriptExecutor; + + private Set nodeLabels; + + protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + protected final Lock readLock = readWriteLock.readLock(); + protected final Lock writeLock = readWriteLock.writeLock(); + + public ScriptBasedNodeLabelsProvider() { + super(ScriptBasedNodeLabelsProvider.class.getName()); + } + + /* + * Method which initializes the values for the script path and interval time. + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.nodeLabelScript = + conf.get(YarnConfiguration.NM_LABELS_FETCH_SCRIPT_PATH); + this.intervalTime = conf.getLong(YarnConfiguration.NM_LABELS_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_INTERVAL_MS); + this.scriptTimeout = conf.getLong( + YarnConfiguration.NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS); + String[] args = conf.getStrings(YarnConfiguration.NM_LABELS_FETCH_SCRIPT_OPTS, + new String[] {}); + scriptExecutor = + new ScriptExecutor(nodeLabelScript, args, scriptTimeout, + new String[] { LABEL_PATTERN }, this); + super.serviceInit(conf); + } + + /** + * Method used to start the Node Label Fetcher. + * + */ + @Override + protected void serviceStart() throws Exception { + scriptExecutor.startTimer("NodeLabelsFetcher-Timer", intervalTime); + super.serviceStart(); + } + + /** + * Method used to terminate the Node Label Fetcher service. + * + */ + @Override + protected void serviceStop() { + scriptExecutor.stopTimer(); + } + + /** + * @return Returns output from node labels fetcher script. + */ + public Set getNodeLabels() { + readLock.lock(); + try{ + return nodeLabels; + }finally{ + readLock.unlock(); + } + } + + /** + * Method used to determine if or not node health monitoring service should be + * started or not. Returns true if following conditions are met: + * + *
    + *
  1. Path to Node health check script is not empty
  2. + *
  3. Node health check script file exists
  4. + *
+ * + * @param conf + * @return true if node health monitoring service can be started. + */ + public static boolean shouldRun(Configuration conf) { + String nodeHealthScript = + conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH); + if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) { + return false; + } + File f = new File(nodeHealthScript); + return f.exists() && FileUtil.canExecute(f); + } + + /** + * Used only by tests to access the timer task directly + * @return the timer task + */ + TimerTask getTimerTask() { + return scriptExecutor; + } + + @Override + public void parseScriptOutput(Map scriptOutPut) { + String labelString = scriptOutPut.get(LABEL_PATTERN); + String[] nodeLabelsFromScript = + ((labelString == null) ? null : labelString.split(",", 0)); + if (!arelabelsSameAsPrevOutPut(nodeLabelsFromScript)) { + writeLock.lock(); + try { + nodeLabels = new HashSet(Arrays.asList(nodeLabelsFromScript)); + } finally { + writeLock.unlock(); + } + } + } + + private boolean arelabelsSameAsPrevOutPut(String[] nodeLabelsFromScript){ + if(nodeLabelsFromScript.length!=nodeLabels.size()){ + return false; + } + for(String label:nodeLabelsFromScript) + { + if(! nodeLabels.contains(label)){ + return false; + } + } + return true; + } + + @Override + public void scriptExecutionFailed(String failureMessage, Exception ex) { + LOG.warn("Failed to fetch Node Labels from Script " + nodeLabelScript + + " : " + failureMessage); + if(null!=ex){ + LOG.debug("Node Labels Exception trace :",ex); + } + } +} + + +/** + * Class which is used by the {@link Timer} class to periodically execute the + * node label script. + * + */ +class ScriptExecutor extends TimerTask { + private static enum ScriptExitStatus { + SUCCESS, TIMED_OUT, FAILED_WITH_EXIT_CODE, FAILED_WITH_EXCEPTION, + } + + /** Time out error message */ + static final String SCRIPT_TIMED_OUT_MSG = " script timed out"; + + /** Script execution failed */ + static final String SCRIPT_EXEC_FAILED_MSG = " script execution failed"; + + public static interface ScriptOutputReceiver { + /** + * Map which contains entry with matched pattern provided to ScriptExecutor + * as the key and the line from the output as the value. needs t + * + * @param scriptOutPut + */ + public void parseScriptOutput(Map scriptOutPut); + + /** + * This method will be called if the execution failed or timed out + * + * @param failureMessage + * @param ex + */ + public void scriptExecutionFailed(String failureMessage, Exception ex); + } + + /** */ + + Exception exceptionThrown; + + /** Timer used to schedule node health monitoring script execution */ + private Timer scriptExecuteScheduler; + + /** ShellCommandExecutor used to execute monitoring script */ + ShellCommandExecutor shexec = null; + + private String[] outputPatterns; + + private ScriptOutputReceiver receiver; + + private String scriptPath; + + /** + * @param scriptArgs + * @param scriptPath - Absolute path to the script to be executed. + * @param intervalTime + * @param scriptTimeout + * @param patterns + */ + public ScriptExecutor(String scriptPath, String[] scriptArgs, + long scriptTimeout, String[] outputPatterns, ScriptOutputReceiver receiver) { + this.scriptPath=scriptPath; + this.outputPatterns=outputPatterns; + this.receiver=receiver; + ArrayList execScript = new ArrayList(); + execScript.add(scriptPath); + if (scriptArgs != null) { + execScript.addAll(Arrays.asList(scriptArgs)); + } + shexec = + new ShellCommandExecutor(execScript.toArray(new String[execScript + .size()]), null, null, scriptTimeout); + } + + public void startTimer(String timerName,long intervalTime){ + scriptExecuteScheduler = new Timer(timerName, true); + // Start the timer task immediately and + // then periodically at interval time. + scriptExecuteScheduler.scheduleAtFixedRate(this, 0, intervalTime); + } + + public void stopTimer() { + if (scriptExecuteScheduler != null) { + scriptExecuteScheduler.cancel(); + } + if (shexec != null) { + Process p = shexec.getProcess(); + if (p != null) { + p.destroy(); + } + } + } + + @Override + public void run() { + executeScript(); + } + + protected void executeScript() { + ScriptExitStatus status = ScriptExitStatus.SUCCESS; + try { + shexec.execute(); + } catch (ExitCodeException e) { + // ignore the exit code of the script + status = ScriptExitStatus.FAILED_WITH_EXIT_CODE; + // On Windows, we will not hit the Stream closed IOException + // thrown by stdout buffered reader for timeout event. + if (Shell.WINDOWS && shexec.isTimedOut()) { + status = ScriptExitStatus.TIMED_OUT; + } + exceptionThrown=e; + } catch (Exception e) { + if (!shexec.isTimedOut()) { + status = ScriptExitStatus.FAILED_WITH_EXCEPTION; + } else { + status = ScriptExitStatus.TIMED_OUT; + } + exceptionThrown=e; + } finally { + reportScriptStatus(status, shexec.getOutput()); + } + } + + /** + * Method which is used to parse output from the node health monitor and send + * to the report address. + * + * The timed out script or script which causes IOException output is ignored. + * + * The node is marked unhealthy if + *
    + *
  1. The node health script times out
  2. + *
  3. The node health scripts output has a line which begins with ERROR
  4. + *
  5. An exception is thrown while executing the script
  6. + *
+ * If the script throws {@link IOException} or {@link ExitCodeException} the + * output is ignored and node is left remaining healthy, as script might have + * syntax error. + * + * @param status + */ + void reportScriptStatus(ScriptExitStatus status,String scriptOutut) { + switch (status) { + case SUCCESS: + if (status == ScriptExitStatus.SUCCESS) { + receiver.parseScriptOutput(matchPatterns(shexec.getOutput())); + } + break; + case TIMED_OUT: + receiver.scriptExecutionFailed(scriptPath+ SCRIPT_TIMED_OUT_MSG,null); + break; + case FAILED_WITH_EXCEPTION: + case FAILED_WITH_EXIT_CODE: + receiver.scriptExecutionFailed(scriptPath + SCRIPT_EXEC_FAILED_MSG, + exceptionThrown); + break; + } + } + + /** + * Method which collect lines from the output string which begins with + * Patterns provided. + * + * @param scriptOutput string + * @return true if output string has error pattern in it. + */ + private Map matchPatterns(String scriptOutput) { + Map scriptExecutorOutput=new HashMap(); + String[] splits = scriptOutput.split("\n"); + for (String split : splits) { + for(String pattern:outputPatterns){ + if (split.startsWith(pattern)) { + scriptExecutorOutput.put(pattern, split); + break; + } + } + } + return scriptExecutorOutput; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a81c1a6..6d91802 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1463,6 +1463,9 @@ */ public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX + "manager-class"; + + public static final String RM_NODE_LABELS_VALIDATE_WITH_CENTRALIZED_LABELS = + NODE_LABELS_PREFIX + "validate.with.centralized.labels"; /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX @@ -1472,6 +1475,29 @@ public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC = "2000, 500"; + /** Configurations in NodeManager for NodeLabelsFeature*/ + public static final String NM_NODE_LABELS = NODE_LABELS_PREFIX + + "nm.labels"; + public static final String NM_LABELS_FETCH_PREFIX = NODE_LABELS_PREFIX + + "nm.labels-fetcher."; + public static final String NM_LABELS_FETCH_SCRIPT_PATH = + NM_LABELS_FETCH_PREFIX + "script.path"; + + public static final String NM_LABELS_FETCH_INTERVAL_MS = + NM_LABELS_FETCH_PREFIX + "interval-ms"; + + public static final long DEFAULT_NM_LABELS_FETCH_INTERVAL_MS = 10 * 60 * 1000; + + public static final String NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS = + NM_LABELS_FETCH_PREFIX + "script.timeout-ms"; + + public static final long DEFAULT_NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS = + DEFAULT_NM_LABELS_FETCH_INTERVAL_MS * 2; + + public static final String NM_LABELS_FETCH_SCRIPT_OPTS = + NM_LABELS_FETCH_PREFIX + "script.opts"; + /** End of Configurations in NodeManager for NodeLabelsFeature*/ + public YarnConfiguration() { super(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8885769..ecc8d9d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null, null); + YarnVersionInfo.getVersion(), null, null,null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception { NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, null, null); NodeHeartbeatRequest request2 = - NodeHeartbeatRequest.newInstance(status, null, null); + NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index ee1b945..7651869 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -40,14 +40,15 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels; import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent; import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType; import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels; +import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels; import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -81,6 +82,7 @@ protected final WriteLock writeLock; protected NodeLabelsStore store; + private boolean verifyWithCentralizedValidLabels; protected static class Label { public Resource resource; @@ -197,7 +199,10 @@ protected void initDispatcher(Configuration conf) { @Override protected void serviceInit(Configuration conf) throws Exception { initNodeLabelStore(conf); - + this.verifyWithCentralizedValidLabels = + conf.getBoolean( + YarnConfiguration.RM_NODE_LABELS_VALIDATE_WITH_CENTRALIZED_LABELS, + true); labelCollections.put(NO_LABEL, new Label()); } @@ -282,16 +287,18 @@ protected void checkAddLabelsToNode( return; } - // check all labels being added existed - Set knownLabels = labelCollections.keySet(); - for (Entry> entry : addedLabelsToNode.entrySet()) { - if (!knownLabels.containsAll(entry.getValue())) { - String msg = - "Not all labels being added contained by known " - + "label collections, please check" + ", added labels=[" - + StringUtils.join(entry.getValue(), ",") + "]"; - LOG.error(msg); - throw new IOException(msg); + if (verifyWithCentralizedValidLabels) { + // check all labels being added existed + Set knownLabels = labelCollections.keySet(); + for (Entry> entry : addedLabelsToNode.entrySet()) { + if (!knownLabels.containsAll(entry.getValue())) { + String msg = + "Not all labels being added contained by known " + + "label collections, please check" + ", added labels=[" + + StringUtils.join(entry.getValue(), ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } } } } @@ -420,7 +427,7 @@ protected void checkRemoveLabelsFromNode( NodeId nodeId = entry.getKey(); Set labels = entry.getValue(); - if (!knownLabels.containsAll(labels)) { + if (verifyWithCentralizedValidLabels && !knownLabels.containsAll(labels)) { String msg = "Not all labels being removed contained by known " + "label collections, please check" + ", removed labels=[" @@ -525,15 +532,17 @@ protected void checkReplaceLabelsOnNode( } // check all labels being added existed - Set knownLabels = labelCollections.keySet(); - for (Entry> entry : replaceLabelsToNode.entrySet()) { - if (!knownLabels.containsAll(entry.getValue())) { - String msg = - "Not all labels being replaced contained by known " - + "label collections, please check" + ", new labels=[" - + StringUtils.join(entry.getValue(), ",") + "]"; - LOG.error(msg); - throw new IOException(msg); + if (verifyWithCentralizedValidLabels) { + Set knownLabels = labelCollections.keySet(); + for (Entry> entry : replaceLabelsToNode.entrySet()) { + if (!knownLabels.containsAll(entry.getValue())) { + String msg = + "Not all labels being replaced contained by known " + + "label collections, please check" + ", new labels=[" + + StringUtils.join(entry.getValue(), ",") + "]"; + LOG.error(msg); + throw new IOException(msg); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..a903ea6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Set; + import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -26,7 +28,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, - MasterKey lastKnownNMTokenMasterKey) { + MasterKey lastKnownNMTokenMasterKey,Set nodeLabels) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -34,6 +36,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setNodeLabels(nodeLabels); return nodeHeartbeatRequest; } @@ -45,4 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 0e3d7e4..011055e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -30,7 +31,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, - List runningApplications) { + List runningApplications,Set nodeLabels) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -39,6 +40,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); request.setRunningApplications(runningApplications); + request.setNodeLabels(nodeLabels); return request; } @@ -47,6 +49,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + public abstract Set getNodeLabels(); + public abstract void setNodeLabels(Set nodeLabels); /** * We introduce this here because currently YARN RM doesn't persist nodes info diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..85e9389 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -36,6 +40,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + private Set labels = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +85,10 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private void mergeLocalToProto() { @@ -178,4 +187,27 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { private MasterKeyProto convertToProtoFormat(MasterKey t) { return ((MasterKeyPBImpl)t).getProto(); } + + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index ce4faec..2c58eab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,23 +20,18 @@ import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -56,6 +51,7 @@ private NodeId nodeId = null; private List containerStatuses = null; private List runningApplications = null; + private Set labels = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -86,7 +82,10 @@ private void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } - + if (this.labels != null && !this.labels.isEmpty()) { + builder.clearNodeLabels(); + builder.addAllNodeLabels(this.labels); + } } private synchronized void addNMContainerStatusesToProto() { @@ -292,6 +291,29 @@ public void setNMVersion(String version) { builder.setNmVersion(version); } + @Override + public Set getNodeLabels() { + initNodeLabels(); + return labels; + } + + private void initNodeLabels(){ + if (this.labels != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + labels = new HashSet(p.getNodeLabelsList()); + } + + @Override + public void setNodeLabels(Set nodeLabels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearNodeLabels(); + } + this.labels = nodeLabels; + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } @@ -323,4 +345,5 @@ private NMContainerStatusPBImpl convertFromProtoFormat(NMContainerStatusProto c) private NMContainerStatusProto convertToProtoFormat(NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb..33919f7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto { optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; repeated ApplicationIdProto runningApplications = 7; + repeated string nodeLabels = 8; } message RegisterNodeManagerResponseProto { @@ -47,6 +48,7 @@ message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated string nodeLabels = 4; } message NodeHeartbeatResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 7165445..7f5e4db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -29,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.junit.Assert; @@ -78,7 +77,7 @@ public void testRegisterNodeManagerRequest() { RegisterNodeManagerRequest.newInstance( NodeId.newInstance("1.1.1.1", 1000), 8080, Resource.newInstance(1024, 1), "NM-version-id", reports, - Arrays.asList(appId)); + Arrays.asList(appId),new HashSet()); RegisterNodeManagerRequest requestProto = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java index fdacd92..672b9a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.util.Arrays; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -44,7 +45,7 @@ public void testRegisterNodeManagerRequest() { ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", -1, Priority.newInstance(0), 1234)), Arrays.asList( ApplicationId.newInstance(1234L, 1), - ApplicationId.newInstance(1234L, 2))); + ApplicationId.newInstance(1234L, 2)),new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = @@ -67,8 +68,9 @@ public void testRegisterNodeManagerRequest() { @Test public void testRegisterNodeManagerRequestWithNullArrays() { RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), - 1234, Resource.newInstance(0, 0), "version", null, null); + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", null, null, new HashSet()); // serialze to proto, and get request from proto RegisterNodeManagerRequest request1 = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 43770c1..10f1a76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -75,6 +75,7 @@ protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; + private NodeLabelsFetcherService nodeLabelsFetcher; private LocalDirsHandlerService dirsHandler; private Context context; private AsyncDispatcher dispatcher; @@ -91,9 +92,10 @@ public NodeManager() { } protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, - metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService nodeLabelsFetcher) { + return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, + metrics,nodeLabelsFetcher); } protected NodeResourceMonitor createNodeResourceMonitor() { @@ -220,8 +222,11 @@ protected void serviceInit(Configuration conf) throws Exception { this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager, nmStore); + nodeLabelsFetcher = new NodeLabelsFetcherService(); + nodeStatusUpdater = - createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); + createNodeStatusUpdater(context, dispatcher, nodeHealthChecker, + nodeLabelsFetcher); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index bed58f5..72265c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -112,10 +112,14 @@ private Thread statusUpdater; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; + private final NodeLabelsFetcherService nodeLabelsFetcher; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService nodeLabelsFetcher) { super(NodeStatusUpdaterImpl.class.getName()); this.healthChecker = healthChecker; + this.nodeLabelsFetcher = nodeLabelsFetcher; this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; @@ -245,7 +249,8 @@ protected void registerWithRM() List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications()); + nodeManagerVersionId, containerReports, getRunningApplications(), + nodeLabelsFetcher.getNodeLabels()); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -543,7 +548,7 @@ public void run() { NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey(), NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + .getCurrentKey(),nodeLabelsFetcher.getNodeLabels()); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index 3f4091c..874b2d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -52,8 +52,9 @@ private ResourceTracker resourceTracker; public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelFetcher) { + super(context, dispatcher, healthChecker, metrics, labelFetcher); resourceTracker = createResourceTracker(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index fabb03b..7da7553 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -100,10 +100,13 @@ public int getHttpPort() { Dispatcher dispatcher = new AsyncDispatcher(); NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); healthChecker.init(conf); + NodeLabelsFetcherService labelsFetcher = new NodeLabelsFetcherService(); + labelsFetcher.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeStatusUpdater nodeStatusUpdater = - new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { + new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, + labelsFetcher) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index e69170e..218f557 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -269,9 +269,11 @@ public MyNodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); return myNodeStatusUpdater; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 85bafb3..30904ed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -226,9 +226,10 @@ public void testNMSentContainerStatusOnResync() throws Exception { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterResync(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics,labelsFetcher) { @Override protected ResourceTracker createResourceTracker() { return new MockResourceTracker() { @@ -308,8 +309,9 @@ public NodeHeartbeatResponse nodeHeartbeat( // This can be used as a common base class for testing NM resync behavior. class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater { public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override protected void rebootNodeStatusUpdaterAndRegisterWithRM() { @@ -355,9 +357,10 @@ public void setExistingContainerId(ContainerId cId) { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl1(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,labelsFetcher); } public int getNMRegistrationCount() { @@ -367,8 +370,9 @@ public int getNMRegistrationCount() { class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override @@ -423,9 +427,10 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() { Thread launchContainersThread = null; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl2(context, dispatcher, - healthChecker, metrics); + healthChecker, metrics,labelsFetcher); } @Override @@ -466,8 +471,9 @@ public void setBlockNewContainerRequests( class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics, labelsFetcher); } @Override @@ -553,9 +559,10 @@ public void run() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new TestNodeStatusUpdaterImpl3(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); } public int getNMRegistrationCount() { @@ -573,8 +580,9 @@ protected void shutDown() { class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater { public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index c44f7b8..3253566 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -309,9 +309,11 @@ private static File createUnhaltingScriptFile(ContainerId cId, @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MockNodeStatusUpdater myNodeStatusUpdater = - new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); + new MockNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); return myNodeStatusUpdater; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 7593ce6..e79079d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -289,8 +289,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.context = context; resourceTracker = new MyResourceTracker(this.context); } @@ -312,8 +313,9 @@ protected void stopRMProxy() { public ResourceTracker resourceTracker; public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); resourceTracker = new MyResourceTracker4(context); } @@ -333,8 +335,9 @@ protected void stopRMProxy() { private Context context; public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.context = context; this.resourceTracker = new MyResourceTracker3(this.context); } @@ -362,8 +365,9 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { public ResourceTracker resourceTracker; public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - long rmStartIntervalMS, boolean rmNeverStart) { - super(context, dispatcher, healthChecker, metrics); + long rmStartIntervalMS, boolean rmNeverStart, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; } @@ -401,8 +405,9 @@ protected void stopRMProxy() { private Configuration conf; public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { - super(context, dispatcher, healthChecker, metrics); + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf, + NodeLabelsFetcherService labelsFetcher) { + super(context, dispatcher, healthChecker, metrics,labelsFetcher); resourceTracker = new MyResourceTracker5(); this.conf = conf; } @@ -425,9 +430,11 @@ protected void stopRMProxy() { private MyNodeStatusUpdater3 nodeStatusUpdater; @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { this.nodeStatusUpdater = - new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); + new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics, + labelsFetcher); return this.nodeStatusUpdater; } @@ -448,10 +455,11 @@ public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { nodeStatusUpdater = new MyNodeStatusUpdater5(context, dispatcher, healthChecker, - metrics, conf); + metrics, conf,labelsFetcher); return nodeStatusUpdater; } @@ -917,9 +925,10 @@ public void testNMRegistration() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); } }; @@ -978,9 +987,10 @@ public void testStopReentrant() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; myNodeStatusUpdater.resourceTracker = myResourceTracker2; @@ -1063,8 +1073,9 @@ private NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker) { - updater = createUpdater(context, dispatcher, healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { + updater = createUpdater(context, dispatcher, healthChecker,labelsFetcher); return updater; } @@ -1074,7 +1085,8 @@ public NodeStatusUpdater getUpdater() { abstract NodeStatusUpdater createUpdater(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker); + NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher); } @Test @@ -1083,9 +1095,10 @@ public void testNMShutdownForRegistrationFailure() throws Exception { nm = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; myResourceTracker2.shutDownMessage = "RM Shutting Down Node"; @@ -1116,10 +1129,11 @@ public void testNMConnectionToRM() throws Exception { nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, - rmStartIntervalMS, true); + rmStartIntervalMS, true,labelsFetcher); return nodeStatusUpdater; } }; @@ -1148,10 +1162,11 @@ protected NodeStatusUpdater createUpdater(Context context, nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { @Override protected NodeStatusUpdater createUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( context, dispatcher, healthChecker, metrics, rmStartIntervalMS, - false); + false,labelsFetcher); return nodeStatusUpdater; } }; @@ -1195,9 +1210,10 @@ public void testNoRegistrationWhenNMServicesFail() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { return new MyNodeStatusUpdater(context, dispatcher, healthChecker, - metrics); + metrics, labelsFetcher); } @Override @@ -1261,10 +1277,11 @@ public void testCompletedContainerStatusBackup() throws Exception { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater2 myNodeStatusUpdater = new MyNodeStatusUpdater2(context, dispatcher, healthChecker, - metrics); + metrics,labelsFetcher); return myNodeStatusUpdater; } @@ -1338,9 +1355,10 @@ public void testRMVersionLessThanMinimum() throws InterruptedException { nm = new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + context, dispatcher, healthChecker, metrics,labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL; myResourceTracker2.rmVersion = "3.0.0"; @@ -1498,9 +1516,11 @@ private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { return new NodeManager() { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + Dispatcher dispatcher, NodeHealthCheckerService healthChecker, + NodeLabelsFetcherService labelsFetcher) { + MyNodeStatusUpdater myNodeStatusUpdater = + new MyNodeStatusUpdater(context, dispatcher, healthChecker, + metrics, labelsFetcher); MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; myNodeStatusUpdater.resourceTracker = myResourceTracker2; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..a0eb1ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -118,7 +118,7 @@ public int getHttpPort() { protected final long DUMMY_RM_IDENTIFIER = 1234; protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( - context, new AsyncDispatcher(), null, metrics) { + context, new AsyncDispatcher(), null, metrics,null) { @Override protected ResourceTracker getRMClient() { return new LocalRMInterface(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f5583bc..bfff124 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,7 +148,6 @@ protected void serviceInit(Configuration conf) throws Exception { minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); - super.serviceInit(conf); } @@ -329,6 +331,13 @@ public RegisterNodeManagerResponse registerNodeManager( } } } + + //Update node's labels to RM's NodeLabelManager. + if(updateNodeLabels(rmNode.getNodeID(),request.getNodeLabels())){ + // TODO add code to update node register response with a flag accepting the + // labels so that it can further be used in NM side to send + // labels only when there is any change + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " @@ -415,9 +424,27 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + + // 5. Update node's labels to RM's NodeLabelManager. + if(updateNodeLabels(rmNode.getNodeID(),request.getNodeLabels())){ + // TODO add code to update heart beat response with a flag accepting the + // change in labels so that it can further be used in NM side to send + // labels only when there is any change + } return nodeHeartBeatResponse; } + + private boolean updateNodeLabels(NodeId node, Set nodeLabels) + throws IOException { + if (null != nodeLabels) { + Map> labelUpdate = new HashMap>(); + labelUpdate.put(node, nodeLabels); + this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelUpdate); + return true; + } + return false; + } private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index e83d601..16630c4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -57,9 +57,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; -import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeLabelsFetcherService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; @@ -592,9 +592,9 @@ protected void doSecureLogin() throws IOException { private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + Dispatcher dispatcher, NodeHealthCheckerService healthChecker,NodeLabelsFetcherService labelFetcher) { return new NodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics) { + healthChecker, metrics, labelFetcher) { @Override protected ResourceTracker getRMClient() { final ResourceTrackerService rt =