diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 597af94..0b0122d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -19,6 +19,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; @@ -36,6 +38,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -85,12 +90,10 @@ private final Clock clock; - private long lastSuccessfulReload; // Last time we successfully reloaded queues - private boolean lastReloadAttemptFailed = false; - // Path to XML file containing allocations. - private File allocFile; - + private Path allocFile; + private FileSystem fs; + private Listener reloadListener; @VisibleForTesting @@ -114,17 +117,21 @@ public AllocationFileLoaderService(Clock clock) { @Override public void serviceInit(Configuration conf) throws Exception { this.allocFile = getAllocationFile(conf); - if (allocFile != null) { - reloadThread = new Thread() { - @Override - public void run() { - while (running) { + if(this.allocFile != null) { + this.fs = allocFile.getFileSystem(conf); + reloadThread = new Thread(() -> { + boolean lastReloadAttemptFailed = false; + long lastSuccessfulReload = 0L; + while (running) { + try { long time = clock.getTime(); - long lastModified = allocFile.lastModified(); + long lastModified = fs.getFileStatus(allocFile).getModificationTime(); if (lastModified > lastSuccessfulReload && time > lastModified + ALLOC_RELOAD_WAIT_MS) { try { reloadAllocations(); + lastSuccessfulReload = clock.getTime(); + lastReloadAttemptFailed = false; } catch (Exception ex) { if (!lastReloadAttemptFailed) { LOG.error("Failed to reload fair scheduler config file - " + @@ -136,19 +143,22 @@ public void run() { if (!lastReloadAttemptFailed) { LOG.warn("Failed to reload fair scheduler config file because" + " last modified returned 0. File exists: " - + allocFile.exists()); + + fs.exists(allocFile)); } lastReloadAttemptFailed = true; } - try { - Thread.sleep(reloadIntervalMs); - } catch (InterruptedException ex) { - LOG.info( - "Interrupted while waiting to reload alloc configuration"); - } + } catch (IOException e) { + LOG.info("Exception while loading allocation file: " + e); } + try { + Thread.sleep(reloadIntervalMs); + } catch (InterruptedException ex) { + LOG.info( + "Interrupted while waiting to reload alloc configuration"); + } + } - }; + }); reloadThread.setName("AllocationFileReloader"); reloadThread.setDaemon(true); } @@ -182,10 +192,10 @@ public void serviceStop() throws Exception { * path is relative, it is searched for in the * classpath, but loaded like a regular File. */ - public File getAllocationFile(Configuration conf) { + public Path getAllocationFile(Configuration conf) throws IOException { String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); - File allocFile = new File(allocFilePath); + Path allocFile = new Path(allocFilePath); if (!allocFile.isAbsolute()) { URL url = Thread.currentThread().getContextClassLoader() .getResource(allocFilePath); @@ -196,8 +206,10 @@ public File getAllocationFile(Configuration conf) { throw new RuntimeException("Allocation file " + url + " found on the classpath is not on the local filesystem."); } else { - allocFile = new File(url.getPath()); + allocFile = new Path(url.getProtocol(), null, url.getPath()); } + } else if (allocFile.isAbsoluteAndSchemeAuthorityNull()){ + allocFile = new Path("file", null, allocFilePath); } return allocFile; } @@ -274,7 +286,7 @@ public synchronized void reloadAllocations() throws IOException, DocumentBuilderFactory.newInstance(); docBuilderFactory.setIgnoringComments(true); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc = builder.parse(allocFile); + Document doc = builder.parse(fs.open(allocFile)); Element root = doc.getDocumentElement(); if (!"allocations".equals(root.getTagName())) throw new AllocationConfigurationException("Bad fair scheduler config " + @@ -438,9 +450,6 @@ public synchronized void reloadAllocations() throws IOException, reservationAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); - lastSuccessfulReload = clock.getTime(); - lastReloadAttemptFailed = false; - reloadListener.onReload(info); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 67b46f9..1800a64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -22,15 +22,26 @@ import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; +import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; @@ -45,16 +56,50 @@ final static String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); - + private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml"; + @Test - public void testGetAllocationFileFromClasspath() { - Configuration conf = new Configuration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, - "test-fair-scheduler.xml"); + public void testGetAllocationFileFromFileSystem() + throws IOException, URISyntaxException { + Configuration conf = new YarnConfiguration(); + File baseDir = + new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + MiniDFSCluster hdfsCluster = builder.build(); + String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + + Path.SEPARATOR + TEST_FAIRSCHED_XML; + + URL fschedURL = Thread.currentThread().getContextClassLoader() + .getResource(TEST_FAIRSCHED_XML); + FileSystem fs = FileSystem.get(conf); + fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath)); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - File allocationFile = allocLoader.getAllocationFile(conf); - assertEquals("test-fair-scheduler.xml", allocationFile.getName()); - assertTrue(allocationFile.exists()); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(fsAllocPath, allocationFile.toString()); + assertTrue(fs.exists(allocationFile)); + + hdfsCluster.shutdown(true); + } + + @Test + public void testGetAllocationFileFromClasspath() { + try { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + TEST_FAIRSCHED_XML); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName()); + assertTrue(fs.exists(allocationFile)); + } catch (IOException e) { + fail("Unable to access allocation file from classpath: " + e); + } } @Test (timeout = 10000)