diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
index c8d8a44..12237b4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
@@ -80,6 +80,7 @@
 @Unstable
 public class MRApps extends Apps {
   public static final Log LOG = LogFactory.getLog(MRApps.class);
+  private static final String WILDCARD = "*";
 
   public static String toString(JobId jid) {
     return jid.toString();
@@ -258,14 +259,12 @@ public static void setClasspath(Map<String, String> environment,
         classpathEnvVar,
         MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
         conf);
-    MRApps.addToEnvironment(
-        environment,
+    MRApps.addToEnvironment(environment,
         classpathEnvVar,
-        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
-    MRApps.addToEnvironment(
-        environment,
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + WILDCARD, conf);
+    MRApps.addToEnvironment(environment,
         classpathEnvVar,
-        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
+        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + WILDCARD, conf);
     // a * in the classpath will only find a .jar, so we need to filter out
     // all .jars and add everything else
     addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
@@ -299,24 +298,55 @@ private static void addToClasspathIfNotJar(Path[] paths,
         for (URI u: withLinks) {
           Path p = new Path(u);
           FileSystem remoteFS = p.getFileSystem(conf);
+          String name = p.getName();
+          String wildcard = null;
+
+          if (name.equals(WILDCARD)) {
+            wildcard = name;
+            p = p.getParent();
+          }
+
           p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
               remoteFS.getWorkingDirectory()));
-          String name = (null == u.getFragment())
-            ? p.getName() : u.getFragment();
+
+          if ((wildcard != null) && (u.getFragment() != null)) {
+            throw new IOException("Invalid path URI: " + p + " - cannot "
+                + "contain both a URI fragment and a wildcard");
+          } else if (wildcard != null) {
+            name = p.getName() + Path.SEPARATOR + wildcard;
+          } else if (u.getFragment() != null) {
+            name = u.getFragment();
+          }
+
+          // If it's not a JAR, add it to the link lookup.
           if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
-            linkLookup.put(p, name);
+            String old = linkLookup.put(p, name);
+
+            if ((old != null) && !name.equals(old)) {
+              LOG.warn("The same path is included more than once "
+                  + "with different links or wildcards: " + p + " ["
+                  + name + ", " + old + "]");
+            }
           }
         }
       }
 
       for (Path p : paths) {
         FileSystem remoteFS = p.getFileSystem(conf);
+        String name = p.getName();
+
+        if (name.equals(WILDCARD)) {
+          p = p.getParent();
+          name = p.getName() + Path.SEPARATOR + name;
+        }
+
         p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
             remoteFS.getWorkingDirectory()));
-        String name = linkLookup.get(p);
-        if (name == null) {
-          name = p.getName();
+
+        if (linkLookup.containsKey(p)) {
+          name = linkLookup.get(p);
         }
+
         if(!StringUtils.toLowerCase(name).endsWith(".jar")) {
           MRApps.addToEnvironment(
               environment,
@@ -558,16 +588,42 @@ private static void parseDistributedCacheArtifacts(
         URI u = uris[i];
         Path p = new Path(u);
         FileSystem remoteFS = p.getFileSystem(conf);
+        String linkName = null;
+
+        if (p.getName().equals(WILDCARD)) {
+          p = p.getParent();
+          linkName = p.getName() + Path.SEPARATOR + WILDCARD;
+        }
+
         p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
             remoteFS.getWorkingDirectory()));
-        // Add URI fragment or just the filename
-        Path name = new Path((null == u.getFragment())
-          ? p.getName()
-          : u.getFragment());
-        if (name.isAbsolute()) {
-          throw new IllegalArgumentException("Resource name must be relative");
+
+        // If there's no wildcard, try using the fragment for the link
+        if (linkName == null) {
+          linkName = u.getFragment();
+
+          // Because we don't know what's in the fragment, we have to handle
+          // it with care.
+          if (linkName != null) {
+            Path linkPath = new Path(linkName);
+
+            if (linkPath.isAbsolute()) {
+              throw new IllegalArgumentException("Resource name must be "
+                  + "relative");
+            }
+
+            linkName = linkPath.toUri().getPath();
+          }
+        } else if (u.getFragment() != null) {
+          throw new IllegalArgumentException("Invalid path URI: " + p +
+              " - cannot contain both a URI fragment and a wildcard");
         }
-        String linkName = name.toUri().getPath();
+
+        // If there's no wildcard or fragment, just link to the file name
+        if (linkName == null) {
+          linkName = p.getName();
+        }
+
         LocalResource orig = localResources.get(linkName);
         if(orig != null && !orig.getResource().equals(
             ConverterUtils.getYarnUrlFromURI(p.toUri()))) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/nbproject/project.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/nbproject/project.properties
new file mode 100644
index 0000000..e69de29
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
index f3e4d2f..a37001b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java
@@ -126,9 +126,12 @@ public void uploadFiles(Job job, Path submitJobDir) throws IOException {
       for (String tmpjars : libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
-        DistributedCache.addFileToClassPath(
-            new Path(newPath.toUri().getPath()), conf, jtFs);
+
+        DistributedCache.addFileOnlyToClassPath(
+            new Path(newPath.toUri().getPath()), conf);
       }
+
+      DistributedCache.addCacheFile(new Path(libjarsDir, "*").toUri(), conf);
     }
 
     if (archives != null) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
index c15e647..a693f81 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java
@@ -234,14 +234,25 @@ private static FileStatus getFileStatus(Configuration job, URI uri,
    */
   static boolean isPublic(Configuration conf, URI uri,
       Map<URI, FileStatus> statCache) throws IOException {
+    boolean isPublic;
     FileSystem fs = FileSystem.get(uri, conf);
     Path current = new Path(uri.getPath());
     current = fs.makeQualified(current);
-    //the leaf level file should be readable by others
-    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
-      return false;
+
+    if (current.getName().equals("*")) {
+      current = current.getParent();
+
+      // The directory must be readable and executable
+      isPublic =
+          checkPermissionOfOther(fs, current, FsAction.READ, statCache) &&
+          checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache);
+    } else {
+      //the leaf level file should be readable by others
+      isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache);
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
+
+    return isPublic &&
+        ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
 
   /**
@@ -286,7 +297,13 @@ private static FileStatus getFileStatus(FileSystem fs, URI uri,
       Map<URI, FileStatus> statCache) throws IOException {
     FileStatus stat = statCache.get(uri);
     if (stat == null) {
-      stat = fs.getFileStatus(new Path(uri));
+      Path path = new Path(uri);
+
+      if (path.getName().equals("*")) {
+        path = path.getParent();
+      }
+
+      stat = fs.getFileStatus(path);
       statCache.put(uri, stat);
     }
     return stat;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
index 51fe69a..16944a3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
@@ -308,14 +308,23 @@ public static void addFileToClassPath(Path file, Configuration conf)
    * @param fs FileSystem with respect to which {@code archivefile} should
    *           be interpreted.
    */
-  public static void addFileToClassPath
-    (Path file, Configuration conf, FileSystem fs)
-    throws IOException {
+  public static void addFileToClassPath(Path file, Configuration conf,
+      FileSystem fs) throws IOException {
+    // Only add to the classpath if it's not a wildcard because $PWD/* is
+    // added to the classpath in any case.
+    if (!file.getName().equals("*")) {
+      addFileOnlyToClassPath(file, conf);
+    }
+
+    URI uri = fs.makeQualified(file).toUri();
+    addCacheFile(uri, conf);
+  }
+
+  public static void addFileOnlyToClassPath(Path file, Configuration conf)
+      throws IOException {
     String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
     conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
         : classpath + "," + file.toString());
-    URI uri = fs.makeQualified(file).toUri();
-    addCacheFile(uri, conf);
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 8c74bf5..120d8b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -281,7 +281,18 @@ public void writeLaunchEnv(OutputStream out,
     if (resources != null) {
       for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
         for (String linkName : entry.getValue()) {
-          sb.symlink(entry.getKey(), new Path(linkName));
+          if (new Path(linkName).getName().equals("*")) {
+            // If this is a wildcarded path, link to everything in the
+            // directory from the working directory
+            File directory = new File(entry.getKey().toString());
+
+            for (File wildLink : directory.listFiles()) {
+              sb.symlink(new Path(wildLink.toString()),
+                  new Path(wildLink.getName()));
+            }
+          } else {
+            sb.symlink(entry.getKey(), new Path(linkName));
+          }
         }
       }
     }
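
For context, a minimal usage sketch of the behavior this patch enables. The class name, job name, and HDFS path below are illustrative assumptions, not part of the patch. Registering a directory with a trailing "*" as a cache file causes the node manager to symlink every file in the localized directory into the container's working directory, where the $PWD/* classpath entry added by MRApps.setClasspath() picks the jars up.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver illustrating the wildcard path (not part of the patch).
public class WildcardCacheExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "wildcard-example");

    // One wildcard entry stands in for every file under libjars/. At
    // localization time, ContainerExecutor.writeLaunchEnv() sees the "*"
    // link name and symlinks each file in the directory into $PWD, where
    // the $PWD/* classpath entry from MRApps.setClasspath() finds the jars.
    job.addCacheFile(new URI("hdfs:///apps/example/libjars/*"));

    // Set mapper/reducer and input/output paths as usual, then submit:
    // job.waitForCompletion(true);
  }
}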