diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 7a40b6a2022..e541ab7b0dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -87,7 +87,7 @@ private Path allocFile; private FileSystem fs; - private final Listener reloadListener; + private Listener reloadListener; @VisibleForTesting long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; @@ -95,16 +95,15 @@ private Thread reloadThread; private volatile boolean running = true; - public AllocationFileLoaderService(Listener reloadListener) { - this(reloadListener, SystemClock.getInstance()); + public AllocationFileLoaderService() { + this(SystemClock.getInstance()); } private List defaultPermissions; - public AllocationFileLoaderService(Listener reloadListener, Clock clock) { + public AllocationFileLoaderService(Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; - this.reloadListener = reloadListener; } @Override @@ -209,6 +208,10 @@ public Path getAllocationFile(Configuration conf) return allocPath; } + public synchronized void setReloadListener(Listener reloadListener) { + this.reloadListener = reloadListener; + } + /** * Updates the allocation list from the allocation config file. This file is * expected to be in the XML format specified in the design doc. @@ -350,6 +353,7 @@ private ReservationQueueConfiguration createReservationQueueConfig( public interface Listener { void onReload(AllocationConfiguration info) throws IOException; - void onCheck(); + default void onCheck() { + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4c84aa91ae2..123f7110cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -208,8 +208,7 @@ public FairScheduler() { super(FairScheduler.class.getName()); context = new FSContext(this); - allocsLoader = - new AllocationFileLoaderService(new AllocationReloadListener()); + allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } @@ -1438,6 +1437,7 @@ private void initScheduler(Configuration conf) throws IOException { } allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); // If we fail to load allocations file on initialize, we want to fail // immediately. After a successful load, exceptions on future reloads // will just result in leaving things as they are. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 30b8a917841..50a003ecd11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; @@ -33,8 +32,6 @@ import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; -import org.mockito.Mockito; - import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -82,8 +79,7 @@ public void testGetAllocationFileFromFileSystem() fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath)); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(Mockito.mock(Listener.class)); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); Path allocationFile = allocLoader.getAllocationFile(conf); assertEquals(fsAllocPath, allocationFile.toString()); assertTrue(fs.exists(allocationFile)); @@ -96,8 +92,7 @@ public void testDenyGetAllocationFileFromUnsupportedFileSystem() throws UnsupportedFileSystemException { Configuration conf = new YarnConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile"); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(Mockito.mock(Listener.class)); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.getAllocationFile(conf); } @@ -110,7 +105,7 @@ public void testGetAllocationFileFromClasspath() { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, TEST_FAIRSCHED_XML); AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(Mockito.mock(Listener.class)); + new AllocationFileLoaderService(); Path allocationFile = allocLoader.getAllocationFile(conf); assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName()); assertTrue(fs.exists(allocationFile)); @@ -139,11 +134,12 @@ public void testReload() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder, clock); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService( + clock); allocLoader.reloadIntervalMs = 5; allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -209,9 +205,7 @@ public void testReload() throws Exception { public void testAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); AllocationFileWriter .create() @@ -284,6 +278,8 @@ public void testAllocationFileParsing() throws Exception { .writeToFile(ALLOC_FILE); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; @@ -431,9 +427,7 @@ public void testAllocationFileParsing() throws Exception { public void testBackwardsCompatibleAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -479,6 +473,8 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.close(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; @@ -554,10 +550,10 @@ public void testSimplePlacementPolicyFromConf() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -588,10 +584,10 @@ public void testQueueAlongsideRoot() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } @@ -612,10 +608,10 @@ public void testQueueNameContainingPeriods() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } @@ -636,10 +632,10 @@ public void testQueueNameContainingOnlyWhitespace() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } @@ -658,10 +654,10 @@ public void testParentTagWithReservation() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); try { allocLoader.reloadAllocations(); } catch (AllocationConfigurationException ex) { @@ -689,10 +685,10 @@ public void testParentWithReservation() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); try { allocLoader.reloadAllocations(); } catch (AllocationConfigurationException ex) { @@ -718,10 +714,10 @@ public void testParentTagWithChild() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; // Check whether queue 'parent' and 'child' are loaded successfully @@ -749,10 +745,10 @@ public void testQueueNameContainingNBWhitespace() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } @@ -771,10 +767,10 @@ public void testDefaultQueueSchedulingModeIsFIFO() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } @@ -797,10 +793,10 @@ public void testReservableQueue() throws Exception { out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; @@ -857,10 +853,10 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue() out.println(""); out.close(); - ReloadListener confHolder = new ReloadListener(); - AllocationFileLoaderService allocLoader = - new AllocationFileLoaderService(confHolder); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); }