diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fcd22cb..eb51564 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -338,7 +338,15 @@ /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; - + + /** + * Number of files in each localized directory. + * Avoid tuning this too low. + */ + public static final String NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY = + NM_PREFIX + "local.cache.numfilesperdirectory"; + public static final int DEFAULT_NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY = 8192; + /** Address where the localizer IPC is.*/ public static final String NM_LOCALIZER_ADDRESS = NM_PREFIX + "localizer.address"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java new file mode 100644 index 0000000..5eb85e5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.log4j.Logger; + + +/** + * {@link HierarchicalDirectory} is used for managing hierarchical directories. + * It restricts the number of files in a directory to + * {@link HierarchicalDirectory#PER_DIR_FILE_LIMIT} which includes 36 + * subdirectories (named from 0 to 9 and a to z). The per directory + * file count can be controlled using + * {@link HierarchicalDirectory#init(YarnConfiguration)}. Root directory + * is represented by an empty string. It internally maintains a vacant + * directory queue. As soon as the file count for the directory reaches its + * limit, its state is changed from {@link DirectoryState#VACANT} to + * {@link DirectoryState#FULL}. New sub directories are not created unless + * a {@link HierarchicalDirectory#getHierarchicalPath()} request is made and + * the vacant directory queue is empty. + * + * Note : this structure only returns hierarchical path but doesn't create one + * on disk. 
+ */ +public class HierarchicalDirectory { + + private static int PER_DIR_FILE_LIMIT; + // total 36 = a to z plus 0 to 9 + public static final int DIRECTORIES_PER_LEVEL = 36; + + public static void init (YarnConfiguration conf) { + PER_DIR_FILE_LIMIT = conf.getInt( + YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, + YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY); + Logger LOG = Logger.getLogger(HierarchicalDirectory.class); + if (PER_DIR_FILE_LIMIT <= 36){ + LOG.error(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY + + " parameter is configured with very low value."); + throw new YarnException( + YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY + + " parameter is configured with very low value."); + } else { + LOG.info("per directory file limit = " + PER_DIR_FILE_LIMIT); + } + PER_DIR_FILE_LIMIT = PER_DIR_FILE_LIMIT - 36; + } + + private Queue vacantSubDirectories; + private HashMap knownSubDirectories; + private int totalSubDirectories; + + public HierarchicalDirectory() { + totalSubDirectories = 0; + HierarchicalSubDirectory rootDir = + new HierarchicalSubDirectory(totalSubDirectories); + vacantSubDirectories = + new LinkedList(); + knownSubDirectories = + new HashMap(); + knownSubDirectories.put("", rootDir); + vacantSubDirectories.add(rootDir); + } + + /** + * This method will compute hierarchical path and will increment file + * count for the directory corresponding to the path. 
+ * @return {@link String} relative hierarchical path + */ + public synchronized String getHierarchicalPath() { + if (vacantSubDirectories.isEmpty()) { + totalSubDirectories++; + HierarchicalSubDirectory newDir = + new HierarchicalSubDirectory(totalSubDirectories); + vacantSubDirectories.add(newDir); + knownSubDirectories.put(newDir.getRelativePath(), newDir); + } + HierarchicalSubDirectory subDir = vacantSubDirectories.peek(); + if (subDir.incAndGetCount() >= PER_DIR_FILE_LIMIT) { + vacantSubDirectories.poll(); + subDir.setState(DirectoryState.FULL); + } + return subDir.getRelativePath(); + } + + /** + * This method will reduce the file count for the directory + * represented by path. The root directory of this hierarchical directory + * structure is represented by an empty string (null is treated the same). + * @param relPath {@link String} relative hierarchical path + */ + public synchronized void decFileCountForPath(String relPath) { + relPath = relPath==null ? "" : relPath.trim(); + HierarchicalSubDirectory subDir = knownSubDirectories.get(relPath); + int count = subDir.decAndGetCount(); + if (subDir.getState() == DirectoryState.FULL && + count < PER_DIR_FILE_LIMIT) { + subDir.setState(DirectoryState.VACANT); + vacantSubDirectories.add(subDir); + } + } + + /* + * VACANT : HierarchicalSubDirectory file count not yet reached its limit. + * FULL : HierarchicalSubDirectory file count reached its limit. + */ + private enum DirectoryState { VACANT, FULL }; + + /* + * It limits the number of files and sub directories in the directory + * to the limit HierarchicalDirectory#PER_DIR_FILE_LIMIT. 
+ */ + private class HierarchicalSubDirectory { + private int fileCount; + private final String relativePath; + private DirectoryState state; + + public HierarchicalSubDirectory(int directoryNo) { + fileCount = 0; + state = DirectoryState.VACANT; + if ( directoryNo == 0) { + relativePath = ""; + } else { + String tPath = Integer.toString(directoryNo - 1, + DIRECTORIES_PER_LEVEL); + StringBuffer sb = new StringBuffer(); + sb.append(tPath.charAt(0)); + for ( int i = 1 ; i < tPath.length(); i++) { + sb.append(Path.SEPARATOR).append(tPath.charAt(i)); + } + relativePath = sb.toString(); + } + } + + public int incAndGetCount() { + return ++fileCount; + } + + public int decAndGetCount() { + return --fileCount; + } + + public String getRelativePath() { + return relativePath; + } + + public DirectoryState getState() { + return state; + } + + public void setState( DirectoryState state) { + this.state = state; + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index b24d8af..ab8dad5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import 
org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -35,6 +36,10 @@ boolean remove(LocalizedResource req, DeletionService delService); + Path getPathForLocalResource(LocalResourceRequest req, Path localDirPath); + String getUser(); + void localizationCompleted(LocalResourceRequest req, boolean success); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 01ec383..67382fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; + /** * A collection of {@link LocalizedResource}s all of same @@ -49,17 +50,45 @@ private final String user; private final Dispatcher dispatcher; private final ConcurrentMap localrsrc; + private final boolean maintainHierarchicalDir; + private ConcurrentHashMap 
directoryMap; + /* + * It is used to keep track of the hierarchical directory of a resource + * while it is getting downloaded. + */ + private ConcurrentHashMap inProgressRsrcMap; + + public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + boolean maintainHierarchicalDir) { + this(user, dispatcher, + new ConcurrentHashMap(), + maintainHierarchicalDir); + } public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) { this(user, dispatcher, - new ConcurrentHashMap()); + new ConcurrentHashMap(), + false); } LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, ConcurrentMap localrsrc) { + this(user, dispatcher, localrsrc, false); + } + + LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + ConcurrentMap localrsrc, + boolean maintainHierarchicalDir) { this.user = user; this.dispatcher = dispatcher; this.localrsrc = localrsrc; + this.maintainHierarchicalDir = maintainHierarchicalDir; + HierarchicalDirectory.init(new YarnConfiguration()); + if ( this.maintainHierarchicalDir) { + directoryMap = new ConcurrentHashMap(); + inProgressRsrcMap = + new ConcurrentHashMap(); + } } @Override @@ -73,6 +102,7 @@ public void handle(ResourceEvent event) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); localrsrc.remove(req); + decFileCountForHierarchicalPath(req, rsrc); rsrc = null; } if (null == rsrc) { @@ -90,7 +120,48 @@ public void handle(ResourceEvent event) { rsrc.handle(event); } - /** + /* + * This will retrieve the localized path for the resource from + * 1) inProgressRsrcMap if the resource was under localization and it + * failed. + * 2) LocalizedResource if the resource is already localized. + * From this path it will identify the local directory under which the + * resource was localized. Then the rest of the path will be used to decrement + * file count for the HierarchicalSubDirectory pointing to this relative + * path. 
+ */ + private void decFileCountForHierarchicalPath(LocalResourceRequest req, + LocalizedResource rsrc) { + if ( maintainHierarchicalDir) { + Path rsrcPath = null; + if (inProgressRsrcMap.containsKey(req)) { + rsrcPath = inProgressRsrcMap.remove(req); + } + if (rsrcPath == null && rsrc != null && rsrc.getLocalPath() != null) { + rsrcPath = rsrc.getLocalPath().getParent(); + } + if ( rsrcPath != null) { + Path parentPath = new Path(rsrcPath.toUri().getRawPath()); + while (!directoryMap.containsKey(parentPath)) { + parentPath = parentPath.getParent(); + if ( parentPath == null) { + return; + } + } + String parentDir = parentPath.toUri().getRawPath().toString(); + HierarchicalDirectory dir = directoryMap.get(parentPath); + if (rsrcPath.equals(parentPath)) { + dir.decFileCountForPath(""); + } else { + dir.decFileCountForPath( + rsrcPath.toUri().getRawPath().toString().substring( + parentDir.length() + 1)); + } + } + } + } + +/** * This module checks if the resource which was localized is already present * or not * @@ -100,7 +171,8 @@ public void handle(ResourceEvent event) { public boolean isResourcePresent(LocalizedResource rsrc) { boolean ret = true; if (rsrc.getState() == ResourceState.LOCALIZED) { - File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString()); + File file = new File(rsrc.getLocalPath().toUri().getRawPath(). + toString()); if (!file.exists()) { ret = false; } @@ -133,11 +205,11 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } + decFileCountForHierarchicalPath(rem.getRequest(), rsrc); return true; } } - /** * Returns the path up to the random directory component. 
*/ @@ -163,4 +235,48 @@ public String getUser() { public Iterator iterator() { return localrsrc.values().iterator(); } -} + + /** + * @return {@link Path} absolute path for localization which includes + * local directory path and the relative hierarchical path (if + * hierarchical directory is enabled) + * + * @param req {@link LocalResourceRequest} Resource localization request to + * localize the resource. + * @param localDirPath {@link Path} local directory path + */ + @Override + public Path getPathForLocalResource(LocalResourceRequest req, + Path localDirPath) { + if (maintainHierarchicalDir) { + if (inProgressRsrcMap.containsKey(req)) { + return inProgressRsrcMap.get(req); + } + if (!directoryMap.containsKey(localDirPath)) { + directoryMap.putIfAbsent(localDirPath, new HierarchicalDirectory()); + } + HierarchicalDirectory dir = directoryMap.get(localDirPath); + Path rPath = localDirPath; + String hierarchicalPath = dir.getHierarchicalPath(); + if (!hierarchicalPath.isEmpty()) { + rPath = new Path(localDirPath, hierarchicalPath); + } + inProgressRsrcMap.putIfAbsent(req, rPath); + return rPath; + } else { + return localDirPath; + } + } + + @Override + public void localizationCompleted(LocalResourceRequest req, + boolean success) { + if (maintainHierarchicalDir) { + if (!success) { + decFileCountForHierarchicalPath(req, null); + } else { + inProgressRsrcMap.remove(req); + } + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 9ca812e..42541ec 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -155,7 +156,7 @@ public ResourceLocalizationService(Dispatcher dispatcher, this.delService = delService; this.dirsHandler = dirsHandler; - this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher); + this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher, true); this.cacheCleanup = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") @@ -615,6 +616,14 @@ public void addResource(LocalizerResourceRequestEvent request) { Path publicDirDestPath = dirsHandler.getLocalPathForWrite( "." 
+ Path.SEPARATOR + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); + Path hierarchicalPath = + getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, null, null) + .getPathForLocalResource(key, publicDirDestPath); + if (!hierarchicalPath.equals(publicDirDestPath)) { + publicDirDestPath = hierarchicalPath; + DiskChecker.checkDir( + new File(publicDirDestPath.toUri().getPath())); + } pending.put(queue.submit(new FSDownload( lfs, null, conf, publicDirDestPath, resource, new Random())), request); @@ -650,6 +659,7 @@ public void run() { new ResourceLocalizedEvent(key, local, FileUtil.getDU(new File(local.toUri())))); synchronized (attempts) { + publicRsrc.localizationCompleted(key, true); attempts.remove(key); } } catch (ExecutionException e) { @@ -662,6 +672,7 @@ public void run() { List reqs; synchronized (attempts) { LocalResourceRequest req = assoc.getResource().getRequest(); + publicRsrc.localizationCompleted(req, false); reqs = attempts.get(req); if (null == reqs) { LOG.error("Missing pending list for " + req); @@ -926,4 +937,4 @@ public void run() { } -} +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java new file mode 100644 index 0000000..d7c9292 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java @@ -0,0 +1,76 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +public class TestHierarchicalDirectory { + + @Test(timeout=10000) + public void testHierarchicalSubDirectoryCreation() { + //setting per directory file limit to 1. 
+ YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "37"); + HierarchicalDirectory.init(conf); + + HierarchicalDirectory hDir = new HierarchicalDirectory(); + Assert.assertTrue(hDir.getHierarchicalPath().isEmpty()); + + for ( int i = 1 ; i < 72; i++) { + StringBuffer sb = new StringBuffer(); + String num = Integer.toString(i-1, 36); + sb.append(num.charAt(0)); + for (int j=1; j < num.length(); j++) { + sb.append(Path.SEPARATOR).append(num.charAt(j)); + } + Assert.assertEquals(sb.toString(), hDir.getHierarchicalPath()); + } + String testPath1 = "4"; + String testPath2 = "2"; + hDir.decFileCountForPath(testPath1); + hDir.decFileCountForPath(testPath2); + Assert.assertEquals(testPath1, hDir.getHierarchicalPath()); + Assert.assertEquals(testPath2, hDir.getHierarchicalPath()); + } + + @Test(timeout=10000) + public void testMinimumPerDirectoryFileLimit() { + YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "1"); + Exception e = null; + try { + HierarchicalDirectory.init(conf); + } catch (Exception e1) { + e = e1; + } + Assert.assertNotNull(e); + Assert.assertEquals(YarnException.class, e.getClass()); + Assert.assertEquals(e.getMessage(), + YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY + + " parameter is configured with very low value."); + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 0e0a472..552f4bc 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -50,11 +51,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; -import org.mortbay.log.Log; public class TestLocalResourcesTrackerImpl { - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void test() { String user = "testuser"; @@ -152,7 +152,7 @@ public void test() { } } - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void testConsistency() { String user = "testuser"; @@ -221,6 +221,110 @@ public void testConsistency() { } } + @Test(timeout=10000) + @SuppressWarnings("unchecked") + public void testHierarchicalDiretories() { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + + dispatcher = createDispatcher(new Configuration()); + + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEvenetHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEvenetHandler); + DeletionService mockDelService = mock(DeletionService.class); + + 
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + dispatcher, localrsrc, true); + //setting per directory file limit to 1. + YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "37"); + HierarchicalDirectory.init(conf); + + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + + ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent1); + + //This is a random path. NO File creation will take place at this place. + Path localDir = new Path ("/tmp"); + + Path hierarchicalPath1 = tracker.getPathForLocalResource(lr1, localDir); + Path hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir); + + // Verifying that path returned is identical for multiple calls + Assert.assertTrue(hierarchicalPath1.equals(hierarchicalPath2)); + tracker.localizationCompleted(lr1, true); + + hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir); + tracker.localizationCompleted(lr1, false); + + Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2); + + LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2, + LocalResourceVisibility.PUBLIC); + ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent2); + hierarchicalPath2 = tracker.getPathForLocalResource(lr2, localDir); + tracker.localizationCompleted(lr2, true); + + // Verifying that path created is inside the subdirectory + Assert.assertEquals(hierarchicalPath2.toUri().toString(), + hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0"); + + // Releasing first resource. 
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1,cId1); + tracker.handle(relEvent1); + + ResourceLocalizedEvent rle = + new ResourceLocalizedEvent(lr1, + new Path(hierarchicalPath1.toUri().toString() + + Path.SEPARATOR + "file1"),120); + tracker.handle(rle); + + int resources = 0; + Iterator iter = tracker.iterator(); + while (iter.hasNext()) { + iter.next(); + resources++; + } + Assert.assertEquals(2, resources); + iter = tracker.iterator(); + while (iter.hasNext()) { + LocalizedResource rsrc = iter.next(); + if ( rsrc.getRefCount() == 0) { + Assert.assertTrue(tracker.remove(rsrc, mockDelService)); + resources--; + } + } + Assert.assertEquals(1, resources); + + // Path is released due to above remove call. Next request should + // once again get the same released path. + tracker.handle(reqEvent1); + hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir); + Assert.assertEquals(hierarchicalPath1, hierarchicalPath2); + + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString());