diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationManager.java new file mode 100644 index 00000000000..d627f0639da --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationManager.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.conf; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.WeakHashMap; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; + +/** + * Configuration Manager monitors the files and reloads the properties has + * there been any changes in realtime. + */ +public class ConfigurationManager extends AbstractService { + private volatile boolean runMonitorThread; + private Thread monitorThread; + private WatchService watchService; + private static final Logger LOG = LoggerFactory + .getLogger(ConfigurationManager.class); + + + /** + * The names of the config files that needs to be monitored. + */ + public static final String CONF_MANAGER_FILE_NAMES = + "yarn.conf.manager.file.names"; + + /** + * The default name of the config file that we can change on the fly. + */ + public static final String DEFAULT_CONFIG_FILE_NAME = + "yarn-site-override.xml"; + + /** + * The directory in which this configuration file will be located. + */ + public static final String CONFIG_DIR = System.getProperty("user.dir"); + + /** + * The path of the directory in which this configuration file will be located. + */ + public static final Path CONFIG_DIR_PATH = Paths.get(CONFIG_DIR); + + /** + * Config files that needs to monitor so that we can change them on the fly. + */ + private List configFileNames; + + /** + * HashMap of files paths. This is for faster access to check the modified + * file is currently monitored. + */ + private HashMap configFileNamesPaths = new HashMap<>(); + + + public ConfigurationManager() { + super(ConfigurationManager.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + configFileNames = new ArrayList(Arrays.asList(getConfig() + .getStrings(CONF_MANAGER_FILE_NAMES, DEFAULT_CONFIG_FILE_NAME))); + + // This file will override all other previous configurations. + for(String fileName : configFileNames) { + + File configFile = new File(CONFIG_DIR, fileName); + + if(!configFile.exists()) { + LOG.error("To be monitored config file does not exists {} ", fileName); + continue; + } + configFileNamesPaths.put(configFile.toPath().toString(), fileName); + getConfig().addResource(new org.apache.hadoop.fs.Path( + configFile.toURI())); + + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + startThread(); + super.serviceStart(); + } + @Override + protected void serviceStop() throws Exception { + stopThread(); + super.serviceStop(); + } + + // The set of Configuration Observers. These classes would like to get + // notified when the configuration is reloaded from disk. This is a set + // constructed from a WeakHashMap, whose entries would be removed if the + // observer classes go out of scope. + private final Set configurationObservers = + Collections.newSetFromMap(new WeakHashMap()); + + /** + * Register an observer class + * @param observer + */ + public void registerObserver(ConfigurationObserver observer) { + synchronized (configurationObservers) { + configurationObservers.add(observer); + } + } + + /** + * Deregister an observer class + * @param observer + */ + public void deregisterObserver(ConfigurationObserver observer) { + synchronized (configurationObservers) { + configurationObservers.remove(observer); + } + } + + /** + * The conf object has been repopulated from disk, and we have to notify + * all the observers that are expressed interest to do that. + */ + public void notifyAllObservers() { + LOG.info("Starting to notify all observers that config changed."); + synchronized (configurationObservers) { + for (ConfigurationObserver observer : configurationObservers) { + try { + if (observer != null) { + observer.onConfigurationChange(getConfig()); + } + } catch (Throwable t) { + LOG.error("Encountered a throwable while notifying observers: " + + " of type : " + observer.getClass().getCanonicalName() + + "(" + observer + ")", t); + } + } + } + } + + /** + * @return the number of observers. + */ + @VisibleForTesting + public int getNumObservers() { + synchronized (configurationObservers) { + return configurationObservers.size(); + } + } + + public void addNewConfigFiles() { + List newConfigFileNames = new ArrayList(Arrays.asList( + getConfig().getStrings(CONF_MANAGER_FILE_NAMES, + DEFAULT_CONFIG_FILE_NAME))); + + /* Gets only the new added files in the monitored files */ + newConfigFileNames.removeAll(configFileNames); + + for(String fileName : newConfigFileNames) { + File configFile = new File(CONFIG_DIR, fileName); + + if(!configFile.exists()) { + LOG.error("To be monitored config file does not exists {} ", fileName); + continue; + } + + configFileNames.add(fileName); + configFileNamesPaths.put(configFile.toPath().toString(), fileName); + getConfig().addResource(new org.apache.hadoop.fs.Path( + configFile.toURI())); + } + } + + + /** + * Start the background thread if not running. + */ + public synchronized void startThread() { + if (this.runMonitorThread) { + LOG.info("Monitor thread already running, skip starting"); + return; + } + LOG.info("starting the monitor thread"); + this.runMonitorThread = true; + try { + this.watchService = FileSystems.getDefault().newWatchService(); + LOG.info("Watching directory {} for changes", CONFIG_DIR_PATH); + CONFIG_DIR_PATH.register(this.watchService, ENTRY_CREATE, ENTRY_MODIFY); + + this.monitorThread = new Thread(new FileMonitorRunnable()); + this.monitorThread.setDaemon(true); + this.monitorThread.start(); + } catch (Exception e) { + this.runMonitorThread = false; + LOG.error("Error starting Configuration Manager Watch Service thread",e); + throw new RuntimeException(e); + } + } + + /** + * Shutdown the background thread if running. + */ + public synchronized void stopThread() { + LOG.info("stopping the monitor thread"); + if (this.runMonitorThread) { + this.runMonitorThread = false; + this.monitorThread.interrupt(); + try{ + this.watchService.close(); + } catch (IOException e) { + LOG.error("Watch service failed to close", e); + } + } + } + + + /** + * This clears the configuration, and forces resources to be reloaded before + * reading any property. + */ + private void forceReloadProperties() { + addNewConfigFiles(); + + // This method is synchronized so there will be no issues with concurrent + // accesses of the configuration object while this is happening. + getConfig().reloadConfiguration(); + LOG.info("Configuration has been reloaded"); + notifyAllObservers(); + + } + + + /** + * WatchService APIs are blocking, so create a separate thread for monitoring + * the file for changes + */ + private class FileMonitorRunnable implements Runnable { + @Override + public void run() { + while (runMonitorThread) { + WatchKey modifiedKey; + try { + modifiedKey = watchService.take(); + } catch (InterruptedException e) { + LOG.error("Interrupted while watching the watchKey", e); + continue; + } + + for (WatchEvent fileModifiedEvent : modifiedKey.pollEvents()) { + WatchEvent.Kind eventKind = fileModifiedEvent.kind(); + if (eventKind != ENTRY_CREATE && eventKind != ENTRY_MODIFY) { + continue; + } + + Path modifiedFile = + CONFIG_DIR_PATH.resolve((Path) fileModifiedEvent.context()); + + // Verify that the file update happened on the file we're monitoring. + if (!configFileNamesPaths.containsKey(modifiedFile.toString())) { + LOG.warn( + "Modified file '{}' does not match the configuration files)", + modifiedFile.toString()); + continue; + } + LOG.info("Received file creation/modified event for " + + modifiedFile.toString()); + + // This forces the configuration objects to be reloaded on the next + // access. + forceReloadProperties(); + + } + // This must happen after each successful take. + boolean isValid = modifiedKey.reset(); + if (!isValid) { + LOG.warn("Watch service is closed watchKey is cancelled, " + + "stopping monitoring thread"); + break; + } + } + } + } + +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationObserver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationObserver.java new file mode 100644 index 00000000000..42bbfc72081 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ConfigurationObserver.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.conf; + +/** + * Every class that wants to observe changes in Configuration properties, + * must implement interface (and also, register itself with the + * ConfigurationManager object. + */ +public interface ConfigurationObserver { + /** + * This method would be called by the {@link ConfigurationManager} + * object when the {@link Configuration} object is reloaded from disk. + */ + void onConfigurationChange(Configuration conf); +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationManager.java new file mode 100644 index 00000000000..904089467d5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfigurationManager.java @@ -0,0 +1,358 @@ +package org.apache.hadoop.conf; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit Tests to test the functionality of ConfigurationManager + * Also, tests configurationObserver + */ +public class TestConfigurationManager { + + private static final Logger LOG = LoggerFactory + .getLogger(TestConfigurationManager.class); + + + + public static final File OVERRIDE_FILE = new File( + System.getProperty("user.dir"), "yarn-site-override.xml"); + + public static final File OTHER_FILE = new File( + System.getProperty("user.dir"), "yarn-site-other.xml"); + + public static final String CONFIG_KEY = "yarn.conf-testing-dummy-count"; + public static final String CONFIG_VALUE = "yarn.value"; + + public static final String YARN_CONFIG_KEY = "hadoop.http.staticuser.user"; + public static final String YARN_CONFIG_VALUE = "dr.who"; + + + private Configuration conf; + private ConfigurationManager configurationManager; + + private Map keyValuePairs = new HashMap<>(); + + class DummyConfigurationObserver implements ConfigurationObserver { + private boolean notifiedOnChange = false; + private ConfigurationManager cm; + + public DummyConfigurationObserver(ConfigurationManager cm) { + this.cm = cm; + register(); + } + + @Override + public void onConfigurationChange(Configuration conf) { + notifiedOnChange = true; + } + + // Was the observer notified on Configuration change? + public boolean wasNotifiedOnChange() { + return notifiedOnChange; + } + + public void resetNotifiedOnChange() { + notifiedOnChange = false; + } + + public void register() { + this.cm.registerObserver(this); + } + + public void deregister() { + this.cm.deregisterObserver(this); + } + } + + + + @Before + public void setup() + throws Exception { + conf = new Configuration(); + + keyValuePairs.put(CONFIG_KEY, CONFIG_VALUE); + keyValuePairs.put(configurationManager.CONF_MANAGER_FILE_NAMES, + "yarn-site-override.xml"); + createNewYarnSite(OVERRIDE_FILE, keyValuePairs); + + Map yarnKeyValuePairs = new HashMap<>(); + yarnKeyValuePairs.put(CONFIG_KEY + ".other-site", CONFIG_VALUE+"other"); + createNewYarnSite(OTHER_FILE, yarnKeyValuePairs); + + configurationManager = new ConfigurationManager(); + configurationManager.init(conf); + configurationManager.start(); + + + } + + @After + public void stop() { + OTHER_FILE.delete(); + OVERRIDE_FILE.delete(); + configurationManager.stopThread(); + configurationManager.stop(); + } + + /** + * Test if observers get notified by the ConfigurationManager + * when the Configuration is reloaded. + */ + @Test + public void testCheckIfObserversNotified() { + Configuration conf = new Configuration(); + ConfigurationManager cm = new ConfigurationManager(); + DummyConfigurationObserver d1 = new DummyConfigurationObserver(cm); + + // Check if we get notified. + cm.notifyAllObservers(); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + + // Now check if we get notified on change with more than one observers. + DummyConfigurationObserver d2 = new DummyConfigurationObserver(cm); + cm.notifyAllObservers(); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + assertTrue(d2.wasNotifiedOnChange()); + d2.resetNotifiedOnChange(); + + // Now try deregistering an observer and verify that it was not notified + d2.deregister(); + cm.notifyAllObservers(); + assertTrue(d1.wasNotifiedOnChange()); + d1.resetNotifiedOnChange(); + assertFalse(d2.wasNotifiedOnChange()); + } + + // Register an observer that will go out of scope immediately, allowing + // us to test that out of scope observers are deregistered. + private void registerLocalObserver(ConfigurationManager cm) { + new DummyConfigurationObserver(cm); + } + + /** + * Test if out-of-scope observers are deregistered on GC. + */ + @Test + public void testDeregisterOnOutOfScope() { + Configuration conf = new Configuration(); + ConfigurationManager cm = new ConfigurationManager(); + + boolean outOfScopeObserversDeregistered = false; + + // On my machine, I was able to cause a GC after around 5 iterations. + // If we do not cause a GC in 100k iterations, which is very unlikely, + // there might be something wrong with the GC. + for (int i = 0; i < 100000; i++) { + registerLocalObserver(cm); + cm.notifyAllObservers(); + + // 'Suggest' the system to do a GC. We should be able to cause GC + // atleast once in the 2000 iterations. + System.gc(); + + // If GC indeed happened, all the observers (which are all out of scope), + // should have been deregistered. + if (cm.getNumObservers() <= i) { + outOfScopeObserversDeregistered = true; + break; + } + } + if (!outOfScopeObserversDeregistered) { + LOG.warn("Observers were not GC-ed! Something seems to be wrong."); + } + assertTrue(outOfScopeObserversDeregistered); + } + + /** + * Test if values are overridden after modifying the xml file from + * ConfigurationManager + * when the Configuration is reloaded. + */ + @Test + public void testValueChange() throws Exception { + + String valueToChange = "2"; + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + Assert.assertEquals(conf.get(CONFIG_KEY), CONFIG_VALUE); + Map keyValuePairs = new HashMap<>(); + keyValuePairs.put(CONFIG_KEY, valueToChange); + + modifyFile(OVERRIDE_FILE, keyValuePairs); + + Thread.sleep(1000); + // Value should have been changed now. + Assert.assertEquals(conf.get(CONFIG_KEY), valueToChange); + + //Other XML files should not reflect the change + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + } + + /** + * Test if new files are monitored after modifying the xml file from + * ConfigurationManager + * when the Configuration is reloaded. + */ + @Test + public void testNewFilesAddition() throws Exception { + + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + Assert.assertEquals(conf.get(CONFIG_KEY), CONFIG_VALUE); + + //This should give us null value as this resource is not added yet + Assert.assertNotEquals(conf.get(CONFIG_KEY + ".other-site"), + CONFIG_VALUE+"other"); + + keyValuePairs.put(configurationManager.CONF_MANAGER_FILE_NAMES, + "yarn-site-override.xml,yarn-site-other.xml"); + + modifyFile(OVERRIDE_FILE, keyValuePairs); + + // Sleeping this thread in order to give time for the change to reflect + Thread.sleep(2000); + + Assert.assertEquals(conf.get(CONFIG_KEY + ".other-site"), CONFIG_VALUE+"other"); + + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + Assert.assertEquals(conf.get(CONFIG_KEY), CONFIG_VALUE); + + } + + /** + * Test to make sure the changes are not reflected from files that are + * not monitored in ConfigurationManager + */ + @Test + public void testNonExistsFilesAddition() throws Exception { + + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + Assert.assertEquals(conf.get(CONFIG_KEY), CONFIG_VALUE); + + + keyValuePairs.put(configurationManager.CONF_MANAGER_FILE_NAMES, + "yarn-site-override.xml,404file.xml"); + + modifyFile(OVERRIDE_FILE, keyValuePairs); + + // Sleeping this thread in order to give time for the change to reflect + Thread.sleep(2000); + + + Assert.assertEquals(conf.get(YARN_CONFIG_KEY), YARN_CONFIG_VALUE); + Assert.assertEquals(conf.get(CONFIG_KEY), CONFIG_VALUE); + + } + + + + /** + * Creates a new XML file. + * + * @param writeToFile The file to create. + * @param keyValuePairs The key-value mapping to create. + * @throws ParserConfigurationException + * @throws TransformerException + */ + private void createNewYarnSite(File writeToFile, + Map keyValuePairs) throws ParserConfigurationException, + TransformerException { + + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + + // root elements + Document doc = docBuilder.newDocument(); + Element rootElement = doc.createElement("configuration"); + doc.appendChild(rootElement); + + for (String key : keyValuePairs.keySet()) { + Element property = doc.createElement("property"); + rootElement.appendChild(property); + Element propertyName = doc.createElement("name"); + property.appendChild(propertyName); + propertyName.appendChild(doc.createTextNode(key)); + Element propertyValue = doc.createElement("value"); + property.appendChild(propertyValue); + propertyValue.appendChild(doc.createTextNode(keyValuePairs.get(key))); + } + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(writeToFile); + transformer.transform(source, result); + } + + /** + * Modify the file by replacing particular keys with the values specified in + * the map. + * + * @param fileToModify The file to modify in place. + * @param newKeyValuePairs The key-value mapping of keys to be replaced. + * @throws IOException + * @throws SAXException + * @throws ParserConfigurationException + * @throws TransformerException + */ + private void modifyFile(File fileToModify, + Map newKeyValuePairs) throws IOException, SAXException, + ParserConfigurationException, TransformerException { + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + Document doc = docBuilder.parse(fileToModify); + NodeList propertyList = doc.getElementsByTagName("property"); + Assert.assertTrue(propertyList.getLength() > 0); + + for (int i = 0; i < propertyList.getLength(); i++) { + Node firstProperty = propertyList.item(i); + NodeList children = firstProperty.getChildNodes(); + Node valueNode = null; + String valueToBeReplaced = null; + for (int j = 0; j < children.getLength(); j++) { + Node childNode = children.item(j); + if (childNode.getNodeName().equalsIgnoreCase("value")) { + valueNode = childNode; + } else if (childNode.getNodeName().equalsIgnoreCase("name") + && newKeyValuePairs.containsKey(childNode.getTextContent())) { + valueToBeReplaced = newKeyValuePairs.get(childNode.getTextContent()); + } + } + if (valueToBeReplaced != null) { + valueNode.setTextContent(valueToBeReplaced); + } + } + // write the content into xml file + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(fileToModify); + transformer.transform(source, result); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 85d5a580362..a193f7a92bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -130,6 +130,10 @@ private static void addDeprecatedKeys() { public static final String YARN_PREFIX = "yarn."; + public static final String YARN_CONF_MANAGER_ENABLED = + YARN_PREFIX + "conf.manager.enabled"; + public static final Boolean YARN_CONF_MANAGER_ENABLED_DEFAULT = false; + ///////////////////////////// // Resource types configs //////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 113e6a7c668..75502ea8c11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -132,6 +133,10 @@ NodeManagerMetrics getNodeManagerMetrics(); + default ConfigurationManager getConfigurationManager() { + return null; + } + /** * Get the {@code DeletionService} associated with the NM. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index f90423cf6b6..f2495410ff8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -146,6 +147,8 @@ public int getExitCode() { private boolean shouldExitOnShutdownEvent = false; private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + + private ConfigurationManager configurationManager; /** * Default Container State transition listener. */ @@ -481,6 +484,15 @@ protected void serviceInit(Configuration conf) throws Exception { ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); nmStore.setNodeStatusUpdater(nodeStatusUpdater); + + // Configuration Manager to monitor for config file changes + if(conf.getBoolean(YarnConfiguration.YARN_CONF_MANAGER_ENABLED, + YarnConfiguration.YARN_CONF_MANAGER_ENABLED_DEFAULT)) { + configurationManager = new ConfigurationManager(); + addService(configurationManager); + ((NMContext) context).setConfigurationManager(configurationManager); + } + // Do secure login before calling init for added services. try { doSecureLogin(); @@ -614,6 +626,8 @@ protected void reregisterCollectors() { private Configuration conf = null; + private ConfigurationManager configurationManager; + private NodeManagerMetrics metrics = null; protected final ConcurrentMap applications = @@ -711,6 +725,16 @@ public Configuration getConf() { return this.conf; } + @Override + public ConfigurationManager getConfigurationManager() { + return this.configurationManager; + } + + public void setConfigurationManager(ConfigurationManager + configurationManager) { + this.configurationManager = configurationManager; + } + @Override public ConcurrentMap getContainers() { return this.containers; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 55420bd9270..5656e017afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -153,6 +154,8 @@ void setRMDelegatedNodeLabelsUpdater( boolean isSchedulerReadyForAllocatingContainers(); Configuration getYarnConfiguration(); + + ConfigurationManager getConfigurationManager(); PlacementManager getQueuePlacementManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 54e0281f7e8..1d92fe25946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -289,6 +290,16 @@ public String getHAZookeeperConnectionState() { return serviceContext.getHAZookeeperConnectionState(); } + @Override + public ConfigurationManager getConfigurationManager() { + return serviceContext.getConfigurationManager(); + } + + public void setConfigurationManager(ConfigurationManager + configurationManager) { + serviceContext.setConfigurationManager(configurationManager); + } + // ========================================================================== /** * RM Active service context. This will be recreated for every transition from diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java index 45c61667cf9..84752a88e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.conf.ConfigurationProvider; @@ -55,6 +56,8 @@ private ResourceManager resourceManager; private RMTimelineCollectorManager timelineCollectorManager; + private ConfigurationManager configurationManager; + public ResourceManager getResourceManager() { return resourceManager; } @@ -159,4 +162,14 @@ public String getHAZookeeperConnectionState() { return elector.getZookeeperConnectionState(); } } + + + public ConfigurationManager getConfigurationManager() { + return this.configurationManager; + } + + public void setConfigurationManager(ConfigurationManager + configurationManager) { + this.configurationManager = configurationManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 836a5ece80b..d289c88600d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -29,6 +29,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ConfigurationManager; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.http.HttpServer2; @@ -220,6 +221,8 @@ Long.toString(new SecureRandom().nextLong()); private boolean recoveryEnabled; + private ConfigurationManager configurationManager; + @VisibleForTesting protected String webAppAddress; private ConfigurationProvider configurationProvider = null; @@ -356,6 +359,14 @@ protected void serviceInit(Configuration conf) throws Exception { rmContext.setRMTimelineCollectorManager(timelineCollectorManager); } + // Configuration Manager to monitor for config file changes + if(conf.getBoolean(YarnConfiguration.YARN_CONF_MANAGER_ENABLED, + YarnConfiguration.YARN_CONF_MANAGER_ENABLED_DEFAULT)) { + configurationManager = new ConfigurationManager(); + addService(configurationManager); + rmContext.setConfigurationManager(configurationManager); + } + SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher(); addIfService(systemMetricsPublisher);