Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java	(revision 15634197819e61e94747ed99d55e5f1dde757623)
@@ -34,9 +34,11 @@
 import java.util.List;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
 import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileUtil;
@@ -94,6 +96,74 @@
     unJar(jarFile, toDir, MATCH_ANY);
   }
 
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped.
+   *
+   * @param jarFile the .jar file to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  public static void unJar(InputStream jarFile, File toDir,
+                           Pattern unpackRegex)
+      throws IOException {
+    try (JarInputStream jar = new JarInputStream(jarFile)) {
+      int numOfFailedLastModifiedSet = 0;
+      do {
+        final JarEntry entry = jar.getNextJarEntry();
+        if (entry == null) {
+          break;
+        }
+        if (!entry.isDirectory() &&
+            unpackRegex.matcher(entry.getName()).matches()) {
+          File file = new File(toDir, entry.getName());
+          ensureDirectory(file.getParentFile());
+          try (OutputStream out = new FileOutputStream(file)) {
+            byte[] buf = new byte[BUFFER_SIZE];
+            int bytesRead = jar.read(buf);
+            while (bytesRead >= 0) {
+              out.write(buf, 0, bytesRead);
+              bytesRead = jar.read(buf);
+            }
+          }
+          if (!file.setLastModified(entry.getTime())) {
+            numOfFailedLastModifiedSet++;
+          }
+        }
+      } while (true);
+      if (numOfFailedLastModifiedSet > 0) {
+        LOG.warn("Could not set last modified time for {} file(s)",
+            numOfFailedLastModifiedSet);
+      }
+    }
+  }
+
+  /**
+   * Unpack matching files from a jar. Entries inside the jar that do
+   * not match the given pattern will be skipped. Also keep a copy
+   * of the entire jar in the same directory.
+   *
+   * @param jarFile the .jar file to unpack
+   * @param toDir the destination directory into which to unpack the jar
+   * @param name the name under which to save the copy of the jar
+   * @param unpackRegex the pattern to match jar entries against
+   *
+   * @throws IOException if an I/O error has occurred or toDir
+   * cannot be created and does not already exist
+   */
+  public static void unJarAndSave(InputStream jarFile, File toDir,
+                                  String name, Pattern unpackRegex)
+      throws IOException {
+    File file = new File(toDir, name);
+    ensureDirectory(toDir);
+    try (OutputStream jar = new FileOutputStream(file);
+         TeeInputStream teeInputStream = new TeeInputStream(jarFile, jar)) {
+      unJar(teeInputStream, toDir, unpackRegex);
+    }
+  }
+
   /**
    * Unpack matching files from a jar. Entries inside the jar that do
    * not match the given pattern will be skipped.
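
For reference, the streaming unJar/unJarAndSave entry points above can be driven like this (a minimal sketch; the jar path, destination directory, and pattern are illustrative, not part of the patch):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.regex.Pattern;

    import org.apache.hadoop.util.RunJar;

    public class UnJarAndSaveDemo {
      public static void main(String[] args) throws IOException {
        File toDir = new File("/tmp/unjar-demo");
        // Unpacks entries matching classes/.* into toDir while the
        // TeeInputStream inside unJarAndSave simultaneously writes a full
        // copy of the jar to toDir/app.jar.
        try (InputStream in = new FileInputStream("/tmp/app.jar")) {
          RunJar.unJarAndSave(in, toDir, "app.jar",
              Pattern.compile("classes/.*"));
        }
      }
    }

Because the jar is consumed as a stream, the full copy and the selective unpack happen in a single pass over the data.
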
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LocalResource.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
@@ -43,6 +43,9 @@
 @Public
 @Stable
 public abstract class LocalResource {
+  @Public
+  @Unstable
+  public static final long IGNORE_TIMESTAMP = -1;
 
   @Public
   @Stable
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
@@ -3465,6 +3465,11 @@
       DEFAULT_TIMELINE_SERVICE_COLLECTOR_WEBAPP_HTTPS_ADDRESS =
       DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS;
 
+  public static final String NM_DOWNLOADER_THREAD_COUNT =
+      NM_PREFIX + "downloader.thread.count";
+  public static final int
+      DEFAULT_NM_DOWNLOADER_THREAD_COUNT = 10;
+
   public YarnConfiguration() {
     super();
   }
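
The new knob is read like any other YARN setting; a minimal sketch of sizing a pool from it, mirroring what the FSDownload constructor below does (the class name is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class DownloaderPoolDemo {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Falls back to DEFAULT_NM_DOWNLOADER_THREAD_COUNT (10) when
        // yarn.nodemanager.downloader.thread.count is not set.
        int threads = conf.getInt(
            YarnConfiguration.NM_DOWNLOADER_THREAD_COUNT,
            YarnConfiguration.DEFAULT_NM_DOWNLOADER_THREAD_COUNT);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        System.out.println("downloader threads: " + threads);
        executor.shutdown();
      }
    }
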
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java	(revision ace84150e3dbbc8c57dfd572afceed7e4e962f5f)
@@ -21,14 +21,20 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -48,14 +54,19 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import static org.apache.hadoop.yarn.api.records.LocalResource.IGNORE_TIMESTAMP;
 
-/**
+ /**
  * Download a single URL to the local disk.
  *
  */
@@ -64,6 +75,7 @@
 
   private static final Log LOG = LogFactory.getLog(FSDownload.class);
 
+  private ExecutorService executor;
   private FileContext files;
   private final UserGroupInformation userUgi;
   private Configuration conf;
@@ -96,6 +108,10 @@
     this.userUgi = ugi;
     this.resource = resource;
     this.statCache = statCache;
+    this.executor = Executors.newFixedThreadPool(
+        conf.getInt(YarnConfiguration.NM_DOWNLOADER_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_NM_DOWNLOADER_THREAD_COUNT)
+    );
   }
 
   LocalResource getResource() {
@@ -247,11 +263,27 @@
     }
   }
 
-  private Path copy(Path sCopy, Path dstdir) throws IOException {
+  /**
+   * Use asynchronous calls and streaming to localize files.
+   * @param destination destination directory
+   * @throws IOException cannot read or write file
+   * @throws InterruptedException operation interrupted
+   * @throws ExecutionException thread pool error
+   * @throws YarnException subcommand returned an error
+   */
+  private void parallelVerifyAndCopy(Path destination)
+      throws IOException, InterruptedException,
+      ExecutionException, YarnException {
+    final Path sCopy;
+    try {
+      sCopy = resource.getResource().toPath();
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
     FileSystem sourceFs = sCopy.getFileSystem(conf);
-    Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
     FileStatus sStat = sourceFs.getFileStatus(sCopy);
-    if (sStat.getModificationTime() != resource.getTimestamp()) {
+    if (resource.getTimestamp() != IGNORE_TIMESTAMP &&
+        sStat.getModificationTime() != resource.getTimestamp()) {
       throw new IOException("Resource " + sCopy +
           " changed on src filesystem (expected " + resource.getTimestamp() +
           ", was " + sStat.getModificationTime());
@@ -264,80 +296,93 @@
       }
     }
 
-    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
-        true, conf);
-    return dCopy;
+    try {
+      parallelCopy(sCopy, destination);
+    } catch (RuntimeException ex) {
+      throw new YarnException("Parallel download failed", ex);
+    }
   }
 
-  private long unpack(File localrsrc, File dst) throws IOException {
-    switch (resource.getType()) {
-    case ARCHIVE: {
-      String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        RunJar.unJar(localrsrc, dst);
-      } else if (lowerDst.endsWith(".zip")) {
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-          lowerDst.endsWith(".tgz") ||
-          lowerDst.endsWith(".tar")) {
-        FileUtil.unTar(localrsrc, dst);
+  /**
+   * Copy source path to destination with localization rules.
+   * @param source source path to copy. Typically HDFS
+   * @param destination destination path. Typically local filesystem
+   */
+  private void parallelCopy(Path source, Path destination) {
+    try {
+      FileSystem sourceFileSystem = source.getFileSystem(conf);
+      FileSystem destinationFileSystem = destination.getFileSystem(conf);
+      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
+        LinkedList<Future<?>> tasks = new LinkedList<>();
+        for (FileStatus child : sourceFileSystem.listStatus(source)) {
+          tasks.add(executor.submit(() -> parallelCopy(
+              child.getPath(),
+              new Path(destination, child.getPath().getName()))));
+        }
+        LinkedList<Exception> exceptions = new LinkedList<>();
+        for (Future<?> task : tasks) {
+          try {
+            task.get();
+          } catch (InterruptedException | ExecutionException ex) {
+            exceptions.add(ex);
+          }
+        }
+        if (!exceptions.isEmpty()) {
+          throw new FSDownloadException("Localization failed", exceptions);
+        }
       } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
-        }
-      }
-    }
-    break;
-    case PATTERN: {
-      String lowerDst = StringUtils.toLowerCase(dst.getName());
-      if (lowerDst.endsWith(".jar")) {
-        String p = resource.getPattern();
-        RunJar.unJar(localrsrc, dst,
-            p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
-        File newDst = new File(dst, dst.getName());
-        if (!dst.exists() && !dst.mkdir()) {
-          throw new IOException("Unable to create directory: [" + dst + "]");
-        }
-        if (!localrsrc.renameTo(newDst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + newDst + "]");
-        }
-      } else if (lowerDst.endsWith(".zip")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-            "was specified as PATTERN");
-        FileUtil.unZip(localrsrc, dst);
-      } else if (lowerDst.endsWith(".tar.gz") ||
-          lowerDst.endsWith(".tgz") ||
-          lowerDst.endsWith(".tar")) {
-        LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
-            "was specified as PATTERN");
-        FileUtil.unTar(localrsrc, dst);
-      } else {
-        LOG.warn("Cannot unpack " + localrsrc);
-        if (!localrsrc.renameTo(dst)) {
-          throw new IOException("Unable to rename file: [" + localrsrc
-              + "] to [" + dst + "]");
+        unpack(source, destination, sourceFileSystem, destinationFileSystem);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Do the localization action on the input stream.
+   * @param source source path
+   * @param destination destination path
+   * @param sourceFileSystem source filesystem
+   * @param destinationFileSystem destination filesystem
+   * @throws IOException could not read or write the stream
+   * @throws InterruptedException operation interrupted by caller
+   * @throws ExecutionException thread pool execution failed
+   */
+  private void unpack(Path source, Path destination,
+                      FileSystem sourceFileSystem,
+                      FileSystem destinationFileSystem)
+      throws IOException, InterruptedException, ExecutionException {
+    try (InputStream inputStream = sourceFileSystem.open(source)) {
+      String destinationFile =
+          StringUtils.toLowerCase(destination.getName());
+      if (resource.getType() == LocalResourceType.PATTERN) {
+        if (destinationFile.endsWith(".jar")) {
+          // Unpack and keep a copy of the whole jar for mapreduce
+          String p = resource.getPattern();
+          RunJar.unJarAndSave(inputStream, new File(destination.toUri()),
              source.getName(),
+              p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
+          return;
+        } else {
+          LOG.warn("Treating [" + source + "] " +
+              "as an archive even though it " +
+              "was specified as PATTERN");
+        }
+      }
+      if (resource.getType() == LocalResourceType.ARCHIVE) {
+        if (FileUtil.decompress(
+            inputStream, source.getName(), destination, executor)) {
+          return;
+        } else {
+          LOG.warn("Cannot unpack [" + source + "], trying to copy");
         }
       }
-    }
-    break;
-    case FILE:
-    default:
-      if (!localrsrc.renameTo(dst)) {
-        throw new IOException("Unable to rename file: [" + localrsrc
-            + "] to [" + dst + "]");
+      // Fall back to copying
+      try (OutputStream outputStream =
+          destinationFileSystem.create(destination, true)) {
+        IOUtils.copy(inputStream, outputStream);
       }
-      break;
     }
-    if(localrsrc.isFile()){
-      try {
-        files.delete(new Path(localrsrc.toString()), false);
-      } catch (IOException ignore) {
-      }
-    }
-    return 0;
-    // TODO Should calculate here before returning
-    //return FileUtil.getDU(destDir);
   }
@@ -352,27 +397,34 @@
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting to download " + sCopy);
+      LOG.debug(String.format("Starting to download %s %s %s",
+          sCopy,
+          resource.getType(),
+          resource.getPattern()));
     }
 
-    createDir(destDirPath, cachePerms);
-    final Path dst_work = new Path(destDirPath + "_tmp");
-    createDir(dst_work, cachePerms);
-    Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
+    final Path destinationTmp = new Path(destDirPath + "_tmp");
+    createDir(destinationTmp, PRIVATE_DIR_PERMS);
+    Path dFinal =
+        files.makeQualified(new Path(destinationTmp, sCopy.getName()));
     try {
-      Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work))
-          : userUgi.doAs(new PrivilegedExceptionAction<Path>() {
-            public Path run() throws Exception {
-              return files.makeQualified(copy(sCopy, dst_work));
-            };
-          });
-      unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+      if (userUgi == null) {
+        parallelVerifyAndCopy(dFinal);
+      } else {
+        userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            parallelVerifyAndCopy(dFinal);
+            return null;
+          }
+        });
+      }
       changePermissions(dFinal.getFileSystem(conf), dFinal);
-      files.rename(dst_work, destDirPath, Rename.OVERWRITE);
+      files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("File has been downloaded to " +
-            new Path(destDirPath, sCopy.getName()));
+        LOG.debug(String.format("File has been downloaded to %s from %s",
+            new Path(destDirPath, sCopy.getName()), sCopy));
       }
     } catch (Exception e) {
      try {
@@ -382,7 +434,7 @@
       throw e;
     } finally {
       try {
-        files.delete(dst_work, true);
+        files.delete(destinationTmp, true);
       } catch (FileNotFoundException ignore) {
       }
       conf = null;
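
With IGNORE_TIMESTAMP, a client can opt out of the modification-time check that parallelVerifyAndCopy performs. A sketch of building such a LocalResource (the URL and size are illustrative):

    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.api.records.URL;

    public class IgnoreTimestampDemo {
      public static LocalResource archive(URL url, long size) {
        // Passing IGNORE_TIMESTAMP (-1) as the timestamp disables the
        // "changed on src filesystem" verification against the source
        // file's modification time.
        return LocalResource.newInstance(url, LocalResourceType.ARCHIVE,
            LocalResourceVisibility.APPLICATION, size,
            LocalResource.IGNORE_TIMESTAMP);
      }
    }
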
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownloadException.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.util.LinkedList;
+
+/**
+ * Aggregated exception for multiple download exceptions.
+ */
+public class FSDownloadException extends YarnException {
+  private LinkedList<Exception> innerExceptions;
+
+  public FSDownloadException(String message, LinkedList<Exception> exceptions) {
+    super(message, exceptions.getFirst());
+    this.innerExceptions = exceptions;
+  }
+  public LinkedList<Exception> getInnerExceptions() {
+    return innerExceptions;
+  }
+  @Override
+  public String toString() {
+    String nl = System.getProperty("line.separator");
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(nl);
+    for (Exception exception : innerExceptions) {
+      builder.append(exception.toString());
+      builder.append(nl);
+    }
+    return builder.toString();
+  }
+}
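
Since parallelCopy rethrows everything as a RuntimeException, callers that want the individual failures have to unwrap the aggregate first; a minimal sketch (the reporting logic is illustrative):

    import org.apache.hadoop.yarn.util.FSDownloadException;

    public class AggregateFailureDemo {
      static void report(RuntimeException e) {
        // The aggregated FSDownloadException sits behind getCause() because
        // parallelCopy wraps every checked exception in a RuntimeException.
        if (e.getCause() instanceof FSDownloadException) {
          FSDownloadException aggregate = (FSDownloadException) e.getCause();
          for (Exception inner : aggregate.getInnerExceptions()) {
            System.err.println("download failed: " + inner);
          }
        } else {
          throw e;
        }
      }
    }
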
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
@@ -3623,4 +3623,11 @@
+
+  <property>
+    <description>Localizer thread count: the number of threads the
+    NodeManager uses to download and unpack resources.</description>
+    <name>yarn.nodemanager.downloader.thread.count</name>
+    <value>10</value>
+  </property>
 </configuration>
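
Operators raise the limit by overriding the property in yarn-site.xml; for example (the value 20 is arbitrary):

    <property>
      <name>yarn.nodemanager.downloader.thread.count</name>
      <value>20</value>
    </property>
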
Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java	(revision 739d3c394d772783fe23b386274d58954c9a0236)
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
@@ -132,6 +132,18 @@
     FileOutputStream stream = new FileOutputStream(jarFile);
     LOG.info("Create jar out stream ");
     JarOutputStream out = new JarOutputStream(stream, new Manifest());
+    ZipEntry entry = new ZipEntry("classes/1.class");
+    out.putNextEntry(entry);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
+    ZipEntry entry2 = new ZipEntry("classes/2.class");
+    out.putNextEntry(entry2);
+    out.write(1);
+    out.write(2);
+    out.write(3);
+    out.closeEntry();
     LOG.info("Done writing jar stream ");
     out.close();
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@@ -438,7 +450,7 @@
     FileStatus status = files.getFileStatus(localized.getParent());
     FsPermission perm = status.getPermission();
     assertEquals("Cache directory permissions are incorrect",
-        new FsPermission((short)0755), perm);
+        new FsPermission((short)0700), perm);
 
     status = files.getFileStatus(localized);
     perm = status.getPermission();
@@ -542,7 +554,7 @@
     downloadWithFileType(TEST_FILE_TYPE.JAR);
   }
 
-  @Test (timeout=10000)
+  @Test (timeout=10000)
   public void testDownloadArchiveZip() throws IOException, URISyntaxException,
       InterruptedException {
     downloadWithFileType(TEST_FILE_TYPE.ZIP);
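
The two entries added under classes/ give the PATTERN tests something real to match; the same jar can be produced standalone like this (the output path is illustrative):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.jar.JarOutputStream;
    import java.util.jar.Manifest;
    import java.util.zip.ZipEntry;

    public class TestJarDemo {
      public static void main(String[] args) throws IOException {
        try (JarOutputStream out = new JarOutputStream(
            new FileOutputStream("/tmp/test.jar"), new Manifest())) {
          for (String name :
              new String[]{"classes/1.class", "classes/2.class"}) {
            out.putNextEntry(new ZipEntry(name));
            out.write(new byte[]{1, 2, 3}); // placeholder class bytes
            out.closeEntry();
          }
        }
      }
    }
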
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java	(revision f84fbf61630ff27f70f88e0dbca1c2b56f60408c)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java	(revision 15634197819e61e94747ed99d55e5f1dde757623)
@@ -35,6 +35,10 @@
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
@@ -620,6 +624,143 @@
     }
   }
 
+  /**
+   * Decompress the given stream, identified by the file name
+   * {@code name}, to the destination path.
+   * @param inputStream compressed data
+   * @param name file name ending with the compression type, like tar.gz
+   * @param destination destination path
+   * @param executorService optional thread pool service
+   * @return whether this is a compressed file and decompression succeeded
+   * @throws IOException could not read or write data
+   * @throws InterruptedException operation interrupted
+   * @throws ExecutionException could not submit thread pool items
+   */
+  public static boolean decompress(
+      InputStream inputStream,
+      String name,
+      Path destination,
+      ExecutorService executorService)
+      throws IOException, InterruptedException, ExecutionException {
+    String fileName =
+        StringUtils.toLowerCase(name);
+    String destinationPath =
+        new File(destination.toUri()).getAbsolutePath()
+            .replace("'", "\\'");
+    StringBuilder command = new StringBuilder();
+    boolean tar = false;
+    boolean zipOrJar = false;
+    boolean gzipped = false;
+    while (true) {
+      if (fileName.endsWith(".tgz")) {
+        fileName =
+            fileName
+                .substring(0, fileName.length() - 3) + "tar.gz";
+      } else if (fileName.endsWith(".gz")) {
+        command.append("gzip -dc | ");
+        fileName =
+            fileName.substring(0, fileName.length() - 3);
+        gzipped = true;
+      } else if (fileName.endsWith(".tar")) {
+        command
+            .append("(")
+            .append("rm -rf '")
+            .append(destinationPath)
+            .append("';")
+            .append("mkdir '")
+            .append(destinationPath)
+            .append("'; cd '")
+            .append(destinationPath)
+            .append("';")
+            .append("tar -xv")
+            .append(")");
+        tar = true;
+        break;
+      } else if (fileName.endsWith(".jar") ||
+          fileName.endsWith(".zip")) {
+        command
+            .append("(")
+            .append("rm -rf '")
+            .append(destinationPath)
+            .append("';")
+            .append("mkdir '")
+            .append(destinationPath)
+            .append("'; cd '")
+            .append(destinationPath)
+            .append("';")
+            .append("jar xv)");
+        zipOrJar = true;
+        break;
+      } else {
+        break;
+      }
+    }
+    if (Shell.WINDOWS && tar) {
+      // Tar is not native to Windows. Use simple Java based implementation
+      // for tests and simple tar archives
+      unTarUsingJava(inputStream,
+          new File(destination.toUri()), gzipped);
+      return true;
+    } else if (tar || zipOrJar) {
+      runCommandOnStream(inputStream, executorService, command.toString());
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Run a command and send the contents of an input stream to it.
+   * @param inputStream input stream to forward to the shell command
+   * @param executor0 optional executor
+   * @param command shell command to run
+   * @throws IOException read or write failed
+   * @throws InterruptedException command interrupted
+   * @throws ExecutionException task submit failed
+   */
+  private static void runCommandOnStream(
+      InputStream inputStream, ExecutorService executor0, String command)
+      throws IOException, InterruptedException, ExecutionException {
+    String shell = Shell.WINDOWS ? "cmd" : "bash";
+    String cmdSwitch = Shell.WINDOWS ? "/c" : "-c";
+    ExecutorService executor =
+        executor0 != null ? executor0 :
+            Executors.newFixedThreadPool(2);
+    ProcessBuilder builder = new ProcessBuilder();
+    builder.command(shell, cmdSwitch, command);
+    Process process = builder.start();
+    Future<String> output = executor.submit(() -> {
+      // Read until the output stream receives an EOF and is closed.
+      try {
+        return org.apache.commons.io.IOUtils.toString(
+            process.getInputStream());
+      } catch (IOException e) {
+        return e.getMessage();
+      } finally {
+        process.getInputStream().close();
+      }
+    });
+    Future<String> error = executor.submit(() -> {
+      // Read until the error stream receives an EOF and is closed.
+      try {
+        return org.apache.commons.io.IOUtils.toString(
+            process.getErrorStream());
+      } catch (IOException e) {
+        return e.getMessage();
+      } finally {
+        process.getErrorStream().close();
+      }
+    });
+    try {
+      org.apache.commons.io.IOUtils.copy(
+          inputStream, process.getOutputStream());
+    } finally {
+      process.getOutputStream().close();
+    }
+    if (process.waitFor() != 0) {
+      throw new IOException("Process " + command + " exited with " +
+          process.exitValue() +
+          "\n" + error.get() + "\n" + output.get() + "\n");
+    }
+    LOG.info(error.get() + "\n" + output.get() + "\n");
+  }
+
 /**
  * Given a Tar File as input it will untar the file in a the untar directory
  * passed as the second parameter
@@ -690,6 +831,29 @@
       inputStream = new BufferedInputStream(new FileInputStream(inFile));
     }
 
+      tis = new TarArchiveInputStream(inputStream);
+
+      for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
+        unpackEntries(tis, entry, untarDir);
+        entry = tis.getNextTarEntry();
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, tis, inputStream);
+    }
+  }
+
+  private static void unTarUsingJava(InputStream inputStream, File untarDir,
+      boolean gzipped) throws IOException {
+    TarArchiveInputStream tis = null;
+    try {
+      if (gzipped) {
+        inputStream = new BufferedInputStream(new GZIPInputStream(
+            inputStream));
+      } else {
+        inputStream =
+            new BufferedInputStream(inputStream);
+      }
+      tis = new TarArchiveInputStream(inputStream);
 
       for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {