diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java new file mode 100644 index 0000000..dd1e4f1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabel/ScriptBasedNodeLabelsProvider.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.nodelabel; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.nodelabel.ScriptExecutor.ScriptOutputReceiver; + +/** + * + * The class which provides functionality of fetching the labels of the node + * using the configured node labels script. + */ +public class ScriptBasedNodeLabelsProvider extends AbstractService implements + ScriptOutputReceiver, NodeLabelsProvider { + + private static Log LOG = LogFactory.getLog(ScriptBasedNodeLabelsProvider.class); + + /** Absolute path to the health script. */ + private String nodeLabelScript; + /** Delay after which node health script to be executed */ + private long intervalTime; + /** Time after which the script should be timedout */ + private long scriptTimeout; + + /** Pattern used for searching in the output of the node health script */ + static private final String LABEL_PATTERN = "LABEL"; + + private ScriptExecutor scriptExecutor; + + private Set nodeLabels; + + protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + protected final Lock readLock = readWriteLock.readLock(); + protected final Lock writeLock = readWriteLock.writeLock(); + + public ScriptBasedNodeLabelsProvider() { + super(ScriptBasedNodeLabelsProvider.class.getName()); + } + + /* + * Method which initializes the values for the script path and interval time. + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.nodeLabelScript = + conf.get(YarnConfiguration.NM_LABELS_FETCH_SCRIPT_PATH); + this.intervalTime = conf.getLong(YarnConfiguration.NM_LABELS_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_INTERVAL_MS); + this.scriptTimeout = conf.getLong( + YarnConfiguration.NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS, + YarnConfiguration.DEFAULT_NM_LABELS_FETCH_SCRIPT_TIMEOUT_MS); + String[] args = conf.getStrings(YarnConfiguration.NM_LABELS_FETCH_SCRIPT_OPTS, + new String[] {}); + scriptExecutor = + new ScriptExecutor(nodeLabelScript, args, scriptTimeout, + new String[] { LABEL_PATTERN }, this); + super.serviceInit(conf); + } + + /** + * Method used to start the Node Label Fetcher. + * + */ + @Override + protected void serviceStart() throws Exception { + scriptExecutor.startTimer("NodeLabelsFetcher-Timer", intervalTime); + super.serviceStart(); + } + + /** + * Method used to terminate the Node Label Fetcher service. + * + */ + @Override + protected void serviceStop() { + scriptExecutor.stopTimer(); + } + + /** + * @return Returns output from node labels fetcher script. + */ + public Set getNodeLabels() { + readLock.lock(); + try{ + return nodeLabels; + }finally{ + readLock.unlock(); + } + } + + /** + * Method used to determine if or not node health monitoring service should be + * started or not. Returns true if following conditions are met: + * + *
    + *
  1. Path to Node health check script is not empty
  2. + *
  3. Node health check script file exists
  4. + *
+ * + * @param conf + * @return true if node health monitoring service can be started. + */ + public static boolean shouldRun(Configuration conf) { + String nodeHealthScript = + conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH); + if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) { + return false; + } + File f = new File(nodeHealthScript); + return f.exists() && FileUtil.canExecute(f); + } + + /** + * Used only by tests to access the timer task directly + * @return the timer task + */ + TimerTask getTimerTask() { + return scriptExecutor; + } + + @Override + public void parseScriptOutput(Map scriptOutPut) { + String labelString = scriptOutPut.get(LABEL_PATTERN); + String[] nodeLabelsFromScript = + ((labelString == null) ? null : labelString.split(",", 0)); + if (!arelabelsSameAsPrevOutPut(nodeLabelsFromScript)) { + writeLock.lock(); + try { + nodeLabels = new HashSet(Arrays.asList(nodeLabelsFromScript)); + } finally { + writeLock.unlock(); + } + } + } + + private boolean arelabelsSameAsPrevOutPut(String[] nodeLabelsFromScript){ + if(nodeLabelsFromScript.length!=nodeLabels.size()){ + return false; + } + for(String label:nodeLabelsFromScript) + { + if(! nodeLabels.contains(label)){ + return false; + } + } + return true; + } + + @Override + public void scriptExecutionFailed(String failureMessage, Exception ex) { + LOG.warn("Failed to fetch Node Labels from Script " + nodeLabelScript + + " : " + failureMessage); + if(null!=ex){ + LOG.debug("Node Labels Exception trace :",ex); + } + } + + @Override + public void initialize(Configuration conf) { + } +} + + +/** + * Class which is used by the {@link Timer} class to periodically execute the + * node label script. + * + */ +class ScriptExecutor extends TimerTask { + private static enum ScriptExitStatus { + SUCCESS, TIMED_OUT, FAILED_WITH_EXIT_CODE, FAILED_WITH_EXCEPTION, + } + + /** Time out error message */ + static final String SCRIPT_TIMED_OUT_MSG = " script timed out"; + + /** Script execution failed */ + static final String SCRIPT_EXEC_FAILED_MSG = " script execution failed"; + + public static interface ScriptOutputReceiver { + /** + * Map which contains entry with matched pattern provided to ScriptExecutor + * as the key and the line from the output as the value. needs t + * + * @param scriptOutPut + */ + public void parseScriptOutput(Map scriptOutPut); + + /** + * This method will be called if the execution failed or timed out + * + * @param failureMessage + * @param ex + */ + public void scriptExecutionFailed(String failureMessage, Exception ex); + } + + /** */ + + Exception exceptionThrown; + + /** Timer used to schedule node health monitoring script execution */ + private Timer scriptExecuteScheduler; + + /** ShellCommandExecutor used to execute monitoring script */ + ShellCommandExecutor shexec = null; + + private String[] outputPatterns; + + private ScriptOutputReceiver receiver; + + private String scriptPath; + + /** + * @param scriptArgs + * @param scriptPath - Absolute path to the script to be executed. + * @param intervalTime + * @param scriptTimeout + * @param patterns + */ + public ScriptExecutor(String scriptPath, String[] scriptArgs, + long scriptTimeout, String[] outputPatterns, ScriptOutputReceiver receiver) { + this.scriptPath=scriptPath; + this.outputPatterns=outputPatterns; + this.receiver=receiver; + ArrayList execScript = new ArrayList(); + execScript.add(scriptPath); + if (scriptArgs != null) { + execScript.addAll(Arrays.asList(scriptArgs)); + } + shexec = + new ShellCommandExecutor(execScript.toArray(new String[execScript + .size()]), null, null, scriptTimeout); + } + + public void startTimer(String timerName,long intervalTime){ + scriptExecuteScheduler = new Timer(timerName, true); + // Start the timer task immediately and + // then periodically at interval time. + scriptExecuteScheduler.scheduleAtFixedRate(this, 0, intervalTime); + } + + public void stopTimer() { + if (scriptExecuteScheduler != null) { + scriptExecuteScheduler.cancel(); + } + if (shexec != null) { + Process p = shexec.getProcess(); + if (p != null) { + p.destroy(); + } + } + } + + @Override + public void run() { + executeScript(); + } + + protected void executeScript() { + ScriptExitStatus status = ScriptExitStatus.SUCCESS; + try { + shexec.execute(); + } catch (ExitCodeException e) { + // ignore the exit code of the script + status = ScriptExitStatus.FAILED_WITH_EXIT_CODE; + // On Windows, we will not hit the Stream closed IOException + // thrown by stdout buffered reader for timeout event. + if (Shell.WINDOWS && shexec.isTimedOut()) { + status = ScriptExitStatus.TIMED_OUT; + } + exceptionThrown=e; + } catch (Exception e) { + if (!shexec.isTimedOut()) { + status = ScriptExitStatus.FAILED_WITH_EXCEPTION; + } else { + status = ScriptExitStatus.TIMED_OUT; + } + exceptionThrown=e; + } finally { + reportScriptStatus(status, shexec.getOutput()); + } + } + + /** + * Method which is used to parse output from the node health monitor and send + * to the report address. + * + * The timed out script or script which causes IOException output is ignored. + * + * The node is marked unhealthy if + *
    + *
  1. The node health script times out
  2. + *
  3. The node health scripts output has a line which begins with ERROR
  4. + *
  5. An exception is thrown while executing the script
  6. + *
+ * If the script throws {@link IOException} or {@link ExitCodeException} the + * output is ignored and node is left remaining healthy, as script might have + * syntax error. + * + * @param status + */ + void reportScriptStatus(ScriptExitStatus status,String scriptOutut) { + switch (status) { + case SUCCESS: + if (status == ScriptExitStatus.SUCCESS) { + receiver.parseScriptOutput(matchPatterns(shexec.getOutput())); + } + break; + case TIMED_OUT: + receiver.scriptExecutionFailed(scriptPath+ SCRIPT_TIMED_OUT_MSG,null); + break; + case FAILED_WITH_EXCEPTION: + case FAILED_WITH_EXIT_CODE: + receiver.scriptExecutionFailed(scriptPath + SCRIPT_EXEC_FAILED_MSG, + exceptionThrown); + break; + } + } + + /** + * Method which collect lines from the output string which begins with + * Patterns provided. + * + * @param scriptOutput string + * @return true if output string has error pattern in it. + */ + private Map matchPatterns(String scriptOutput) { + Map scriptExecutorOutput=new HashMap(); + String[] splits = scriptOutput.split("\n"); + for (String split : splits) { + for(String pattern:outputPatterns){ + if (split.startsWith(pattern)) { + scriptExecutorOutput.put(pattern, split); + break; + } + } + } + return scriptExecutorOutput; + } +}