diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 7b9873a..439dc0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -64,6 +65,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -479,18 +481,15 @@ LocalResourcesTracker getLocalResourcesTracker( } private String getUserFileCachePath(String user) { - String path = - "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR - + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE; - return path; + return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE)); + } private String getUserAppCachePath(String user, String appId) { - String path = - "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR - + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE - + Path.SEPARATOR + appId; - return path; + return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, + ContainerLocalizer.FILECACHE)); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 77bde7b..9c27a30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -34,14 +34,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -71,6 +72,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -97,6 +99,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; @@ -104,6 +107,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.BeforeClass; @@ -677,6 +682,154 @@ public Void answer(InvocationOnMock invocation) throws IOException { } } + @Test(timeout = 10000) + public void testLocalResourcePath() throws IOException { + + // test the local path where application and user cache files will be + // localized. + + DrainDispatcher dispatcher1 = null, dispatcher2 = null; + try { + dispatcher1 = new DrainDispatcher(); + dispatcher2 = new DrainDispatcher(); + + String user = "testuser"; + // Creating a mocked localizer context. Not providing container-id as it + // is not checked in findNextResource call. + LocalizerContext cxt = + new LocalizerContext(user, BuilderUtils.newContainerId(1, 1, 1, 1), + null); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List localDirs = new ArrayList(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher2, + mock(ContainerExecutor.class), mock(DeletionService.class), + localDirHandler); + rls.init(conf); + + ApplicationId appID = BuilderUtils.newApplicationId(1, 1); + // Creating mocked application + Application app = mock(Application.class); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appID); + + // Sending INIT_APPLICATION_RESOURCES event so that local resources + // tracker will be created one for user and another one for application + ApplicationLocalizationEvent appLocalizationEvent = + new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app); + rls.handle(appLocalizationEvent); + + // Creating localizer runner for container + LocalizerRunner localizerRunner1 = + rls.new LocalizerRunner(cxt, "container-1"); + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req1 = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + + // Verifying PRIVATE resource cache file path + LocalizedResource lr1 = new LocalizedResource(req1, dispatcher1); + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent1 = + new ResourceRequestEvent(req1, LocalResourceVisibility.PRIVATE, cxt); + lr1.handle(reqEvent1); + + // adding resource request event to pending list of container-1 + LocalizerResourceRequestEvent localizerEvent1 = + new LocalizerResourceRequestEvent(lr1, + LocalResourceVisibility.PRIVATE, cxt, null); + localizerRunner1.pending.add(localizerEvent1); + + List statusList = + new ArrayList(); + LocalizerHeartbeatResponse response1 = + localizerRunner1.update(statusList); + + String userCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.FILECACHE)); + // Validating user cache path + // returned destinationPath = user cache path + random number + Path destinationDirectory = + new Path(response1.getResourceSpecs().get(0) + .getDestinationDirectory().getFile()); + Assert.assertEquals(userCachePath, destinationDirectory.getParent() + .toUri().toString()); + + // Removing private resource + localizerRunner1.pending.remove(0); + + // Verifying application resource cache file path + // Setting up LocalizedResource and corresponding request + LocalResourceRequest req2 = + new LocalResourceRequest(new Path("file:///tmp"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, ""); + + LocalizedResource lr2 = new LocalizedResource(req2, dispatcher1); + + // Creating a request for the localized resource. + ResourceRequestEvent reqEvent2 = + new ResourceRequestEvent(req2, LocalResourceVisibility.APPLICATION, + cxt); + lr2.handle(reqEvent2); + + // adding resource request event to pending list of container-1 + LocalizerResourceRequestEvent localizerEvent2 = + new LocalizerResourceRequestEvent(lr2, + LocalResourceVisibility.APPLICATION, cxt, null); + localizerRunner1.pending.add(localizerEvent2); + + LocalizerHeartbeatResponse response2 = + localizerRunner1.update(statusList); + + String userAppCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.APPCACHE, appID.toString(), + ContainerLocalizer.FILECACHE)); + // Validating application cache path + // returned destinationPath = userApplication cache path + random number + destinationDirectory = + new Path(response2.getResourceSpecs().get(0) + .getDestinationDirectory().getFile()); + Assert.assertEquals(userAppCachePath, destinationDirectory.getParent() + .toUri().toString()); + + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + if (dispatcher2 != null) { + dispatcher2.stop(); + } + } + } + private static URL getPath(String path) { URL url = BuilderUtils.newURL("file", null, 0, path); return url;