diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 2d63dad..2f091ae 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -256,4 +256,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6b8353a..7878ed0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -341,7 +341,15 @@
/**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
-
+
+ /**
+ * Number of files in each localized directories
+ * Avoid tuning this too low.
+ */
+ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
+ NM_PREFIX + "local.cache.max-files-per-directory";
+ public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192;
+
/** Address where the localizer IPC is.*/
public static final String NM_LOCALIZER_ADDRESS =
NM_PREFIX + "localizer.address";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 8d391e2..e814f28 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -360,6 +360,25 @@
+ It limits the maximum number of files which will be localized
+ in a single local directory. If the limit is reached then sub-directories
+ will be created and new files will be localized in them. This property
+ should never be set to low. The minimum value is 37 which includes 1 file
+ and 36 sub-directories( 0-9 and then a-z ).
+ For example; [for public cache] if this is
+ configured with a value of 40 ( 4 files + 36 sub-directories) and the
+ local-dir is "/tmp/local-dir1" then it will allow 4 files to be created
+ directly inside "/tmp/local-dir1/filecache". For future file localization
+ it will create a sub-directory "0" inside "/tmp/local-dir1/filecache" and
+ will localize files inside it until it becomes full. In between if a file
+ is removed from sub-directory (technically from local cache) then that
+ sub-directory will be reused.
+
+ yarn.nodemanager.local-cache.max-files-per-directory
+ 8192
+
+
+
Address where the localizer IPC is.
yarn.nodemanager.localizer.address
0.0.0.0:8040
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
new file mode 100644
index 0000000..86cf79e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * {@link LocalCacheDirectoryManager} is used for managing hierarchical
+ * directories for local cache. It will allow to restrict the number of files in
+ * a directory to {@link LocalCacheDirectoryManager#PER_DIR_FILE_LIMIT} which
+ * includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is
+ * represented by an empty string. It internally maintains a vacant directory
+ * queue. As soon as the file count for the directory reaches its limit; its
+ * state is changed to FUll ( {@link Directory#isFull()} should return true) New
+ * sub directories are not created unless a
+ * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request
+ * is made and vacant directories are empty.
+ *
+ * Note : this structure only returns relative localization path but doesn't
+ * create one on disk.
+ */
+public class LocalCacheDirectoryManager {
+
+ private static final Log LOG = LogFactory
+ .getLog(LocalCacheDirectoryManager.class);
+ private static int PER_DIR_FILE_LIMIT;
+ // total 36 = a to z plus 0 to 9
+ public static final int DIRECTORIES_PER_LEVEL = 36;
+
+ public static void init(YarnConfiguration conf) {
+ PER_DIR_FILE_LIMIT = conf.getInt(
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+ YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+ if (PER_DIR_FILE_LIMIT <= 36) {
+ LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY +
+ " parameter is configured with very low value.");
+ throw new YarnException(
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY +
+ " parameter is configured with a value less than 37.");
+ } else {
+ LOG.info("per directory file limit = " + PER_DIR_FILE_LIMIT);
+ }
+ PER_DIR_FILE_LIMIT = PER_DIR_FILE_LIMIT - 36;
+ }
+
+ private Queue nonFullDirectories;
+ private HashMap knownDirectories;
+ private int totalSubDirectories;
+
+ public LocalCacheDirectoryManager() {
+ totalSubDirectories = 0;
+ Directory rootDir =
+ new Directory(totalSubDirectories);
+ nonFullDirectories =
+ new LinkedList();
+ knownDirectories =
+ new HashMap();
+ knownDirectories.put("", rootDir);
+ nonFullDirectories.add(rootDir);
+ }
+
+ /**
+ * This method will return relative path from the first available vacant
+ * directory.
+ *
+ * @return {@link String} relative path for localization
+ */
+ public synchronized String getRelativePathForLocalization() {
+ if (nonFullDirectories.isEmpty()) {
+ totalSubDirectories++;
+ Directory newDir =
+ new Directory(totalSubDirectories);
+ nonFullDirectories.add(newDir);
+ knownDirectories.put(newDir.getRelativePath(), newDir);
+ }
+ Directory subDir = nonFullDirectories.peek();
+ if (subDir.incrementAndGetCount() >= PER_DIR_FILE_LIMIT) {
+ nonFullDirectories.remove();
+ }
+ return subDir.getRelativePath();
+ }
+
+ /**
+ * This method will reduce the file count for the directory represented by
+ * path. The root directory of this Local cache directory manager is
+ * represented by an empty string.
+ *
+ * @param {@link String} relative localization path
+ */
+ public synchronized void decrementFileCountForPath(String relPath) {
+ relPath = relPath == null ? "" : relPath.trim();
+ Directory subDir = knownDirectories.get(relPath);
+ int count = subDir.decrementAndGetCount();
+ if (count < PER_DIR_FILE_LIMIT) {
+ nonFullDirectories.add(subDir);
+ }
+ }
+
+ /*
+ * It limits the number of files and sub directories in the directory to the
+ * limit LocalCacheDirectoryManager#PER_DIR_FILE_LIMIT.
+ */
+ static class Directory {
+
+ private final String relativePath;
+ private int fileCount;
+
+ public Directory(int directoryNo) {
+ fileCount = 0;
+ if (directoryNo == 0) {
+ relativePath = "";
+ } else {
+ String tPath = Integer.toString(directoryNo - 1,
+ DIRECTORIES_PER_LEVEL);
+ StringBuffer sb = new StringBuffer();
+ sb.append(tPath.charAt(0));
+ for (int i = 1; i < tPath.length(); i++) {
+ sb.append(Path.SEPARATOR).append(tPath.charAt(i));
+ }
+ relativePath = sb.toString();
+ }
+ }
+
+ public int incrementAndGetCount() {
+ return ++fileCount;
+ }
+
+ public int decrementAndGetCount() {
+ return --fileCount;
+ }
+
+ public String getRelativePath() {
+ return relativePath;
+ }
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
index b24d8af..5f45118 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -35,6 +36,11 @@
boolean remove(LocalizedResource req, DeletionService delService);
+ Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
+
String getUser();
+ // TODO: Remove this in favour of EventHandler.handle
+ void localizationCompleted(LocalResourceRequest req, boolean success);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 01ec383..335ae01 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -28,10 +28,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -49,17 +50,53 @@
private final String user;
private final Dispatcher dispatcher;
private final ConcurrentMap localrsrc;
+ /*
+ * This flag controls whether this resource tracker uses hierarchical
+ * directories or not. For PRIVATE and PUBLIC resource trackers it
+ * will be set whereas for APPLICATION resource tracker it would
+ * be false.
+ */
+ private final boolean useLocalCacheDirectoryManager;
+ private ConcurrentHashMap directoryManagers;
+ /*
+ * It is used to keep track of resource into hierarchical directory
+ * while it is getting downloaded. It is useful for reference counting
+ * in case resource localization fails.
+ */
+ private ConcurrentHashMap
+ inProgressLocalResourcesMap;
+
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ boolean useLocalCacheDirectoryManager) {
+ this(user, dispatcher,
+ new ConcurrentHashMap(),
+ useLocalCacheDirectoryManager);
+ }
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
this(user, dispatcher,
- new ConcurrentHashMap());
+ new ConcurrentHashMap(),
+ false);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
ConcurrentMap localrsrc) {
+ this(user, dispatcher, localrsrc, false);
+ }
+
+ LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ ConcurrentMap localrsrc,
+ boolean useLocalCacheDirectoryManager) {
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
+ this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+ LocalCacheDirectoryManager.init(new YarnConfiguration());
+ if ( this.useLocalCacheDirectoryManager) {
+ directoryManagers = new ConcurrentHashMap();
+ inProgressLocalResourcesMap =
+ new ConcurrentHashMap();
+ }
}
@Override
@@ -73,6 +110,7 @@ public void handle(ResourceEvent event) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
rsrc = null;
}
if (null == rsrc) {
@@ -90,7 +128,51 @@ public void handle(ResourceEvent event) {
rsrc.handle(event);
}
- /**
+ /*
+ * Update the file-count statistics for a local cache-directory.
+ * This will retrieve the localized path for the resource from
+ * 1) inProgressRsrcMap if the resource was under localization and it
+ * failed.
+ * 2) LocalizedResource if the resource is already localized.
+ * From this path it will identify the local directory under which the
+ * resource was localized. Then rest of the path will be used to decrement
+ * file count for the HierarchicalSubDirectory pointing to this relative
+ * path.
+ */
+ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
+ LocalizedResource rsrc) {
+ if ( useLocalCacheDirectoryManager) {
+ Path rsrcPath = null;
+ if (inProgressLocalResourcesMap.containsKey(req)) {
+ // This happens when localization of a resource fails.
+ rsrcPath = inProgressLocalResourcesMap.remove(req);
+ } else if (rsrc != null && rsrc.getLocalPath() != null) {
+ rsrcPath = rsrc.getLocalPath().getParent();
+ }
+ if (rsrcPath != null) {
+ Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+ while (!directoryManagers.containsKey(parentPath)) {
+ parentPath = parentPath.getParent();
+ if ( parentPath == null) {
+ return;
+ }
+ }
+ if ( parentPath != null) {
+ String parentDir = parentPath.toUri().getRawPath().toString();
+ LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+ if (rsrcPath.equals(parentPath)) {
+ dir.decrementFileCountForPath("");
+ } else {
+ dir.decrementFileCountForPath(
+ rsrcPath.toUri().getRawPath().toString().substring(
+ parentDir.length() + 1));
+ }
+ }
+ }
+ }
+ }
+
+/**
* This module checks if the resource which was localized is already present
* or not
*
@@ -100,7 +182,8 @@ public void handle(ResourceEvent event) {
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+ toString());
if (!file.exists()) {
ret = false;
}
@@ -133,11 +216,11 @@ public boolean remove(LocalizedResource rem, DeletionService delService) {
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
+ decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
return true;
}
}
-
/**
* Returns the path up to the random directory component.
*/
@@ -163,4 +246,49 @@ public String getUser() {
public Iterator iterator() {
return localrsrc.values().iterator();
}
-}
+
+ /**
+ * @return {@link Path} absolute path for localization which includes
+ * local directory path and the relative hierarchical path (if
+ * use local cache directory manager is enabled)
+ *
+ * @param {@link LocalResourceRequest} Resource localization request to
+ * localize the resource.
+ * @param {@link Path} local directory path
+ */
+ @Override
+ public Path getPathForLocalization(LocalResourceRequest req,
+ Path localDirPath) {
+ if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+ if (!directoryManagers.containsKey(localDirPath)) {
+ directoryManagers.putIfAbsent(localDirPath, new LocalCacheDirectoryManager());
+ }
+ LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+ Path rPath = localDirPath;
+ String hierarchicalPath = dir.getRelativePathForLocalization();
+ // For most of the scenarios we will get root path only which
+ // is an empty string
+ if (!hierarchicalPath.isEmpty()) {
+ rPath = new Path(localDirPath, hierarchicalPath);
+ }
+ inProgressLocalResourcesMap.put(req, rPath);
+ return rPath;
+ } else {
+ return localDirPath;
+ }
+ }
+
+ @Override
+ public void localizationCompleted(LocalResourceRequest req,
+ boolean success) {
+ if (useLocalCacheDirectoryManager) {
+ if (!success) {
+ decrementFileCountForLocalCacheDirectory(req, null);
+ } else {
+ inProgressLocalResourcesMap.remove(req);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 29971c5..109c988 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -158,7 +159,7 @@ public ResourceLocalizationService(Dispatcher dispatcher,
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher, true);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -620,6 +621,13 @@ public void addResource(LocalizerResourceRequestEvent request) {
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
+ Path hierarchicalPath =
+ publicRsrc.getPathForLocalization(key, publicDirDestPath);
+ if (!hierarchicalPath.equals(publicDirDestPath)) {
+ publicDirDestPath = hierarchicalPath;
+ DiskChecker.checkDir(
+ new File(publicDirDestPath.toUri().getPath()));
+ }
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
request);
@@ -654,19 +662,21 @@ public void run() {
assoc.getResource().handle(
new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
+ publicRsrc.localizationCompleted(key, true);
synchronized (attempts) {
attempts.remove(key);
}
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
+ LocalResourceRequest req = assoc.getResource().getRequest();
dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(),
- assoc.getResource().getRequest(), e.getCause()));
+ req, e.getCause()));
+ publicRsrc.localizationCompleted(req, false);
List reqs;
synchronized (attempts) {
- LocalResourceRequest req = assoc.getResource().getRequest();
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -1003,4 +1013,4 @@ private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
del.delete(null, dirPath, new Path[] {});
}
-}
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
new file mode 100644
index 0000000..7bc4b585
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestLocalCacheDirectoryManager {
+
+ @Test(timeout = 10000)
+ public void testHierarchicalSubDirectoryCreation() {
+ // setting per directory file limit to 1.
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+ LocalCacheDirectoryManager.init(conf);
+
+ LocalCacheDirectoryManager hDir = new LocalCacheDirectoryManager();
+ // Test root directory path = ""
+ Assert.assertTrue(hDir.getRelativePathForLocalization().isEmpty());
+
+ for (int i = 1; i <= 72; i++) {
+ StringBuffer sb = new StringBuffer();
+ String num = Integer.toString(i - 1, 36);
+ sb.append(num.charAt(0));
+ for (int j = 1; j < num.length(); j++) {
+ sb.append(Path.SEPARATOR).append(num.charAt(j));
+ }
+ // testing subdirectory path generation from "0" to "1/z"
+ Assert.assertEquals(sb.toString(), hDir.getRelativePathForLocalization());
+ }
+ String testPath1 = "4";
+ String testPath2 = "2";
+ /*
+ * Making sure directory "4" and "2" becomes non-full so that they are
+ * reused for future getRelativePathForLocalization() calls in the order
+ * they are freed.
+ */
+ hDir.decrementFileCountForPath(testPath1);
+ hDir.decrementFileCountForPath(testPath2);
+ //After below call directory "4" should become full.
+ Assert.assertEquals(testPath1, hDir.getRelativePathForLocalization());
+ Assert.assertEquals(testPath2, hDir.getRelativePathForLocalization());
+ }
+
+ @Test(timeout = 10000)
+ public void testMinimumPerDirectoryFileLimit() {
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
+ Exception e = null;
+ try {
+ LocalCacheDirectoryManager.init(conf);
+ } catch (Exception e1) {
+ e = e1;
+ }
+ Assert.assertNotNull(e);
+ Assert.assertEquals(YarnException.class, e.getClass());
+ Assert.assertEquals(e.getMessage(),
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY +
+ " parameter is configured with a value less than 37.");
+
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 0e0a472..fa9732f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -50,11 +51,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl {
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void test() {
String user = "testuser";
@@ -152,7 +152,7 @@ public void test() {
}
}
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void testConsistency() {
String user = "testuser";
@@ -221,6 +221,117 @@ public void testConsistency() {
}
}
+ @Test(timeout = 10000)
+ @SuppressWarnings("unchecked")
+ public void testHierarchicalLocalCacheDirectories() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+
+ dispatcher = createDispatcher(new Configuration());
+
+ EventHandler localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+ DeletionService mockDelService = mock(DeletionService.class);
+
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ ConcurrentMap localrsrc =
+ new ConcurrentHashMap();
+
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc, true);
+ // setting per directory file limit to 1.
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+ LocalCacheDirectoryManager.init(conf);
+
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent1);
+
+ // This is a random path. NO File creation will take place at this place.
+ Path localDir = new Path("/tmp");
+
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+ Path hierarchicalPath2 = null;
+
+ // Localization successful.
+ tracker.localizationCompleted(lr1, true);
+
+ LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3,
+ LocalResourceVisibility.PUBLIC);
+
+ hierarchicalPath2 = tracker.getPathForLocalization(lr3, localDir);
+ // localization failed.
+ tracker.localizationCompleted(lr3, false);
+
+ /*
+ * The path returned for two localization should be different because we
+ * are limiting one file per sub-directory.
+ */
+ Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
+
+ LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.PUBLIC);
+ ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent2);
+ hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+ tracker.localizationCompleted(lr2, true);
+
+ // Verifying that path created is inside the subdirectory
+ Assert.assertEquals(hierarchicalPath2.toUri().toString(),
+ hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
+
+ // Releasing first resource.
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
+ tracker.handle(relEvent1);
+
+ ResourceLocalizedEvent rle =
+ new ResourceLocalizedEvent(lr1,
+ new Path(hierarchicalPath1.toUri().toString() +
+ Path.SEPARATOR + "file1"), 120);
+ tracker.handle(rle);
+
+ int resources = 0;
+ Iterator iter = tracker.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ resources++;
+ }
+ Assert.assertEquals(2, resources);
+ iter = tracker.iterator();
+ while (iter.hasNext()) {
+ LocalizedResource rsrc = iter.next();
+ if (rsrc.getRefCount() == 0) {
+ Assert.assertTrue(tracker.remove(rsrc, mockDelService));
+ resources--;
+ }
+ }
+ Assert.assertEquals(1, resources);
+
+ // Path is released due to above remove call. Next request should
+ // once again get the same released path.
+ tracker.handle(reqEvent1);
+ hierarchicalPath2 = tracker.getPathForLocalization(lr1, localDir);
+ Assert.assertEquals(hierarchicalPath1, hierarchicalPath2);
+
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());