diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index 2658918..d0818c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -42,7 +42,7 @@ /** * Manages a list of local storage directories. */ -class DirectoryCollection { +public class DirectoryCollection { private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); public enum DiskErrorCause { @@ -59,6 +59,10 @@ } } + public static interface DirsChangeListener { + void onDirsChanged(); + } + /** * Returns a merged list which contains all the elements of l1 and l2 * @param l1 the first list to be included @@ -84,6 +88,8 @@ private int goodDirsDiskUtilizationPercentage; + private Set dirsChangeListeners; + /** * Create collection for the directories specified. No check for free space. * @@ -154,6 +160,20 @@ public DirectoryCollection(String[] dirs, : utilizationPercentageCutOff); diskUtilizationSpaceCutoff = utilizationSpaceCutOff < 0 ? 0 : utilizationSpaceCutOff; + + dirsChangeListeners = new HashSet(); + } + + synchronized void registerDirsChangeListener( + DirsChangeListener listener) { + if (dirsChangeListeners.add(listener)) { + listener.onDirsChanged(); + } + } + + synchronized void deregisterDirsChangeListener( + DirsChangeListener listener) { + dirsChangeListeners.remove(listener); } /** @@ -280,6 +300,11 @@ synchronized boolean checkDirs() { } } setGoodDirsDiskUtilizationPercentage(); + if (setChanged) { + for (DirsChangeListener listener : dirsChangeListeners) { + listener.onDirsChanged(); + } + } return setChanged; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 493571d..57d4395 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; /** @@ -192,6 +193,22 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + public void registerLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.registerDirsChangeListener(listener); + } + + public void registerLogDirsChangeListener(DirsChangeListener listener) { + logDirs.registerDirsChangeListener(listener); + } + + public void deregisterLocalDirsChangeListener(DirsChangeListener listener) { + localDirs.deregisterDirsChangeListener(listener); + } + + public void deregisterLogDirsChangeListener(DirsChangeListener listener) { + logDirs.deregisterDirsChangeListener(listener); + } + /** * @return the good/valid local directories based on disks' health */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index cdd252c..b6e85e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; @@ -161,6 +162,8 @@ private LocalResourcesTracker publicRsrc; private LocalDirsHandlerService dirsHandler; + private DirsChangeListener localDirsChangeListener; + private DirsChangeListener logDirsChangeListener; private Context nmContext; /** @@ -254,6 +257,18 @@ public void serviceInit(Configuration conf) throws Exception { localizerTracker = createLocalizerTracker(conf); addService(localizerTracker); dispatcher.register(LocalizerEventType.class, localizerTracker); + localDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + getInitializedLocalDirs(); + } + }; + logDirsChangeListener = new DirsChangeListener() { + @Override + public void onDirsChanged() { + getInitializedLogDirs(); + } + }; super.serviceInit(conf); } @@ -343,6 +358,8 @@ public void serviceStart() throws Exception { server.getListenerAddress()); LOG.info("Localizer started on port " + server.getPort()); super.serviceStart(); + dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.registerLogDirsChangeListener(logDirsChangeListener); } LocalizerTracker createLocalizerTracker(Configuration conf) { @@ -373,6 +390,8 @@ Server createServer() { @Override public void serviceStop() throws Exception { + dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener); + dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener); if (server != null) { server.stop(); } @@ -812,11 +831,6 @@ public void addResource(LocalizerResourceRequestEvent request) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } - // In case this is not a newly initialized nm state, ensure - // initialized local/log dirs similar to LocalizerRunner - getInitializedLocalDirs(); - getInitializedLogDirs(); - // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { @@ -1114,8 +1128,6 @@ public void run() { // 1) write credentials to private dir writeCredentials(nmPrivateCTokensPath); // 2) exec initApplication and wait - List localDirs = getInitializedLocalDirs(); - List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, context.getUser(), @@ -1387,7 +1399,8 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, * * @return list of initialized local dirs */ - synchronized private List getInitializedLocalDirs() { + @VisibleForTesting + List getInitializedLocalDirs() { List dirs = dirsHandler.getLocalDirs(); List checkFailedDirs = new ArrayList(); for (String dir : dirs) { @@ -1465,7 +1478,7 @@ private boolean checkLocalDir(String localDir) { * * @return list of initialized log dirs */ - synchronized private List getInitializedLogDirs() { + private List getInitializedLogDirs() { List dirs = dirsHandler.getLogDirs(); initializeLogDirs(lfs); return dirs; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index e4525a5..2fd89c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -258,4 +259,50 @@ public void testConstructors() { Assert.assertEquals(100.0F, dc.getDiskUtilizationPercentageCutoff(), delta); Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff()); } + + @Test + public void testDirsChangeListener() { + DirsChangeListenerTest listener1 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener2 = new DirsChangeListenerTest(); + DirsChangeListenerTest listener3 = new DirsChangeListenerTest(); + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 0); + Assert.assertEquals(listener2.num, 0); + Assert.assertEquals(listener3.num, 0); + dc.registerDirsChangeListener(listener1); + dc.registerDirsChangeListener(listener2); + dc.registerDirsChangeListener(listener3); + Assert.assertEquals(listener1.num, 1); + Assert.assertEquals(listener2.num, 1); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener3); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 2); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + + dc.deregisterDirsChangeListener(listener2); + dc.setDiskUtilizationPercentageCutoff(100.0F); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(listener1.num, 3); + Assert.assertEquals(listener2.num, 2); + Assert.assertEquals(listener3.num, 1); + } + + static class DirsChangeListenerTest implements DirsChangeListener { + public int num = 0; + public DirsChangeListenerTest() { + } + @Override + public void onDirsChanged() { + num++; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 2edaf45..747c4b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -1098,7 +1098,6 @@ public void testPublicResourceInitializesLocalDir() throws Exception { isA(Configuration.class)); spyService.init(conf); - spyService.start(); final FsPermission defaultPerm = new FsPermission((short)0755); @@ -1110,6 +1109,8 @@ public void testPublicResourceInitializesLocalDir() throws Exception { .mkdir(eq(publicCache),eq(defaultPerm), eq(true)); } + spyService.start(); + final String user = "user0"; // init application final Application app = mock(Application.class); @@ -1131,21 +1132,32 @@ public void testPublicResourceInitializesLocalDir() throws Exception { r.setSeed(seed); // Queue up public resource localization - final LocalResource pubResource = getPublicMockedResource(r); - final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + final LocalResource pubResource1 = getPublicMockedResource(r); + final LocalResourceRequest pubReq1 = + new LocalResourceRequest(pubResource1); + + LocalResource pubResource2 = null; + do { + pubResource2 = getPublicMockedResource(r); + } while (pubResource2 == null || pubResource2.equals(pubResource1)); + // above call to make sure we don't get identical resources. + final LocalResourceRequest pubReq2 = + new LocalResourceRequest(pubResource2); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq1); + pubRsrcs.add(pubReq2); Map> req = new HashMap>(); - req.put(LocalResourceVisibility.PUBLIC, - Collections.singletonList(pubReq)); - - Set pubRsrcs = new HashSet(); - pubRsrcs.add(pubReq); + req.put(LocalResourceVisibility.PUBLIC, pubRsrcs); spyService.handle(new ContainerLocalizationRequestEvent(c, req)); dispatcher.await(); + verify(spyService, times(1)).getInitializedLocalDirs(); + // verify directory creation for (Path p : localDirs) { p = new Path((new URI(p.toString())).getPath());