diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 2d63dad..2f091ae 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -256,4 +256,18 @@ + + + + + + + + + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6b8353a..7878ed0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -341,7 +341,15 @@ /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; - + + /** + * Number of files in each localized directories + * Avoid tuning this too low. + */ + public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = + NM_PREFIX + "local.cache.max-files-per-directory"; + public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192; + /** Address where the localizer IPC is.*/ public static final String NM_LOCALIZER_ADDRESS = NM_PREFIX + "localizer.address"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 8d391e2..e814f28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -360,6 +360,25 @@ + It limits the maximum number of files which will be localized + in a single local directory. If the limit is reached then sub-directories + will be created and new files will be localized in them. This property + should never be set to low. The minimum value is 37 which includes 1 file + and 36 sub-directories( 0-9 and then a-z ). + For example; [for public cache] if this is + configured with a value of 40 ( 4 files + 36 sub-directories) and the + local-dir is "/tmp/local-dir1" then it will allow 4 files to be created + directly inside "/tmp/local-dir1/filecache". For future file localization + it will create a sub-directory "0" inside "/tmp/local-dir1/filecache" and + will localize files inside it until it becomes full. In between if a file + is removed from sub-directory (technically from local cache) then that + sub-directory will be reused. + + yarn.nodemanager.local-cache.max-files-per-directory + 8192 + + + Address where the localizer IPC is. yarn.nodemanager.localizer.address 0.0.0.0:8040 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java new file mode 100644 index 0000000..86cf79e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * {@link LocalCacheDirectoryManager} is used for managing hierarchical + * directories for local cache. It will allow to restrict the number of files in + * a directory to {@link LocalCacheDirectoryManager#PER_DIR_FILE_LIMIT} which + * includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is + * represented by an empty string. It internally maintains a vacant directory + * queue. As soon as the file count for the directory reaches its limit; its + * state is changed to FUll ( {@link Directory#isFull()} should return true) New + * sub directories are not created unless a + * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request + * is made and vacant directories are empty. + * + * Note : this structure only returns relative localization path but doesn't + * create one on disk. + */ +public class LocalCacheDirectoryManager { + + private static final Log LOG = LogFactory + .getLog(LocalCacheDirectoryManager.class); + private static int PER_DIR_FILE_LIMIT; + // total 36 = a to z plus 0 to 9 + public static final int DIRECTORIES_PER_LEVEL = 36; + + public static void init(YarnConfiguration conf) { + PER_DIR_FILE_LIMIT = conf.getInt( + YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, + YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY); + if (PER_DIR_FILE_LIMIT <= 36) { + LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY + + " parameter is configured with very low value."); + throw new YarnException( + YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY + + " parameter is configured with a value less than 37."); + } else { + LOG.info("per directory file limit = " + PER_DIR_FILE_LIMIT); + } + PER_DIR_FILE_LIMIT = PER_DIR_FILE_LIMIT - 36; + } + + private Queue nonFullDirectories; + private HashMap knownDirectories; + private int totalSubDirectories; + + public LocalCacheDirectoryManager() { + totalSubDirectories = 0; + Directory rootDir = + new Directory(totalSubDirectories); + nonFullDirectories = + new LinkedList(); + knownDirectories = + new HashMap(); + knownDirectories.put("", rootDir); + nonFullDirectories.add(rootDir); + } + + /** + * This method will return relative path from the first available vacant + * directory. + * + * @return {@link String} relative path for localization + */ + public synchronized String getRelativePathForLocalization() { + if (nonFullDirectories.isEmpty()) { + totalSubDirectories++; + Directory newDir = + new Directory(totalSubDirectories); + nonFullDirectories.add(newDir); + knownDirectories.put(newDir.getRelativePath(), newDir); + } + Directory subDir = nonFullDirectories.peek(); + if (subDir.incrementAndGetCount() >= PER_DIR_FILE_LIMIT) { + nonFullDirectories.remove(); + } + return subDir.getRelativePath(); + } + + /** + * This method will reduce the file count for the directory represented by + * path. The root directory of this Local cache directory manager is + * represented by an empty string. + * + * @param {@link String} relative localization path + */ + public synchronized void decrementFileCountForPath(String relPath) { + relPath = relPath == null ? "" : relPath.trim(); + Directory subDir = knownDirectories.get(relPath); + int count = subDir.decrementAndGetCount(); + if (count < PER_DIR_FILE_LIMIT) { + nonFullDirectories.add(subDir); + } + } + + /* + * It limits the number of files and sub directories in the directory to the + * limit LocalCacheDirectoryManager#PER_DIR_FILE_LIMIT. + */ + static class Directory { + + private final String relativePath; + private int fileCount; + + public Directory(int directoryNo) { + fileCount = 0; + if (directoryNo == 0) { + relativePath = ""; + } else { + String tPath = Integer.toString(directoryNo - 1, + DIRECTORIES_PER_LEVEL); + StringBuffer sb = new StringBuffer(); + sb.append(tPath.charAt(0)); + for (int i = 1; i < tPath.length(); i++) { + sb.append(Path.SEPARATOR).append(tPath.charAt(i)); + } + relativePath = sb.toString(); + } + } + + public int incrementAndGetCount() { + return ++fileCount; + } + + public int decrementAndGetCount() { + return --fileCount; + } + + public String getRelativePath() { + return relativePath; + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index b24d8af..5f45118 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -35,6 +36,11 @@ boolean remove(LocalizedResource req, DeletionService delService); + Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); + String getUser(); + // TODO: Remove this in favour of EventHandler.handle + void localizationCompleted(LocalResourceRequest req, boolean success); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 01ec383..335ae01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; + /** * A collection of {@link LocalizedResource}s all of same @@ -49,17 +50,53 @@ private final String user; private final Dispatcher dispatcher; private final ConcurrentMap localrsrc; + /* + * This flag controls whether this resource tracker uses hierarchical + * directories or not. For PRIVATE and PUBLIC resource trackers it + * will be set whereas for APPLICATION resource tracker it would + * be false. + */ + private final boolean useLocalCacheDirectoryManager; + private ConcurrentHashMap directoryManagers; + /* + * It is used to keep track of resource into hierarchical directory + * while it is getting downloaded. It is useful for reference counting + * in case resource localization fails. + */ + private ConcurrentHashMap + inProgressLocalResourcesMap; + + public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + boolean useLocalCacheDirectoryManager) { + this(user, dispatcher, + new ConcurrentHashMap(), + useLocalCacheDirectoryManager); + } public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) { this(user, dispatcher, - new ConcurrentHashMap()); + new ConcurrentHashMap(), + false); } LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, ConcurrentMap localrsrc) { + this(user, dispatcher, localrsrc, false); + } + + LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, + ConcurrentMap localrsrc, + boolean useLocalCacheDirectoryManager) { this.user = user; this.dispatcher = dispatcher; this.localrsrc = localrsrc; + this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager; + LocalCacheDirectoryManager.init(new YarnConfiguration()); + if ( this.useLocalCacheDirectoryManager) { + directoryManagers = new ConcurrentHashMap(); + inProgressLocalResourcesMap = + new ConcurrentHashMap(); + } } @Override @@ -73,6 +110,7 @@ public void handle(ResourceEvent event) { LOG.info("Resource " + rsrc.getLocalPath() + " is missing, localizing it again"); localrsrc.remove(req); + decrementFileCountForLocalCacheDirectory(req, rsrc); rsrc = null; } if (null == rsrc) { @@ -90,7 +128,51 @@ public void handle(ResourceEvent event) { rsrc.handle(event); } - /** + /* + * Update the file-count statistics for a local cache-directory. + * This will retrieve the localized path for the resource from + * 1) inProgressRsrcMap if the resource was under localization and it + * failed. + * 2) LocalizedResource if the resource is already localized. + * From this path it will identify the local directory under which the + * resource was localized. Then rest of the path will be used to decrement + * file count for the HierarchicalSubDirectory pointing to this relative + * path. + */ + private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req, + LocalizedResource rsrc) { + if ( useLocalCacheDirectoryManager) { + Path rsrcPath = null; + if (inProgressLocalResourcesMap.containsKey(req)) { + // This happens when localization of a resource fails. + rsrcPath = inProgressLocalResourcesMap.remove(req); + } else if (rsrc != null && rsrc.getLocalPath() != null) { + rsrcPath = rsrc.getLocalPath().getParent(); + } + if (rsrcPath != null) { + Path parentPath = new Path(rsrcPath.toUri().getRawPath()); + while (!directoryManagers.containsKey(parentPath)) { + parentPath = parentPath.getParent(); + if ( parentPath == null) { + return; + } + } + if ( parentPath != null) { + String parentDir = parentPath.toUri().getRawPath().toString(); + LocalCacheDirectoryManager dir = directoryManagers.get(parentPath); + if (rsrcPath.equals(parentPath)) { + dir.decrementFileCountForPath(""); + } else { + dir.decrementFileCountForPath( + rsrcPath.toUri().getRawPath().toString().substring( + parentDir.length() + 1)); + } + } + } + } + } + +/** * This module checks if the resource which was localized is already present * or not * @@ -100,7 +182,8 @@ public void handle(ResourceEvent event) { public boolean isResourcePresent(LocalizedResource rsrc) { boolean ret = true; if (rsrc.getState() == ResourceState.LOCALIZED) { - File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString()); + File file = new File(rsrc.getLocalPath().toUri().getRawPath(). + toString()); if (!file.exists()) { ret = false; } @@ -133,11 +216,11 @@ public boolean remove(LocalizedResource rem, DeletionService delService) { if (ResourceState.LOCALIZED.equals(rsrc.getState())) { delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); } + decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc); return true; } } - /** * Returns the path up to the random directory component. */ @@ -163,4 +246,49 @@ public String getUser() { public Iterator iterator() { return localrsrc.values().iterator(); } -} + + /** + * @return {@link Path} absolute path for localization which includes + * local directory path and the relative hierarchical path (if + * use local cache directory manager is enabled) + * + * @param {@link LocalResourceRequest} Resource localization request to + * localize the resource. + * @param {@link Path} local directory path + */ + @Override + public Path getPathForLocalization(LocalResourceRequest req, + Path localDirPath) { + if (useLocalCacheDirectoryManager && localDirPath != null) { + + if (!directoryManagers.containsKey(localDirPath)) { + directoryManagers.putIfAbsent(localDirPath, new LocalCacheDirectoryManager()); + } + LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath); + + Path rPath = localDirPath; + String hierarchicalPath = dir.getRelativePathForLocalization(); + // For most of the scenarios we will get root path only which + // is an empty string + if (!hierarchicalPath.isEmpty()) { + rPath = new Path(localDirPath, hierarchicalPath); + } + inProgressLocalResourcesMap.put(req, rPath); + return rPath; + } else { + return localDirPath; + } + } + + @Override + public void localizationCompleted(LocalResourceRequest req, + boolean success) { + if (useLocalCacheDirectoryManager) { + if (!success) { + decrementFileCountForLocalCacheDirectory(req, null); + } else { + inProgressLocalResourcesMap.remove(req); + } + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 29971c5..109c988 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -64,6 +64,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -158,7 +159,7 @@ public ResourceLocalizationService(Dispatcher dispatcher, this.delService = delService; this.dirsHandler = dirsHandler; - this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher); + this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher, true); this.cacheCleanup = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") @@ -620,6 +621,13 @@ public void addResource(LocalizerResourceRequestEvent request) { Path publicDirDestPath = dirsHandler.getLocalPathForWrite( "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); + Path hierarchicalPath = + publicRsrc.getPathForLocalization(key, publicDirDestPath); + if (!hierarchicalPath.equals(publicDirDestPath)) { + publicDirDestPath = hierarchicalPath; + DiskChecker.checkDir( + new File(publicDirDestPath.toUri().getPath())); + } pending.put(queue.submit(new FSDownload( lfs, null, conf, publicDirDestPath, resource, new Random())), request); @@ -654,19 +662,21 @@ public void run() { assoc.getResource().handle( new ResourceLocalizedEvent(key, local, FileUtil.getDU(new File(local.toUri())))); + publicRsrc.localizationCompleted(key, true); synchronized (attempts) { attempts.remove(key); } } catch (ExecutionException e) { LOG.info("Failed to download rsrc " + assoc.getResource(), e.getCause()); + LocalResourceRequest req = assoc.getResource().getRequest(); dispatcher.getEventHandler().handle( new ContainerResourceFailedEvent( assoc.getContext().getContainerId(), - assoc.getResource().getRequest(), e.getCause())); + req, e.getCause())); + publicRsrc.localizationCompleted(req, false); List reqs; synchronized (attempts) { - LocalResourceRequest req = assoc.getResource().getRequest(); reqs = attempts.get(req); if (null == reqs) { LOG.error("Missing pending list for " + req); @@ -1003,4 +1013,4 @@ private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, del.delete(null, dirPath, new Path[] {}); } -} +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java new file mode 100644 index 0000000..7bc4b585 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +public class TestLocalCacheDirectoryManager { + + @Test(timeout = 10000) + public void testHierarchicalSubDirectoryCreation() { + // setting per directory file limit to 1. + YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37"); + LocalCacheDirectoryManager.init(conf); + + LocalCacheDirectoryManager hDir = new LocalCacheDirectoryManager(); + // Test root directory path = "" + Assert.assertTrue(hDir.getRelativePathForLocalization().isEmpty()); + + for (int i = 1; i <= 72; i++) { + StringBuffer sb = new StringBuffer(); + String num = Integer.toString(i - 1, 36); + sb.append(num.charAt(0)); + for (int j = 1; j < num.length(); j++) { + sb.append(Path.SEPARATOR).append(num.charAt(j)); + } + // testing subdirectory path generation from "0" to "1/z" + Assert.assertEquals(sb.toString(), hDir.getRelativePathForLocalization()); + } + String testPath1 = "4"; + String testPath2 = "2"; + /* + * Making sure directory "4" and "2" becomes non-full so that they are + * reused for future getRelativePathForLocalization() calls in the order + * they are freed. + */ + hDir.decrementFileCountForPath(testPath1); + hDir.decrementFileCountForPath(testPath2); + //After below call directory "4" should become full. + Assert.assertEquals(testPath1, hDir.getRelativePathForLocalization()); + Assert.assertEquals(testPath2, hDir.getRelativePathForLocalization()); + } + + @Test(timeout = 10000) + public void testMinimumPerDirectoryFileLimit() { + YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1"); + Exception e = null; + try { + LocalCacheDirectoryManager.init(conf); + } catch (Exception e1) { + e = e1; + } + Assert.assertNotNull(e); + Assert.assertEquals(YarnException.class, e.getClass()); + Assert.assertEquals(e.getMessage(), + YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY + + " parameter is configured with a value less than 37."); + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 0e0a472..fa9732f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -50,11 +51,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; -import org.mortbay.log.Log; public class TestLocalResourcesTrackerImpl { - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void test() { String user = "testuser"; @@ -152,7 +152,7 @@ public void test() { } } - @Test + @Test(timeout=10000) @SuppressWarnings("unchecked") public void testConsistency() { String user = "testuser"; @@ -221,6 +221,117 @@ public void testConsistency() { } } + @Test(timeout = 10000) + @SuppressWarnings("unchecked") + public void testHierarchicalLocalCacheDirectories() { + String user = "testuser"; + DrainDispatcher dispatcher = null; + try { + + dispatcher = createDispatcher(new Configuration()); + + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + DeletionService mockDelService = mock(DeletionService.class); + + ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); + LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + dispatcher, localrsrc, true); + // setting per directory file limit to 1. + YarnConfiguration conf = + new YarnConfiguration(); + conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37"); + LocalCacheDirectoryManager.init(conf); + + LocalizerContext lc1 = new LocalizerContext(user, cId1, null); + + ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent1); + + // This is a random path. NO File creation will take place at this place. + Path localDir = new Path("/tmp"); + + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + Path hierarchicalPath2 = null; + + // Localization successful. + tracker.localizationCompleted(lr1, true); + + LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3, + LocalResourceVisibility.PUBLIC); + + hierarchicalPath2 = tracker.getPathForLocalization(lr3, localDir); + // localization failed. + tracker.localizationCompleted(lr3, false); + + /* + * The path returned for two localization should be different because we + * are limiting one file per sub-directory. + */ + Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2); + + LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2, + LocalResourceVisibility.PUBLIC); + ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2, + LocalResourceVisibility.PUBLIC, lc1); + tracker.handle(reqEvent2); + hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); + tracker.localizationCompleted(lr2, true); + + // Verifying that path created is inside the subdirectory + Assert.assertEquals(hierarchicalPath2.toUri().toString(), + hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0"); + + // Releasing first resource. + ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1); + tracker.handle(relEvent1); + + ResourceLocalizedEvent rle = + new ResourceLocalizedEvent(lr1, + new Path(hierarchicalPath1.toUri().toString() + + Path.SEPARATOR + "file1"), 120); + tracker.handle(rle); + + int resources = 0; + Iterator iter = tracker.iterator(); + while (iter.hasNext()) { + iter.next(); + resources++; + } + Assert.assertEquals(2, resources); + iter = tracker.iterator(); + while (iter.hasNext()) { + LocalizedResource rsrc = iter.next(); + if (rsrc.getRefCount() == 0) { + Assert.assertTrue(tracker.remove(rsrc, mockDelService)); + resources--; + } + } + Assert.assertEquals(1, resources); + + // Path is released due to above remove call. Next request should + // once again get the same released path. + tracker.handle(reqEvent1); + hierarchicalPath2 = tracker.getPathForLocalization(lr1, localDir); + Assert.assertEquals(hierarchicalPath1, hierarchicalPath2); + + } finally { + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + private boolean createdummylocalizefile(Path path) { boolean ret = false; File file = new File(path.toUri().getRawPath().toString());