diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 2d63dad..3376e7b 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -256,4 +256,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a95397a..62f630a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -338,7 +338,15 @@
/**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
-
+
+ /**
+ * Number of files in each localized directories
+ * Avoid tuning this too low.
+ */
+ public static final String NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY =
+ NM_PREFIX + "local.cache.numfilesperdirectory";
+ public static final int DEFAULT_NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY = 8192;
+
/** Address where the localizer IPC is.*/
public static final String NM_LOCALIZER_ADDRESS =
NM_PREFIX + "localizer.address";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java
new file mode 100644
index 0000000..f15d265
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/HierarchicalDirectory.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.log4j.Logger;
+
+
+/**
+ * {@link HierarchicalDirectory} is used for managing hiearchical directories.
+ * It will allow to restrict the number of files in a directory to
+ * {@link HierarchicalDirectory#PER_DIR_FILE_LIMIT} which includes 36
+ * subdirectories (named from 0 to 9 and a to z). The per directory
+ * file count can be controlled using
+ * {@link HierarchicalDirectory#setPerDirectoryFileLimit(int)}. Root directory
+ * is represented by an empty string. It inernally maintains a vacant
+ * directory queue. As soon as the file count for the directory reaches its
+ * limit; its state is changed from {@link DirectoryState#VACANT} to
+ * {@link DirectoryState#FULL}. New sub directories are not created unless
+ * a {@link HierarchicalDirectory#getHierarchicalPath()} request is made and
+ * vacant directories are empty.
+ *
+ * Note : this structure only returns hierarchical path but doesn't create one
+ * on disk.
+ */
+public class HierarchicalDirectory {
+
+ private static int PER_DIR_FILE_LIMIT;
+ // total 36 = a to z plus 0 to 9
+ public static final int DIRECTORIES_PER_LEVEL = 36;
+
+ public static void init (YarnConfiguration conf) {
+ PER_DIR_FILE_LIMIT = conf.getInt(
+ YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY,
+ YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY);
+ Logger LOG = Logger.getLogger(HierarchicalDirectory.class);
+ if (PER_DIR_FILE_LIMIT <= 36){
+ LOG.error(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY +
+ " parameter is configured with very low value.");
+ throw new YarnException(
+ YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY +
+ " parameter is configured with very low value.");
+ } else {
+ LOG.info("per directory file limit = " + PER_DIR_FILE_LIMIT);
+ }
+ PER_DIR_FILE_LIMIT = PER_DIR_FILE_LIMIT - 36;
+ }
+
+ private Queue vacantSubDirectories;
+ private HashMap knownSubDirectories;
+ private int totalSubDirectories;
+
+ public HierarchicalDirectory() {
+ totalSubDirectories = 0;
+ HierarchicalSubDirectory rootDir =
+ new HierarchicalSubDirectory(totalSubDirectories);
+ vacantSubDirectories =
+ new LinkedList();
+ knownSubDirectories =
+ new HashMap();
+ knownSubDirectories.put("", rootDir);
+ vacantSubDirectories.add(rootDir);
+ }
+
+ /**
+ * This method will compute hierarchical path and will increment file
+ * count for the directory corresponding to the path.
+ * @return {@link String} relative hierarchical path
+ */
+ public synchronized String getHierarchicalPath() {
+ if (vacantSubDirectories.isEmpty()) {
+ totalSubDirectories++;
+ HierarchicalSubDirectory newDir =
+ new HierarchicalSubDirectory(totalSubDirectories);
+ vacantSubDirectories.add(newDir);
+ knownSubDirectories.put(newDir.getRelativePath(), newDir);
+ }
+ HierarchicalSubDirectory subDir = vacantSubDirectories.peek();
+ if (subDir.incAndGetCount() >= PER_DIR_FILE_LIMIT) {
+ vacantSubDirectories.poll();
+ subDir.setState(DirectoryState.FULL);
+ }
+ return subDir.getRelativePath();
+ }
+
+ /**
+ * This method will reduce the file count for the directory
+ * represented by path. The root directory of this hierarchical directory
+ * structure is represented by {@link Path#SEPARATOR}.
+ * @param {{@link String} relative hierarchical path
+ */
+ public synchronized void decFileCountForPath(String relPath) {
+ relPath = relPath==null ? "" : relPath.trim();
+ HierarchicalSubDirectory subDir = knownSubDirectories.get(relPath);
+ int count = subDir.decAndGetCount();
+ if (subDir.getState() == DirectoryState.FULL &&
+ count < PER_DIR_FILE_LIMIT) {
+ subDir.setState(DirectoryState.VACANT);
+ vacantSubDirectories.add(subDir);
+ }
+ }
+
+ /*
+ * VACANT : HierarchicalSubDirectory file count not yet reached its limit.
+ * FULL : HierarchicalSubDirectory file count reached its limit.
+ */
+ private enum DirectoryState { VACANT, FULL };
+
+ /*
+ * It limits the number of files and sub directories in the directory
+ * to the limit HierarchicalDirectory#PER_DIR_FILE_LIMIT.
+ */
+ static class HierarchicalSubDirectory {
+ private int fileCount;
+ private final String relativePath;
+ private DirectoryState state;
+
+ public HierarchicalSubDirectory(int directoryNo) {
+ fileCount = 0;
+ state = DirectoryState.VACANT;
+ if ( directoryNo == 0) {
+ relativePath = "";
+ } else {
+ String tPath = Integer.toString(directoryNo - 1,
+ DIRECTORIES_PER_LEVEL);
+ StringBuffer sb = new StringBuffer();
+ sb.append(tPath.charAt(0));
+ for ( int i = 1 ; i < tPath.length(); i++) {
+ sb.append(Path.SEPARATOR).append(tPath.charAt(i));
+ }
+ relativePath = sb.toString();
+ }
+ }
+
+ public int incAndGetCount() {
+ return ++fileCount;
+ }
+
+ public int decAndGetCount() {
+ return --fileCount;
+ }
+
+ public String getRelativePath() {
+ return relativePath;
+ }
+
+ public DirectoryState getState() {
+ return state;
+ }
+
+ public void setState( DirectoryState state) {
+ this.state = state;
+ }
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
index b24d8af..ab8dad5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -35,6 +36,10 @@
boolean remove(LocalizedResource req, DeletionService delService);
+ Path getPathForLocalResource(LocalResourceRequest req, Path localDirPath);
+
String getUser();
+ void localizationCompleted(LocalResourceRequest req, boolean success);
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 01ec383..b3d2c99 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -28,10 +28,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -49,17 +50,45 @@
private final String user;
private final Dispatcher dispatcher;
private final ConcurrentMap localrsrc;
+ private final boolean maintainHierarchicalDir;
+ private ConcurrentHashMap directoryMap;
+ /*
+ * It is used to keep track of resource into hierarchical directory
+ * while it is getting downloaded.
+ */
+ private ConcurrentHashMap inProgressRsrcMap;
+
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ boolean maintainHierarchicalDir) {
+ this(user, dispatcher,
+ new ConcurrentHashMap(),
+ maintainHierarchicalDir);
+ }
public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
this(user, dispatcher,
- new ConcurrentHashMap());
+ new ConcurrentHashMap(),
+ false);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
ConcurrentMap localrsrc) {
+ this(user, dispatcher, localrsrc, false);
+ }
+
+ LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ ConcurrentMap localrsrc,
+ boolean maintainHierarchicalDir) {
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
+ this.maintainHierarchicalDir = maintainHierarchicalDir;
+ HierarchicalDirectory.init(new YarnConfiguration());
+ if ( this.maintainHierarchicalDir) {
+ directoryMap = new ConcurrentHashMap();
+ inProgressRsrcMap =
+ new ConcurrentHashMap();
+ }
}
@Override
@@ -73,6 +102,7 @@ public void handle(ResourceEvent event) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
+ decFileCountForHierarchicalPath(req, rsrc);
rsrc = null;
}
if (null == rsrc) {
@@ -90,7 +120,50 @@ public void handle(ResourceEvent event) {
rsrc.handle(event);
}
- /**
+ /*
+ * This will retrieve the localized path for the resource from
+ * 1) inProgressRsrcMap if the resource was under localization and it
+ * failed.
+ * 2) LocalizedResource if the resource is already localized.
+ * From this path it will identiy the local directory under which the
+ * resource was localized. Then rest of the path will be used to decrement
+ * file count for the HierarchicalSubDirectory pointing to this relative
+ * path.
+ */
+ private void decFileCountForHierarchicalPath(LocalResourceRequest req,
+ LocalizedResource rsrc) {
+ if ( maintainHierarchicalDir) {
+ Path rsrcPath = null;
+ if (inProgressRsrcMap.containsKey(req)) {
+ rsrcPath = inProgressRsrcMap.remove(req);
+ }
+ if (rsrcPath == null && rsrc != null && rsrc.getLocalPath() != null) {
+ rsrcPath = rsrc.getLocalPath().getParent();
+ }
+ if ( rsrcPath != null) {
+ Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+ while (!directoryMap.containsKey(parentPath)) {
+ parentPath = parentPath.getParent();
+ if ( parentPath == null) {
+ return;
+ }
+ }
+ if ( parentPath != null) {
+ String parentDir = parentPath.toUri().getRawPath().toString();
+ HierarchicalDirectory dir = directoryMap.get(parentPath);
+ if (rsrcPath.equals(parentPath)) {
+ dir.decFileCountForPath("");
+ } else {
+ dir.decFileCountForPath(
+ rsrcPath.toUri().getRawPath().toString().substring(
+ parentDir.length() + 1));
+ }
+ }
+ }
+ }
+ }
+
+/**
* This module checks if the resource which was localized is already present
* or not
*
@@ -100,7 +173,8 @@ public void handle(ResourceEvent event) {
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+ toString());
if (!file.exists()) {
ret = false;
}
@@ -133,11 +207,11 @@ public boolean remove(LocalizedResource rem, DeletionService delService) {
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
+ decFileCountForHierarchicalPath(rem.getRequest(), rsrc);
return true;
}
}
-
/**
* Returns the path up to the random directory component.
*/
@@ -163,4 +237,48 @@ public String getUser() {
public Iterator iterator() {
return localrsrc.values().iterator();
}
-}
+
+ /**
+ * @return {@link Path} absolute path for localization which includes
+ * local directory path and the relative hierarchical path (if
+ * hierarchical directory is enabled)
+ *
+ * @param {@link LocalResourceRequest} Resource localization request to
+ * localize the resource.
+ * @param {@link Path} local directory path
+ */
+ @Override
+ public Path getPathForLocalResource(LocalResourceRequest req,
+ Path localDirPath) {
+ if (maintainHierarchicalDir && localDirPath != null) {
+ if (inProgressRsrcMap.containsKey(req)) {
+ return inProgressRsrcMap.get(req);
+ }
+ if (!directoryMap.containsKey(localDirPath)) {
+ directoryMap.putIfAbsent(localDirPath, new HierarchicalDirectory());
+ }
+ HierarchicalDirectory dir = directoryMap.get(localDirPath);
+ Path rPath = localDirPath;
+ String hierarchicalPath = dir.getHierarchicalPath();
+ if (!hierarchicalPath.isEmpty()) {
+ rPath = new Path(localDirPath, hierarchicalPath);
+ }
+ inProgressRsrcMap.putIfAbsent(req, rPath);
+ return inProgressRsrcMap.get(req);
+ } else {
+ return localDirPath;
+ }
+ }
+
+ @Override
+ public void localizationCompleted(LocalResourceRequest req,
+ boolean success) {
+ if (maintainHierarchicalDir) {
+ if (!success) {
+ decFileCountForHierarchicalPath(req, null);
+ } else {
+ inProgressRsrcMap.remove(req);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 29971c5..f40b6b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -158,7 +159,7 @@ public ResourceLocalizationService(Dispatcher dispatcher,
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher, true);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -620,6 +621,14 @@ public void addResource(LocalizerResourceRequestEvent request) {
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
+ Path hierarchicalPath =
+ getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, null, null)
+ .getPathForLocalResource(key, publicDirDestPath);
+ if (!hierarchicalPath.equals(publicDirDestPath)) {
+ publicDirDestPath = hierarchicalPath;
+ DiskChecker.checkDir(
+ new File(publicDirDestPath.toUri().getPath()));
+ }
pending.put(queue.submit(new FSDownload(
lfs, null, conf, publicDirDestPath, resource, new Random())),
request);
@@ -655,6 +664,7 @@ public void run() {
new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
synchronized (attempts) {
+ publicRsrc.localizationCompleted(key, true);
attempts.remove(key);
}
} catch (ExecutionException e) {
@@ -667,6 +677,7 @@ public void run() {
List reqs;
synchronized (attempts) {
LocalResourceRequest req = assoc.getResource().getRequest();
+ publicRsrc.localizationCompleted(req, false);
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -1003,4 +1014,4 @@ private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
del.delete(null, dirPath, new Path[] {});
}
-}
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java
new file mode 100644
index 0000000..d7c9292
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestHierarchicalDirectory.java
@@ -0,0 +1,76 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestHierarchicalDirectory {
+
+ @Test(timeout=10000)
+ public void testHierarchicalSubDirectoryCreation() {
+ //setting per directory file limit to 1.
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "37");
+ HierarchicalDirectory.init(conf);
+
+ HierarchicalDirectory hDir = new HierarchicalDirectory();
+ Assert.assertTrue(hDir.getHierarchicalPath().isEmpty());
+
+ for ( int i = 1 ; i < 72; i++) {
+ StringBuffer sb = new StringBuffer();
+ String num = Integer.toString(i-1, 36);
+ sb.append(num.charAt(0));
+ for (int j=1; j < num.length(); j++) {
+ sb.append(Path.SEPARATOR).append(num.charAt(j));
+ }
+ Assert.assertEquals(sb.toString(), hDir.getHierarchicalPath());
+ }
+ String testPath1 = "4";
+ String testPath2 = "2";
+ hDir.decFileCountForPath(testPath1);
+ hDir.decFileCountForPath(testPath2);
+ Assert.assertEquals(testPath1, hDir.getHierarchicalPath());
+ Assert.assertEquals(testPath2, hDir.getHierarchicalPath());
+ }
+
+ @Test(timeout=10000)
+ public void testMinimumPerDirectoryFileLimit() {
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "1");
+ Exception e = null;
+ try {
+ HierarchicalDirectory.init(conf);
+ } catch (Exception e1) {
+ e = e1;
+ }
+ Assert.assertNotNull(e);
+ Assert.assertEquals(YarnException.class, e.getClass());
+ Assert.assertEquals(e.getMessage(),
+ YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY +
+ " parameter is configured with very low value.");
+
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 0e0a472..552f4bc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -50,11 +51,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import org.mortbay.log.Log;
public class TestLocalResourcesTrackerImpl {
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void test() {
String user = "testuser";
@@ -152,7 +152,7 @@ public void test() {
}
}
- @Test
+ @Test(timeout=10000)
@SuppressWarnings("unchecked")
public void testConsistency() {
String user = "testuser";
@@ -221,6 +221,110 @@ public void testConsistency() {
}
}
+ @Test(timeout=10000)
+ @SuppressWarnings("unchecked")
+ public void testHierarchicalDiretories() {
+ String user = "testuser";
+ DrainDispatcher dispatcher = null;
+ try {
+
+ dispatcher = createDispatcher(new Configuration());
+
+ EventHandler localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler containerEvenetHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEvenetHandler);
+ DeletionService mockDelService = mock(DeletionService.class);
+
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ ConcurrentMap localrsrc =
+ new ConcurrentHashMap();
+
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ dispatcher, localrsrc, true);
+ //setting per directory file limit to 1.
+ YarnConfiguration conf =
+ new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_NUM_FILES_PER_DIRECTORY, "37");
+ HierarchicalDirectory.init(conf);
+
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent1);
+
+ //This is a random path. NO File creation will take place at this place.
+ Path localDir = new Path ("/tmp");
+
+ Path hierarchicalPath1 = tracker.getPathForLocalResource(lr1, localDir);
+ Path hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir);
+
+ // Verifying that path returned is identical for multiple calls
+ Assert.assertTrue(hierarchicalPath1.equals(hierarchicalPath2));
+ tracker.localizationCompleted(lr1, true);
+
+ hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir);
+ tracker.localizationCompleted(lr1, false);
+
+ Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
+
+ LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.PUBLIC);
+ ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
+ LocalResourceVisibility.PUBLIC, lc1);
+ tracker.handle(reqEvent2);
+ hierarchicalPath2 = tracker.getPathForLocalResource(lr2, localDir);
+ tracker.localizationCompleted(lr2, true);
+
+ // Verifying that path created is inside the subdirectory
+ Assert.assertEquals(hierarchicalPath2.toUri().toString(),
+ hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
+
+ // Releasing first resource.
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1,cId1);
+ tracker.handle(relEvent1);
+
+ ResourceLocalizedEvent rle =
+ new ResourceLocalizedEvent(lr1,
+ new Path(hierarchicalPath1.toUri().toString() +
+ Path.SEPARATOR + "file1"),120);
+ tracker.handle(rle);
+
+ int resources = 0;
+ Iterator iter = tracker.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ resources++;
+ }
+ Assert.assertEquals(2, resources);
+ iter = tracker.iterator();
+ while (iter.hasNext()) {
+ LocalizedResource rsrc = iter.next();
+ if ( rsrc.getRefCount() == 0) {
+ Assert.assertTrue(tracker.remove(rsrc, mockDelService));
+ resources--;
+ }
+ }
+ Assert.assertEquals(1, resources);
+
+ // Path is released due to above remove call. Next request should
+ // once again get the same released path.
+ tracker.handle(reqEvent1);
+ hierarchicalPath2 = tracker.getPathForLocalResource(lr1, localDir);
+ Assert.assertEquals(hierarchicalPath1, hierarchicalPath2);
+
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());