From a0a9eca0a1d44dfadda1ff4300d2845fdd942a4f Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Mon, 16 May 2016 12:38:52 +0530 Subject: [PATCH] YARN-4002 --- .../org/apache/hadoop/util/HostsFileReader.java | 153 +++++++++++++-------- .../server/resourcemanager/NodesListManager.java | 53 ++++--- 2 files changed, 135 insertions(+), 71 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..cc2d059 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -21,6 +21,9 @@ import java.io.*; import java.util.Set; import java.util.HashSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.io.Charsets; import org.apache.commons.logging.LogFactory; @@ -38,6 +41,8 @@ private Set excludes; private String includesFile; private String excludesFile; + private WriteLock writeLock; + private ReadLock readLock; private static final Log LOG = LogFactory.getLog(HostsFileReader.class); @@ -47,6 +52,9 @@ public HostsFileReader(String inFile, excludes = new HashSet(); includesFile = inFile; excludesFile = exFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(); } @@ -57,6 +65,9 @@ public HostsFileReader(String includesFile, InputStream inFileInputStream, excludes = new HashSet(); this.includesFile = includesFile; this.excludesFile = excludesFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(inFileInputStream, exFileInputStream); } @@ -101,80 +112,114 @@ public static void readFileToSetWithFileInputStream(String type, } } - public synchronized void refresh() throws IOException { + public void refresh() throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (!includesFile.isEmpty()) { - readFileToSet("included", includesFile, newIncludes); - switchIncludes = true; - } - if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); - switchExcludes = true; - } + this.writeLock.lock(); + try { + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (!includesFile.isEmpty()) { + readFileToSet("included", includesFile, newIncludes); + switchIncludes = true; + } + if (!excludesFile.isEmpty()) { + readFileToSet("excluded", excludesFile, newExcludes); + switchExcludes = true; + } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; - } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } } @Private - public synchronized void refresh(InputStream inFileInputStream, + public void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (inFileInputStream != null) { - readFileToSetWithFileInputStream("included", includesFile, - inFileInputStream, newIncludes); - switchIncludes = true; - } - if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, - exFileInputStream, newExcludes); - switchExcludes = true; - } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; - } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + this.writeLock.lock(); + try { + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (inFileInputStream != null) { + readFileToSetWithFileInputStream("included", includesFile, + inFileInputStream, newIncludes); + switchIncludes = true; + } + if (exFileInputStream != null) { + readFileToSetWithFileInputStream("excluded", excludesFile, + exFileInputStream, newExcludes); + switchExcludes = true; + } + if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } } - public synchronized Set getHosts() { - return includes; + public Set getHosts() { + this.readLock.lock(); + try { + return includes; + } finally { + this.readLock.unlock(); + } } - public synchronized Set getExcludedHosts() { - return excludes; + public Set getExcludedHosts() { + this.readLock.lock(); + try { + return excludes; + } finally { + this.readLock.unlock(); + } } - public synchronized void setIncludesFile(String includesFile) { + public void setIncludesFile(String includesFile) { LOG.info("Setting the includes file to " + includesFile); - this.includesFile = includesFile; + this.writeLock.lock(); + try { + this.includesFile = includesFile; + } finally { + this.writeLock.unlock(); + } } - public synchronized void setExcludesFile(String excludesFile) { + public void setExcludesFile(String excludesFile) { LOG.info("Setting the excludes file to " + excludesFile); - this.excludesFile = excludesFile; + this.writeLock.lock(); + try { + this.excludesFile = excludesFile; + } finally { + this.writeLock.unlock(); + } } - public synchronized void updateFileNames(String includesFile, - String excludesFile) { - setIncludesFile(includesFile); - setExcludesFile(excludesFile); + public void updateFileNames(String includesFile, String excludesFile) { + this.writeLock.lock(); + try { + setIncludesFile(includesFile); + setExcludesFile(excludesFile); + } finally { + this.writeLock.unlock(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index bb00e60..ccbee1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -25,6 +25,9 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.Map; import java.util.Iterator; @@ -61,6 +64,8 @@ private static final Log LOG = LogFactory.getLog(NodesListManager.class); + private WriteLock hostsListWriteLock; + private ReadLock hostsListReadLock; private HostsFileReader hostsReader; private Configuration conf; private final RMContext rmContext; @@ -75,6 +80,9 @@ public NodesListManager(RMContext rmContext) { super(NodesListManager.class.getName()); this.rmContext = rmContext; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.hostsListWriteLock = rwLock.writeLock(); + this.hostsListReadLock = rwLock.readLock(); } @Override @@ -208,7 +216,8 @@ public void refreshNodes(Configuration yarnConf) throws IOException, private void refreshHostsReader(Configuration yarnConf) throws IOException, YarnException { - synchronized (hostsReader) { + this.hostsListWriteLock.lock(); + try { if (null == yarnConf) { yarnConf = new YarnConfiguration(); } @@ -226,6 +235,8 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException, : this.rmContext.getConfigurationProvider() .getConfigurationInputStream(this.conf, excludesFile)); printConfiguredHosts(); + } finally { + this.hostsListWriteLock.unlock(); } } @@ -364,13 +375,18 @@ public void run() { public boolean isValidNode(String hostName) { String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList - .contains(ip)) - && !(excludeList.contains(hostName) || excludeList.contains(ip)); - } + Set hostsList; + Set excludeList; + this.hostsListReadLock.lock(); + try { + hostsList = hostsReader.getHosts(); + excludeList = hostsReader.getExcludedHosts(); + } finally { + this.hostsListReadLock.unlock(); + } + return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList + .contains(ip)) + && !(excludeList.contains(hostName) || excludeList.contains(ip)); } @Override @@ -467,17 +483,20 @@ private void updateInactiveNodes() { } public boolean isUntrackedNode(String hostName) { - boolean untracked; String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - untracked = !hostsList.isEmpty() && - !hostsList.contains(hostName) && !hostsList.contains(ip) && - !excludeList.contains(hostName) && !excludeList.contains(ip); - } - return untracked; + Set hostsList; + Set excludeList; + this.hostsListReadLock.lock(); + try { + hostsList = hostsReader.getHosts(); + excludeList = hostsReader.getExcludedHosts(); + } finally { + this.hostsListReadLock.unlock(); + } + return !hostsList.isEmpty() && !hostsList.contains(hostName) + && !hostsList.contains(ip) && !excludeList.contains(hostName) + && !excludeList.contains(ip); } /** -- 1.9.2.msysgit.0