diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 597af94..135b36b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -17,25 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -45,8 +34,8 @@ import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -57,7 +46,19 @@ import org.w3c.dom.Text; import org.xml.sax.SAXException; -import com.google.common.annotations.VisibleForTesting; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; @Public @Unstable @@ -85,12 +86,13 @@ private final Clock clock; - private long lastSuccessfulReload; // Last time we successfully reloaded queues - private boolean lastReloadAttemptFailed = false; - - // Path to XML file containing allocations. - private File allocFile; - + private AtomicLong lastSuccessfulReload; // Last time we successfully reloaded queues + private AtomicBoolean lastReloadAttemptFailed; + + // Path to XML file containing allocations. + private Path allocFile; + private FileSystem fs; + private Listener reloadListener; @VisibleForTesting @@ -108,47 +110,50 @@ public AllocationFileLoaderService() { public AllocationFileLoaderService(Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; - + this.lastSuccessfulReload = new AtomicLong(0); + this.lastReloadAttemptFailed = new AtomicBoolean(false); } @Override public void serviceInit(Configuration conf) throws Exception { this.allocFile = getAllocationFile(conf); - if (allocFile != null) { - reloadThread = new Thread() { - @Override - public void run() { - while (running) { + if(this.allocFile != null) { + this.fs = allocFile.getFileSystem(conf); + reloadThread = new Thread(() -> { + while (running) { + try { long time = clock.getTime(); - long lastModified = allocFile.lastModified(); - if (lastModified > lastSuccessfulReload && + long lastModified = fs.getFileStatus(allocFile).getModificationTime(); + if (lastModified > lastSuccessfulReload.get() && time > lastModified + ALLOC_RELOAD_WAIT_MS) { try { reloadAllocations(); } catch (Exception ex) { - if (!lastReloadAttemptFailed) { + if (!lastReloadAttemptFailed.get()) { LOG.error("Failed to reload fair scheduler config file - " + "will use existing allocations.", ex); } - lastReloadAttemptFailed = true; + lastReloadAttemptFailed.set(true); } } else if (lastModified == 0l) { - if (!lastReloadAttemptFailed) { + if (!lastReloadAttemptFailed.get()) { LOG.warn("Failed to reload fair scheduler config file because" + " last modified returned 0. File exists: " - + allocFile.exists()); + + fs.exists(allocFile)); } - lastReloadAttemptFailed = true; - } - try { - Thread.sleep(reloadIntervalMs); - } catch (InterruptedException ex) { - LOG.info( - "Interrupted while waiting to reload alloc configuration"); + lastReloadAttemptFailed.set(true); } + } catch (IOException e) { + LOG.info("Exception while loading allocation file: " + e); + } + try { + Thread.sleep(reloadIntervalMs); + } catch (InterruptedException ex) { + LOG.info( + "Interrupted while waiting to reload alloc configuration"); } } - }; + }); reloadThread.setName("AllocationFileReloader"); reloadThread.setDaemon(true); } @@ -182,10 +187,10 @@ public void serviceStop() throws Exception { * path is relative, it is searched for in the * classpath, but loaded like a regular File. */ - public File getAllocationFile(Configuration conf) { + public Path getAllocationFile(Configuration conf) throws IOException { String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); - File allocFile = new File(allocFilePath); + Path allocFile = new Path(allocFilePath); if (!allocFile.isAbsolute()) { URL url = Thread.currentThread().getContextClassLoader() .getResource(allocFilePath); @@ -196,8 +201,10 @@ public File getAllocationFile(Configuration conf) { throw new RuntimeException("Allocation file " + url + " found on the classpath is not on the local filesystem."); } else { - allocFile = new File(url.getPath()); + allocFile = new Path(url.getProtocol(), null, url.getPath()); } + } else if (allocFile.isAbsoluteAndSchemeAuthorityNull()){ + allocFile = new Path("file", null, allocFilePath); } return allocFile; } @@ -274,7 +281,7 @@ public synchronized void reloadAllocations() throws IOException, DocumentBuilderFactory.newInstance(); docBuilderFactory.setIgnoringComments(true); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc = builder.parse(allocFile); + Document doc = builder.parse(fs.open(allocFile)); Element root = doc.getDocumentElement(); if (!"allocations".equals(root.getTagName())) throw new AllocationConfigurationException("Bad fair scheduler config " + @@ -437,9 +444,9 @@ public synchronized void reloadAllocations() throws IOException, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); - - lastSuccessfulReload = clock.getTime(); - lastReloadAttemptFailed = false; + + lastSuccessfulReload.set(clock.getTime()); + lastReloadAttemptFailed.set(false); reloadListener.onReload(info); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 67b46f9..da24317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -17,17 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.nio.charset.StandardCharsets; -import java.util.List; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; @@ -38,6 +32,23 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestAllocationFileLoaderService { final static String TEST_DIR = new File(System.getProperty("test.build.data", @@ -45,16 +56,50 @@ final static String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); - + private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml"; + @Test - public void testGetAllocationFileFromClasspath() { - Configuration conf = new Configuration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, - "test-fair-scheduler.xml"); + public void testGetAllocationFileFromFileSystem() + throws IOException, URISyntaxException { + Configuration conf = new YarnConfiguration(); + File baseDir = + new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + MiniDFSCluster hdfsCluster = builder.build(); + String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + + Path.SEPARATOR + TEST_FAIRSCHED_XML; + + URL fschedURL = Thread.currentThread().getContextClassLoader() + .getResource(TEST_FAIRSCHED_XML); + FileSystem fs = FileSystem.get(conf); + fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath)); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - File allocationFile = allocLoader.getAllocationFile(conf); - assertEquals("test-fair-scheduler.xml", allocationFile.getName()); - assertTrue(allocationFile.exists()); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(fsAllocPath, allocationFile.toString()); + assertTrue(fs.exists(allocationFile)); + + hdfsCluster.shutdown(true); + } + + @Test + public void testGetAllocationFileFromClasspath() { + try { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + TEST_FAIRSCHED_XML); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName()); + assertTrue(fs.exists(allocationFile)); + } catch (IOException e) { + fail("Unable to access allocation file from classpath: " + e); + } } @Test (timeout = 10000)