Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -34,9 +34,11 @@
 import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
 import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileUtil;
@@ -94,6 +96,70 @@
     unJar(jarFile, toDir, MATCH_ANY);
   }
 
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped.
+   *
+   * @param jarFile the jar file stream to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  public static void unJar(InputStream jarFile, File toDir,
+                           Pattern unpackRegex)
+      throws IOException {
+    try (JarInputStream jar = new JarInputStream(jarFile)) {
+      int numOfFailedLastModifiedSet = 0;
+      do {
+        final JarEntry entry = jar.getNextJarEntry();
+        if (entry == null) {
+          break;
+        }
+        if (!entry.isDirectory() &&
+            unpackRegex.matcher(entry.getName()).matches()) {
+          File file = new File(toDir, entry.getName());
+          ensureDirectory(file.getParentFile());
+          try (OutputStream out = new FileOutputStream(file)) {
+            IOUtils.copyBytes(jar, out, BUFFER_SIZE);
+          }
+          if (!file.setLastModified(entry.getTime())) {
+            numOfFailedLastModifiedSet++;
+          }
+        }
+      } while (true);
+      if (numOfFailedLastModifiedSet > 0) {
+        LOG.warn("Could not set last modified time for {} file(s)",
+            numOfFailedLastModifiedSet);
+      }
+    }
+  }
+
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped. A copy of the
+   * entire jar is also kept in the same directory.
+   *
+   * @param jarFile the jar file stream to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param name the name under which to save the copy of the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  public static void unJarAndSave(InputStream jarFile, File toDir,
+                                  String name, Pattern unpackRegex)
+      throws IOException {
+    File file = new File(toDir, name);
+    ensureDirectory(toDir);
+    try (OutputStream jar = new FileOutputStream(file);
+         TeeInputStream teeInputStream = new TeeInputStream(jarFile, jar)) {
+      unJar(teeInputStream, toDir, unpackRegex);
+    }
+  }
+
   /**
    * Unpack matching files from a jar. Entries inside the jar that do
    * not match the given pattern will be skipped.
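Note: the copy produced by unJarAndSave is written by the TeeInputStream as a side effect of unJar draining the jar stream. A minimal caller sketch follows (the file names and paths are hypothetical, not part of the patch; assumes the usual java.io, java.util.regex and org.apache.hadoop.util imports):

    // Unpack matching class files from a streamed jar while keeping a
    // full copy named "job.jar" next to the unpacked entries.
    try (InputStream in = new FileInputStream("/tmp/job.jar")) {
      RunJar.unJarAndSave(in, new File("/tmp/unpacked"), "job.jar",
          Pattern.compile("classes/.*"));
    }
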
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -43,7 +43,6 @@
 @Public
 @Stable
 public abstract class LocalResource {
-
   @Public
   @Stable
   public static LocalResource newInstance(URL url, LocalResourceType type,
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -3465,6 +3465,16 @@
       DEFAULT_TIMELINE_SERVICE_COLLECTOR_WEBAPP_HTTPS_ADDRESS =
       DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
 
+  public static final String NM_DOWNLOADER_THREAD_COUNT =
+      NM_PREFIX + "downloader.thread.count";
+  public static final int
+      DEFAULT_NM_DOWNLOADER_THREAD_COUNT = 10;
+
+  public static final String NM_DOWNLOADER_USE_OS_COMMAND =
+      NM_PREFIX + "downloader.use-os-command";
+  public static final boolean
+      DEFAULT_NM_DOWNLOADER_USE_OS_COMMAND = false;
+
   public YarnConfiguration() {
     super();
   }
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -21,14 +21,19 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -48,12 +53,15 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 /**
  * Download a single URL to the local disk.
@@ -247,9 +255,24 @@
     }
   }
 
-  private Path copy(Path sCopy, Path dstdir) throws IOException {
+  /**
+   * Use asynchronous calls and streaming to localize files.
+   * @param destination destination directory
+   * @throws IOException cannot read or write file
+   * @throws InterruptedException operation interrupted
+   * @throws ExecutionException thread pool error
+   * @throws YarnException subcommand returned an error
+   */
+  private void parallelVerifyAndCopy(Path destination)
+      throws IOException, InterruptedException,
+      ExecutionException, YarnException {
+    final Path sCopy;
+    try {
+      sCopy = resource.getResource().toPath();
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
     FileSystem sourceFs = sCopy.getFileSystem(conf);
-    Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
     FileStatus sStat = sourceFs.getFileStatus(sCopy);
     if (sStat.getModificationTime() != resource.getTimestamp()) {
       throw new IOException("Resource " + sCopy +
@@ -264,80 +287,113 @@
       }
     }
 
-    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
-        true, conf);
-    return dCopy;
-  }
-
-  private long unpack(File localrsrc, File dst) throws IOException {
-    switch (resource.getType()) {
-    case ARCHIVE: {
-      String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        RunJar.unJar(localrsrc, dst);
-      } else if (lowerDst.endsWith(".zip")) {
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-          lowerDst.endsWith(".tgz") ||
-          lowerDst.endsWith(".tar")) {
-        FileUtil.unTar(localrsrc, dst);
+    try {
+      downloadAndUnpack(sCopy, destination);
+    } catch (RuntimeException ex) {
+      throw new YarnException("Parallel download failed", ex);
+    }
+  }
+
+  /**
+   * Copy source path to destination with localization rules.
+   * @param source source path to copy. Typically HDFS
+   * @param destination destination path. Typically local filesystem
+   */
+  private void downloadAndUnpack(Path source, Path destination) {
+    try {
+      FileSystem sourceFileSystem = source.getFileSystem(conf);
+      FileSystem destinationFileSystem = destination.getFileSystem(conf);
+      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+        FileUtil.copy(
+            sourceFileSystem, source,
+            destinationFileSystem, destination, false,
+            true, conf);
       } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
-        }
-      }
-    }
-    break;
-    case PATTERN: {
-      String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        String p = resource.getPattern();
-        RunJar.unJar(localrsrc, dst,
-            p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
-        File newDst = new File(dst, dst.getName());
-        if (!dst.exists() && !dst.mkdir()) {
-          throw new IOException("Unable to create directory: [" + dst + "]");
-        }
-        if (!localrsrc.renameTo(newDst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + newDst + "]");
-        }
-      } else if (lowerDst.endsWith(".zip")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-            "was specified as PATTERN");
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-          lowerDst.endsWith(".tgz") ||
-          lowerDst.endsWith(".tar")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-            "was specified as PATTERN");
-        FileUtil.unTar(localrsrc, dst);
-      } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
+        unpack(source, destination, sourceFileSystem, destinationFileSystem);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Do the localization action on the input stream.
+   * @param source Source path
+   * @param destination Destination path
+   * @param sourceFileSystem Source filesystem
+   * @param destinationFileSystem Destination filesystem
+   * @throws IOException Could not read or write stream
+   * @throws InterruptedException Operation interrupted by caller
+   * @throws YarnException Could not do the localization action
+   * @throws ExecutionException Could not create thread pool execution
+   */
+  private void unpack(Path source, Path destination,
+                      FileSystem sourceFileSystem,
+                      FileSystem destinationFileSystem)
+      throws IOException, InterruptedException, YarnException,
+      ExecutionException {
+    try (InputStream inputStream = sourceFileSystem.open(source)) {
+      String destinationFile =
+          StringUtils.toLowerCase(destination.getName());
+      boolean fallback = false;
+      if (resource.getType() == LocalResourceType.PATTERN) {
+        if (destinationFile.endsWith(".jar")) {
+          // Unpack and keep a copy of the whole jar for mapreduce
+          String p = resource.getPattern();
+          RunJar.unJarAndSave(inputStream, new File(destination.toUri()),
+              source.getName(),
+              p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+          return;
+        } else {
+          LOG.warn("Treating [" + source + "] " +
+              "as an archive even though it " +
+              "was specified as PATTERN");
+          fallback = true;
         }
       }
-    }
-    break;
-    case FILE:
-    default:
-      if (!localrsrc.renameTo(dst)) {
-        throw new IOException("Unable to rename file: [" + localrsrc
-            + "] to [" + dst + "]");
-      }
-      break;
-    }
-    if(localrsrc.isFile()){
-      try {
-        files.delete(new Path(localrsrc.toString()), false);
-      } catch (IOException ignore) {
+      if (resource.getType() == LocalResourceType.ARCHIVE || fallback) {
+        ExecutorService executor = null;
+        try {
+          // Set up an executor. We do it here in FSDownload,
+          // so that the thread count and other flags are
+          // FSDownload specific and not Hadoop general
+          int threadCount = conf.getInt(
+              YarnConfiguration.NM_DOWNLOADER_THREAD_COUNT,
+              YarnConfiguration.DEFAULT_NM_DOWNLOADER_THREAD_COUNT);
+          if (threadCount < 2) {
+            throw new YarnException(
+                "Thread count less than 2. This may hang.");
+          }
+          executor = Executors.newFixedThreadPool(threadCount);
+
+          boolean useOSCommand = conf.getBoolean(
+              YarnConfiguration.NM_DOWNLOADER_USE_OS_COMMAND,
+              YarnConfiguration.DEFAULT_NM_DOWNLOADER_USE_OS_COMMAND);
+
+          destinationFileSystem.delete(destination, true);
+          if (!destinationFileSystem.mkdirs(
+              destination, PRIVATE_DIR_PERMS)) {
+            throw new YarnException("Could not create " + destination);
+          }
+
+          if (FileUtil.decompress(
+              inputStream, destinationFile, destination,
+              executor, useOSCommand)) {
+            return;
+          } else {
+            LOG.warn("Cannot unpack [" + source + "] trying to copy");
+          }
+        } finally {
+          if (executor != null) {
+            executor.shutdown();
+          }
+        }
       }
+      // Fall back to copying
+      try (OutputStream outputStream =
+          destinationFileSystem.create(destination, true)) {
+        IOUtils.copy(inputStream, outputStream);
+      }
     }
-    return 0;
-    // TODO Should calculate here before returning
-    //return FileUtil.getDU(destDir);
   }
 
@@ -352,27 +407,34 @@
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting to download " + sCopy);
+      LOG.debug(String.format("Starting to download %s %s %s",
+          sCopy,
+          resource.getType(),
+          resource.getPattern()));
     }
 
-    createDir(destDirPath, cachePerms);
-    final Path dst_work = new Path(destDirPath + "_tmp");
-    createDir(dst_work, cachePerms);
-    Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
+    final Path destinationTmp = new Path(destDirPath + "_tmp");
+    createDir(destinationTmp, PRIVATE_DIR_PERMS);
+    Path dFinal =
+        files.makeQualified(new Path(destinationTmp, sCopy.getName()));
     try {
-      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
-          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
-            public Path run() throws Exception {
-              return files.makeQualified(copy(sCopy, dst_work));
-            };
-          });
-      unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+      if (userUgi == null) {
+        parallelVerifyAndCopy(dFinal);
+      } else {
+        userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            parallelVerifyAndCopy(dFinal);
+            return null;
+          }
+        });
+      }
       changePermissions(dFinal.getFileSystem(conf), dFinal);
-      files.rename(dst_work, destDirPath, Rename.OVERWRITE);
+      files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("File has been downloaded to " +
-            new Path(destDirPath, sCopy.getName()));
+        LOG.debug(String.format("File has been downloaded to %s from %s",
+            new Path(destDirPath, sCopy.getName()), sCopy));
       }
     } catch (Exception e) {
       try {
@@ -382,7 +444,7 @@
       throw e;
     } finally {
       try {
-        files.delete(dst_work, true);
+        files.delete(destinationTmp, true);
       } catch (FileNotFoundException ignore) {
       }
       conf = null;
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java	(revision c529eb4ec4f870c84fbe4273bf1b76a01706dd35)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java	(revision c529eb4ec4f870c84fbe4273bf1b76a01706dd35)
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.util.LinkedList;
+
+/**
+ * Aggregated exception for multiple download exceptions.
+ */
+public class FSDownloadException extends YarnException {
+  private LinkedList<Exception> innerExceptions;
+
+  public FSDownloadException(String message,
+      LinkedList<Exception> exceptions) {
+    super(message, exceptions.getFirst());
+    this.innerExceptions = exceptions;
+  }
+  public LinkedList<Exception> getInnerExceptions() {
+    return innerExceptions;
+  }
+  @Override
+  public String toString() {
+    String nl = System.getProperty("line.separator");
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(nl);
+    for (Exception exception : innerExceptions) {
+      builder.append(exception.toString());
+      builder.append(nl);
+    }
+    return builder.toString();
+  }
+}
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -3623,4 +3623,19 @@
 
+  <property>
+    <description>
+      Localizer thread count. It should be a minimum of 2.
+    </description>
+    <name>yarn.nodemanager.downloader.thread.count</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>
+      It specifies whether the localizer should use OS commands to extract archives.
+    </description>
+    <name>yarn.nodemanager.downloader.use-os-command</name>
+    <value>false</value>
+  </property>
 
 </configuration>
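For reference, the two new keys are consumed in FSDownload.unpack above; a minimal standalone sketch of how they resolve (not part of the patch; assumes org.apache.hadoop.conf.Configuration and YarnConfiguration imports):

    // Resolving the new localizer settings; the defaults come from
    // YarnConfiguration (10 threads, OS commands disabled).
    Configuration conf = new YarnConfiguration();
    int threadCount = conf.getInt(
        YarnConfiguration.NM_DOWNLOADER_THREAD_COUNT,
        YarnConfiguration.DEFAULT_NM_DOWNLOADER_THREAD_COUNT);
    boolean useOSCommand = conf.getBoolean(
        YarnConfiguration.NM_DOWNLOADER_USE_OS_COMMAND,
        YarnConfiguration.DEFAULT_NM_DOWNLOADER_USE_OS_COMMAND);

Deployments would normally override these keys in yarn-site.xml; yarn-default.xml only documents the defaults.
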
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java	(revision dc735b286bb656903df49aee776d22ee0c61f860)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -48,12 +50,14 @@
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assert;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -76,21 +80,38 @@
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.AfterClass;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestFSDownload {
+  private static Boolean[] PARAMETERS = {false, true};
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
   private static AtomicLong uniqueNumberGenerator =
       new AtomicLong(System.currentTimeMillis());
   private enum TEST_FILE_TYPE { TAR, JAR, ZIP, TGZ };
-
+  private Configuration conf = new Configuration();
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.stream(PARAMETERS).map(
+        type -> new Object[]{type}).collect(Collectors.toList());
+  }
+
+  public TestFSDownload(boolean useCommand) {
+    conf.setBoolean(YarnConfiguration.NM_DOWNLOADER_USE_OS_COMMAND, useCommand);
+  }
+
   @AfterClass
   public static void deleteTestDir() throws IOException {
     FileContext fs = FileContext.getLocalFSFileContext();
@@ -132,6 +153,18 @@
     FileOutputStream stream = new FileOutputStream(jarFile);
     LOG.info("Create jar out stream ");
     JarOutputStream out = new JarOutputStream(stream, new Manifest());
+    ZipEntry entry = new ZipEntry("classes/1.class");
+    out.putNextEntry(entry);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
+    ZipEntry entry2 = new ZipEntry("classes/2.class");
+    out.putNextEntry(entry2);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
     LOG.info("Done writing jar stream ");
     out.close();
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -256,7 +289,6 @@
   @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -307,7 +339,6 @@
   @Test (timeout=60000)
   public void testDownloadPublicWithStatCache() throws IOException,
       URISyntaxException, InterruptedException, ExecutionException {
-    final Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));
@@ -382,7 +413,6 @@
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -438,7 +468,7 @@
       FileStatus status = files.getFileStatus(localized.getParent());
       FsPermission perm = status.getPermission();
       assertEquals("Cache directory permissions are incorrect",
-          new FsPermission((short)0755), perm);
+          new FsPermission((short)0700), perm);
 
       status = files.getFileStatus(localized);
       perm = status.getPermission();
@@ -455,7 +485,6 @@
   private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
       URISyntaxException, InterruptedException{
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
@@ -530,7 +559,7 @@
     }
   }
 
-  @Test (timeout=10000) 
+  @Test (timeout=10000)
   public void testDownloadArchive() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.TAR);
@@ -542,12 +571,24 @@
     downloadWithFileType(TEST_FILE_TYPE.JAR);
   }
 
-  @Test (timeout=10000) 
+  @Test (timeout=10000)
   public void testDownloadArchiveZip() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.ZIP);
   }
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test (timeout=10000)
+  public void testDownloadArchiveFewThreads()
+      throws IOException, URISyntaxException,
+      InterruptedException {
+    conf.setInt(YarnConfiguration.NM_DOWNLOADER_THREAD_COUNT, 1);
+    thrown.expect(IOException.class);
+    downloadWithFileType(TEST_FILE_TYPE.TAR);
+  }
+
   /*
    * To test fix for YARN-3029
    */
@@ -603,7 +644,6 @@
   @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
-    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));
@@ -668,7 +708,6 @@
   @Test (timeout=10000)
   public void testUniqueDestinationPath() throws Exception {
-    Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java	(revision c529eb4ec4f870c84fbe4273bf1b76a01706dd35)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java	(revision 96f1a886ebf7a5d34ec3daefaec716f9a1d03e6b)
@@ -35,12 +35,17 @@
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
 
 import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -52,6 +57,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
@@ -74,6 +80,11 @@
    *
    */
   public static final int SYMLINK_NO_PRIVILEGE = 2;
 
+  /**
+   * Buffer size for copying the content of a compressed file to a new file.
+   */
+  private static final int BUFFER_SIZE = 8_192;
+
   /**
    * convert an array of FileStatus to an array of Path
    *
@@ -620,6 +631,151 @@
     }
   }
 
+  /**
+   * Decompress the given stream, with the given file name, to the
+   * destination path.
+   * @param inputStream compressed data
+   * @param name file name ending with the compression type like tar.gz
+   * @param destination Destination path
+   * @param executorService Optional thread pool service
+   * @param useOSCommand Whether to use OS commands to decompress
+   * @return Whether this is a compressed file and decompression was successful
+   * @throws IOException Could not read or write data
+   * @throws InterruptedException Operation interrupted
+   * @throws ExecutionException Could not submit thread pool items
+   */
+  public static boolean decompress(
+      InputStream inputStream,
+      String name,
+      Path destination,
+      ExecutorService executorService,
+      boolean useOSCommand)
+      throws IOException, InterruptedException, ExecutionException {
+    String fileName =
+        StringUtils.toLowerCase(name);
+    String destinationPath =
+        new File(destination.toUri()).getAbsolutePath();
+    if (destinationPath.contains("'")) {
+      throw new IOException("Invalid path " + destinationPath +
+          ". Possible code injection attack");
+    }
+    StringBuilder command = new StringBuilder();
+    boolean isGzipped = fileName.endsWith(".gz");
+    boolean isTar =
+        fileName.endsWith(".tar") ||
+        fileName.endsWith(".tar.gz") ||
+        fileName.endsWith(".tgz");
+    boolean isZip = fileName.endsWith(".zip");
+    boolean isJar = fileName.endsWith(".jar");
+    String verbose = LOG.isDebugEnabled() ? "v" : "";
"v" : ""; + + if(Shell.WINDOWS || !useOSCommand) { + if (isTar) { + unTarUsingJava(inputStream, + new File(destination.toUri()), isGzipped); + return true; + } else if (isZip) { + unZipUsingJava(inputStream, + new File(destination.toUri())); + return true; + } else if (isJar) { + RunJar.unJar( + inputStream, new File(destination.toUri()), RunJar.MATCH_ANY); + return true; + } + } else { + if (isGzipped) { + command.append("gzip -dc | "); + } + if (isTar) { + command + .append("(") + .append("cd '") + .append(destinationPath) + .append("' &&") + .append("tar -x") + .append(verbose) + .append(")"); + } else if (isZip || isJar) { + command + .append("(") + .append("cd '") + .append(destinationPath) + .append("'&&") + .append("jar x") + .append(verbose) + .append(")"); + } + if (!command.toString().isEmpty()) { + runCommandOnStream(inputStream, executorService, command.toString()); + return true; + } + } + return false; + } + + /** + * Run a command and send the contents of an input stream to it. + * @param inputStream Input stream to forward to the shell command + * @param executor0 Optional executor + * @param command shell command to run + * @throws IOException read or write failed + * @throws InterruptedException command interrupted + * @throws ExecutionException task submit failed + */ + private static void runCommandOnStream( + InputStream inputStream, ExecutorService executor0, String command) + throws IOException, InterruptedException, ExecutionException { + String shell = Shell.WINDOWS ? "cmd" : "bash"; + String cmdSwitch = Shell.WINDOWS ? "/c" : "-c"; + ExecutorService executor = + executor0 != null ? executor0 : + Executors.newFixedThreadPool(2); + ProcessBuilder builder = new ProcessBuilder(); + builder.command(shell, cmdSwitch, command); + Process process = builder.start(); + Future output = null; + Future error = null; + String result = + String.format("\nEnable debug logs on %s for details", LOG.getName()); + if (LOG.isDebugEnabled()) { + output = executor.submit(() -> { + // Read until the output stream receives an EOF and closed. + try { + return + org.apache.commons.io.IOUtils.toString(process.getInputStream()); + } catch (IOException e) { + return e.getMessage(); + } finally { + process.getInputStream().close(); + } + }); + error = executor.submit(() -> { + try { + // Read until the error stream receives an EOF and closed. 
+          return
+              org.apache.commons.io.IOUtils.toString(process.getErrorStream());
+        } catch (IOException e) {
+          return e.getMessage();
+        } finally {
+          process.getErrorStream().close();
+        }
+      });
+    }
+    try {
+      org.apache.commons.io.IOUtils.copy(
+          inputStream, process.getOutputStream());
+    } finally {
+      process.getOutputStream().close();
+    }
+    if (output != null) {
+      result = error.get() + "\n" + output.get();
+      LOG.debug(result);
+    }
+    if (process.waitFor() != 0) {
+      throw new IOException("Process " + command + " exited with " + result);
+    }
+  }
+
   /**
    * Given a Tar File as input it will untar the file in a the untar directory
    * passed as the second parameter
@@ -701,6 +857,61 @@
     }
   }
 
+  private static void unTarUsingJava(InputStream inputStream, File untarDir,
+      boolean gzipped) throws IOException {
+    TarArchiveInputStream tis = null;
+    try {
+      if (gzipped) {
+        inputStream = new BufferedInputStream(new GZIPInputStream(
+            inputStream));
+      } else {
+        inputStream =
+            new BufferedInputStream(inputStream);
+      }
+
+      tis = new TarArchiveInputStream(inputStream);
+
+      for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
+        unpackEntries(tis, entry, untarDir);
+        entry = tis.getNextTarEntry();
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, tis, inputStream);
+    }
+  }
+
+  private static void unZipUsingJava(InputStream inputStream, File toDir)
+      throws IOException {
+    try (ZipInputStream zip = new ZipInputStream(inputStream)) {
+      int numOfFailedLastModifiedSet = 0;
+      do {
+        final ZipEntry entry = zip.getNextEntry();
+        if (entry == null) {
+          break;
+        }
+        if (!entry.isDirectory()) {
+          File file = new File(toDir, entry.getName());
+          File parent = file.getParentFile();
+          if (!parent.mkdirs() &&
+              !parent.isDirectory()) {
+            throw new IOException("Mkdirs failed to create " +
+                parent.getAbsolutePath());
+          }
+          try (OutputStream out = new FileOutputStream(file)) {
+            IOUtils.copyBytes(zip, out, BUFFER_SIZE);
+          }
+          if (!file.setLastModified(entry.getTime())) {
+            numOfFailedLastModifiedSet++;
+          }
+        }
+      } while (true);
+      if (numOfFailedLastModifiedSet > 0) {
+        LOG.warn("Could not set last modified time for {} file(s)",
+            numOfFailedLastModifiedSet);
+      }
+    }
+  }
+
   private static void unpackEntries(TarArchiveInputStream tis,
       TarArchiveEntry entry, File outputDir) throws IOException {
     if (entry.isDirectory()) {
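Taken together, FSDownload.unpack hands the open source stream to FileUtil.decompress, which either unpacks it with the Java helpers above or, when OS commands are enabled on a non-Windows host, pipes it through gzip/tar/jar. A direct-call sketch (paths hypothetical; like FSDownload, the caller is expected to create the destination directory first):

    // Decompress a streamed tarball into /tmp/out, preferring OS commands.
    // decompress() returns false for suffixes it does not recognize, in
    // which case the caller copies the raw bytes instead.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try (InputStream in = new FileInputStream("/tmp/archive.tar.gz")) {
      boolean unpacked = FileUtil.decompress(
          in, "archive.tar.gz", new Path("/tmp/out"), pool, true);
    } finally {
      pool.shutdown();
    }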