diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 62e0a71..196237b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1613,6 +1613,18 @@ private static void addDeprecatedKeys() { public static final String NM_NODE_LABELS_PROVIDER_CLASS = NM_NODE_LABELS_PREFIX + "node-labels-provider.class"; + private static final String NM_NODE_LABELS_CONFIG_BASED_PREFIX = + NM_NODE_LABELS_PREFIX + "config-based."; + + public static final String NM_NODE_LABELS_CONFIG_BASED_FETCH_INTERVAL_MS = + NM_NODE_LABELS_CONFIG_BASED_PREFIX + "interval-ms"; + + public static final String NM_NODE_LABELS_FROM_CONFIG = + NM_NODE_LABELS_CONFIG_BASED_PREFIX + "node-labels"; + + public static final long DEFAULT_NM_NODE_LABELS_FETCH_INTERVAL_MS = + 10 * 60 * 1000; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java index b67b397..879c4ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java @@ -18,20 +18,123 @@ package org.apache.hadoop.yarn.server.nodemanager.nodelabels; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; /** - * Implementation for this will be given in another patch + * Provides Node's Labels by constantly monitoring the configuration. */ public class ConfigurationNodeLabelsProvider extends NodeLabelsProviderService { - public ConfigurationNodeLabelsProvider(String name) { - super(name); + private static final Log LOG = LogFactory + .getLog(ConfigurationNodeLabelsProvider.class); + + /** Timer used to schedule node health monitoring script execution */ + private Timer configurationMonitorTimer; + + public ConfigurationNodeLabelsProvider() { + super("Configuration Based NodeLabels Provider Service"); } + private Set nodeLabels = new HashSet(); + private long intervalTime; + // for testing purpose + long startTime = 0; + @Override public Set getNodeLabels() { - //Implementation for this will be given in another patch - return null; + readLock.lock(); + try { + return nodeLabels; + } finally { + readLock.unlock(); + } + } + + protected Lock readLock; + protected Lock writeLock; + + private TimerTask timerTask; + + public TimerTask getTimerTask() { + return timerTask; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.intervalTime = + conf.getLong( + YarnConfiguration.NM_NODE_LABELS_CONFIG_BASED_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_NODE_LABELS_FETCH_INTERVAL_MS); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + readLock = readWriteLock.readLock(); + writeLock = readWriteLock.writeLock(); + updateNodeLabelsFromConfig(conf); + } + + @Override + protected void serviceStart() { + configurationMonitorTimer = + new Timer("Node Labels Configuration Monitor", true); + timerTask = new ConfigurationMonitorTimerTask(); + configurationMonitorTimer.scheduleAtFixedRate(timerTask, startTime, + intervalTime); + } + + private void updateNodeLabelsFromConfig(Configuration conf) + throws IOException { + String confLabelString = + conf.get(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, ""); + String[] nodeLabelsFromConfiguration = + (confLabelString == null || confLabelString.isEmpty()) ? new String[] {} + : StringUtils.getStrings(confLabelString); + boolean validLabels = true; + StringBuffer errorMsg = new StringBuffer(""); + for (int i = 0; i < nodeLabelsFromConfiguration.length; i++) { + try { + CommonNodeLabelsManager + .checkAndThrowLabelName(nodeLabelsFromConfiguration[i]); + } catch (IOException e) { + validLabels = false; + errorMsg.append(e.getMessage()); + errorMsg.append(" , "); + } + } + if (validLabels) { + writeLock.lock(); + try { + nodeLabels = + new HashSet(Arrays.asList(nodeLabelsFromConfiguration)); + } finally { + writeLock.unlock(); + } + } else { + throw new IOException(errorMsg.toString()); + } + } + + private class ConfigurationMonitorTimerTask extends TimerTask { + @Override + public void run() { + try { + updateNodeLabelsFromConfig(new YarnConfiguration()); + } catch (Exception e) { + LOG.error("Failed to update node Labels from configuration.xml ", e); + } + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java new file mode 100644 index 0000000..ab040e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.nodelabels; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Date; +import java.util.TimerTask; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase { + + protected static File testRootDir = new File("target", + TestConfigurationNodeLabelsProvider.class.getName() + "-localDir") + .getAbsoluteFile(); + + final static File nodeLabelsConfigFile = new File(testRootDir, + "yarn-site.xml"); + + private static XMLPathClassLoader loader; + + private ConfigurationNodeLabelsProvider nodeLabelsProvider; + + @Before + public void setup() { + loader = + new XMLPathClassLoader( + TestConfigurationNodeLabelsProvider.class.getClassLoader()); + testRootDir.mkdirs(); + + Configuration conf = getConfForNodeLabels(); + nodeLabelsProvider = new ConfigurationNodeLabelsProvider(); + nodeLabelsProvider.init(conf); + // To delay the timer to run and we call timerTask.run manually + nodeLabelsProvider.startTime = new Date().getTime() + 1 * 60 * 60 * 1000l; + } + + @After + public void tearDown() throws Exception { + if (nodeLabelsProvider != null) { + nodeLabelsProvider.close(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + } + + private Configuration getConfForNodeLabels() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, "A,B,CX"); + return conf; + } + + @Test + public void testNodeLabelsFromConfig() throws IOException, + InterruptedException { + // test for ensuring labels are set during initialization of the class + nodeLabelsProvider.start(); + Thread.sleep(5000l); // sleep so that timer has run once during + // initialization + assertCollectionEquals(toSet("A", "B", "CX"), + nodeLabelsProvider.getNodeLabels()); + + // test for valid Modification + TimerTask timerTask = nodeLabelsProvider.getTimerTask(); + modifyConfAndCallTimer(timerTask, "X,y,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + + // test for Invalid Modification. Provider is expected to return the last + // read labels + modifyConfAndCallTimer(timerTask, "A,#Xy,Z"); + assertCollectionEquals(toSet("X", "y", "Z"), + nodeLabelsProvider.getNodeLabels()); + nodeLabelsProvider.close(); + } + + private static void modifyConfAndCallTimer(TimerTask timerTask, + String nodeLabels) throws FileNotFoundException, IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_NODE_LABELS_FROM_CONFIG, nodeLabels); + conf.writeXml(new FileOutputStream(nodeLabelsConfigFile)); + ClassLoader actualLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(loader); + timerTask.run(); + } finally { + Thread.currentThread().setContextClassLoader(actualLoader); + } + } + + private static class XMLPathClassLoader extends ClassLoader { + public XMLPathClassLoader(ClassLoader wrapper) { + super(wrapper); + } + + public URL getResource(String name) { + if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) { + try { + return nodeLabelsConfigFile.toURI().toURL(); + } catch (MalformedURLException e) { + e.printStackTrace(); + Assert.fail(); + } + } + return super.getResource(name); + } + } +} -- 1.9.2.msysgit.0