diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index f4b6221..3cc0f1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -775,6 +775,10 @@ public void addResource(LocalizerResourceRequestEvent request) { if (!publicDirDestPath.getParent().equals(publicRootPath)) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } + + //List localDirs = getInitializedLocalDirs(); + //List logDirs = getInitializedLogDirs(); + // explicitly synchronize pending here to avoid future task // completing and being dequeued before pending updated synchronized (pending) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 1051e7a..fd01f2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -940,7 +940,104 @@ public boolean matches(Object o) { } } - @Test(timeout=20000) + @Test + public void testPublicResourceInitializesLocalDir() throws Exception { + + // Setup state to simulate restart NM with existing state meaning no + // directory creation during initialization + NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore()); + // when(spyStateStore.canRecover()).thenReturn(true); + NMContext spyContext = spy(nmContext); + when(spyContext.getNMStateStore()).thenReturn(spyStateStore); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + + DrainDispatcher dispatcher = new DrainDispatcher(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + dirsHandler.init(conf); + + dispatcher.init(conf); + dispatcher.start(); + + try { + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + dirsHandler, spyContext); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(lfs).when(spyService).getLocalFileContext( + isA(Configuration.class)); + + spyService.init(conf); + spyService.start(); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // init container. + final Container c = getMockContainer(appId, 42, user); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + System.out.println("SEED: " + seed); + r.setSeed(seed); + + // Queue up public resource localization + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PUBLIC, + Collections.singletonList(pubReq)); + + Set pubRsrcs = new HashSet(); + pubRsrcs.add(pubReq); + + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + dispatcher.await(); + + + final FsPermission defaultPerm = new FsPermission((short)0755); + + // verify directory creation + for (Path p : localDirs) { + p = new Path((new URI(p.toString())).getPath()); + Path publicCache = new Path(p, ContainerLocalizer.FILECACHE); + verify(spylfs) + .mkdir(eq(publicCache), + eq(defaultPerm), eq(true)); + } + } finally { + dispatcher.stop(); + } + } + + @Test(timeout=2000000) @SuppressWarnings("unchecked") // mocked generics public void testFailedPublicResource() throws Exception { List localDirs = new ArrayList();