diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index c0388f6..e8a3a7a 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -48,13 +49,10 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConfUtil; -import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; @@ -594,10 +592,10 @@ public static boolean copy(FileSystem srcFS, Path src, * Copies files between filesystems as a fs super user using distcp, and runs * as a privileged user. 
*/ - public static boolean privilegedCopy(FileSystem srcFS, Path src, Path dst, + public static boolean privilegedCopy(FileSystem srcFS, List<Path> srcPaths, Path dst, HiveConf conf) throws IOException { String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - return distCp(srcFS, src, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims()); + return distCp(srcFS, srcPaths, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims()); } @VisibleForTesting @@ -622,7 +620,7 @@ static boolean copy(FileSystem srcFS, Path src, HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES) + ")"); LOG.info("Launch distributed copy (distcp) job."); triedDistcp = true; - copied = distCp(srcFS, src, dst, deleteSource, null, conf, shims); + copied = distCp(srcFS, Collections.singletonList(src), dst, deleteSource, null, conf, shims); } } if (!triedDistcp) { @@ -635,17 +633,19 @@ static boolean copy(FileSystem srcFS, Path src, return copied; } - static boolean distCp(FileSystem srcFS, Path src, Path dst, + public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst, boolean deleteSource, String doAsUser, HiveConf conf, HadoopShims shims) throws IOException { boolean copied = false; if (doAsUser == null){ - copied = shims.runDistCp(src, dst, conf); + copied = shims.runDistCp(srcPaths, dst, conf); } else { - copied = shims.runDistCpAs(src, dst, conf, doAsUser); + copied = shims.runDistCpAs(srcPaths, dst, conf, doAsUser); } if (copied && deleteSource) { - srcFS.delete(src, true); + for (Path path : srcPaths) { + srcFS.delete(path, true); + } } return copied; } diff --git a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java index d3c8761..3cdca20 100644 --- a/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java +++ b/common/src/test/org/apache/hadoop/hive/common/TestFileUtils.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.net.URI; import
java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -223,10 +224,10 @@ public void testCopyWithDistcp() throws IOException { when(mockFs.getContentSummary(any(Path.class))).thenReturn(mockContentSummary); HadoopShims shims = mock(HadoopShims.class); - when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(true); + when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(true); Assert.assertTrue(FileUtils.copy(mockFs, copySrc, mockFs, copyDst, false, false, conf, shims)); - verify(shims).runDistCp(copySrc, copyDst, conf); + verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf); } @Test @@ -240,14 +241,14 @@ public void testCopyWithDistCpAs() throws IOException { String doAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); HadoopShims shims = mock(HadoopShims.class); - when(shims.runDistCpAs(copySrc, copyDst, conf, doAsUser)).thenReturn(true); - when(shims.runDistCp(copySrc, copyDst, conf)).thenReturn(false); + when(shims.runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser)).thenReturn(true); + when(shims.runDistCp(Collections.singletonList(copySrc), copyDst, conf)).thenReturn(false); // doAs when asked - Assert.assertTrue(FileUtils.distCp(fs, copySrc, copyDst, true, doAsUser, conf, shims)); - verify(shims).runDistCpAs(copySrc, copyDst, conf, doAsUser); + Assert.assertTrue(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, doAsUser, conf, shims)); + verify(shims).runDistCpAs(Collections.singletonList(copySrc), copyDst, conf, doAsUser); // don't doAs when not asked - Assert.assertFalse(FileUtils.distCp(fs, copySrc, copyDst, true, null, conf, shims)); - verify(shims).runDistCp(copySrc, copyDst, conf); + Assert.assertFalse(FileUtils.distCp(fs, Collections.singletonList(copySrc), copyDst, true, null, conf, shims)); + verify(shims).runDistCp(Collections.singletonList(copySrc), copyDst, conf); } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 8e7704d..7330f56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -33,6 +33,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import org.slf4j.Logger; @@ -98,7 +99,7 @@ protected int execute(DriverContext driverContext) { List<FileStatus> srcFiles = new ArrayList<>(); FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length)); - if (! rwork.getReadListFromInput()){ + if (!rwork.getReadListFromInput()) { if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -132,7 +133,7 @@ protected int execute(DriverContext driverContext) { } BufferedWriter listBW = null; - if (rwork.getListFilesOnOutputBehaviour()){ + if (rwork.getListFilesOnOutputBehaviour()) { Path listPath = new Path(toPath,EximUtil.FILES_NAME); LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString()); if (dstFs.exists(listPath)){ @@ -144,27 +145,33 @@ protected int execute(DriverContext driverContext) { // later(for cases where filenames have unicode chars) } + HashMap<FileSystem, List<Path>> srcMap = new HashMap<>(); for (FileStatus oneSrc : srcFiles) { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("Copying file: " + oneSrc.getPath().toString()); FileSystem actualSrcFs = null; - if (rwork.getReadListFromInput()){ + if (rwork.getReadListFromInput()) { // TODO : filesystemcache prevents this from being a perf nightmare, but we // should still probably follow up to see if we need to do something better here.
actualSrcFs = oneSrc.getPath().getFileSystem(conf); } else { actualSrcFs = srcFs; } - if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){ + if (!rwork.getListFilesOnOutputBehaviour(oneSrc)) { LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); - if (!doCopy(toPath, dstFs, oneSrc.getPath(), actualSrcFs, conf)) { - console.printError("Failed to copy: '" + oneSrc.getPath().toString() - + "to: '" + toPath.toString() + "'"); - return 1; + + // We just make the list of files to be copied using distCp. + // If files come from different file system, then just make separate lists for each filesystem. + if (srcMap.containsKey(actualSrcFs)) { + srcMap.get(actualSrcFs).add(oneSrc.getPath()); + } else { + List<Path> srcPaths = new ArrayList<>(); + srcPaths.add(oneSrc.getPath()); + srcMap.put(actualSrcFs, srcPaths); } - }else{ + } else { LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri()); console.printInfo("Tracking file: " + oneSrc.getPath().toUri()); String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs); @@ -177,8 +184,21 @@ protected int execute(DriverContext driverContext) { listBW.close(); } - return 0; + // If the srcMap is not empty, it means we made the list of files for distCp. + // If there are files from different filesystems, then the map will have multiple entries.
+ if (!srcMap.isEmpty()) { + for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet()) { + FileSystem actualSrcFs = entry.getKey(); + List<Path> srcPaths = entry.getValue(); + if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) { + console.printError("Failed to copy: " + srcPaths.size() + + " files to: '" + toPath.toString() + "'"); + return 1; + } + } + } + return 0; } catch (Exception e) { console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e)); @@ -186,26 +206,31 @@ protected int execute(DriverContext driverContext) { } } - public static boolean doCopy(Path dst, FileSystem dstFs, Path src, FileSystem srcFs, - HiveConf conf) throws IOException { + public static boolean doCopy(Path dst, FileSystem dstFs, List<Path> srcPaths, FileSystem srcFs, + HiveConf conf) throws IOException { + boolean result = true; if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) - || isLocalFile(src) || isLocalFile(dst)){ - // regular copy in test env, or when source or destination is a local file - // distcp runs inside a mapper task, and cannot handle file:/// - LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri()); - return FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf); + || isLocalFileSystem(dstFs) || isLocalFileSystem(srcFs)) { + for (final Path src : srcPaths) { + // regular copy in test env, or when source or destination is a local file + // distcp runs inside a mapper task, and cannot handle file:/// + LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri()); + if (!FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf)) { + result = false; + } + } } else { // distcp in actual deployment with privilege escalation - LOG.debug("Using privleged distcp for {} -> {}", src.toUri(), dst.toUri()); - return FileUtils.privilegedCopy(srcFs, src, dst, conf); + result = FileUtils.privilegedCopy(srcFs, srcPaths, dst, conf); } + return result; } - private static boolean isLocalFile(Path p) { - String scheme =
p.toUri().getScheme(); - boolean isLocalFile = scheme.equalsIgnoreCase("file"); - LOG.debug("{} was a local file? {}, had scheme {}",p.toUri(), isLocalFile, scheme); - return isLocalFile; + private static boolean isLocalFileSystem(FileSystem fs) { + String scheme = fs.getScheme(); + boolean isLocalFileSystem = scheme.equalsIgnoreCase("file"); + LOG.debug("Scheme {} was a local file system? {}", scheme, isLocalFileSystem); + return isLocalFileSystem; } private List<FileStatus> filesInFileListing(FileSystem fs, Path path) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 61e004f..164ca74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -33,6 +33,8 @@ Licensed to the Apache Software Foundation (ASF) under one import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; public class FileOperations { private static Logger logger = LoggerFactory.getLogger(FileOperations.class); @@ -64,10 +66,12 @@ public void export(ReplicationSpec forReplicationSpec) throws IOException, Seman private void copyFiles() throws IOException { FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath); + List<Path> srcPaths = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { - ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, fileStatus.getPath(), dataFileSystem, - hiveConf); + srcPaths.add(fileStatus.getPath()); } + + ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, srcPaths, dataFileSystem, hiveConf); } /** diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index e3d1199..a2e0abd 100644 ---
a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -29,7 +29,6 @@ import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -38,7 +37,6 @@ import java.util.Set; import java.util.TreeMap; import javax.security.auth.Subject; -import javax.security.auth.login.LoginException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -1086,7 +1084,7 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy) private static final String DISTCP_OPTIONS_PREFIX = "distcp.options."; - List<String> constructDistCpParams(Path src, Path dst, Configuration conf) { + List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) { List<String> params = new ArrayList<String>(); for (Map.Entry<String, String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){ String distCpOption = entry.getKey(); @@ -1102,20 +1100,22 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy) params.add("-skipcrccheck"); params.add("-pb"); } - params.add(src.toString()); + for (Path src : srcPaths) { + params.add(src.toString()); + } params.add(dst.toString()); return params; } @Override - public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException { + public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException { UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( doAsUser, UserGroupInformation.getLoginUser()); try { return proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws Exception { - return runDistCp(src, dst, conf); + return runDistCp(srcPaths, dst, conf); } }); } catch (InterruptedException e) { @@ -1124,14 +1124,14 @@ public
Boolean run() throws Exception { } @Override - public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException { - DistCpOptions options = new DistCpOptions(Collections.singletonList(src), dst); + public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException { + DistCpOptions options = new DistCpOptions(srcPaths, dst); options.setSyncFolder(true); options.setSkipCRC(true); options.preserve(FileAttribute.BLOCKSIZE); // Creates the command-line parameters for distcp - List<String> params = constructDistCpParams(src, dst, conf); + List<String> params = constructDistCpParams(srcPaths, dst, conf); try { conf.setBoolean("mapred.mapper.new-api", true); diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java index 6c93df5..a73dc77 100644 --- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -25,6 +25,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -46,7 +47,7 @@ public void testConstructDistCpParams() { Configuration conf = new Configuration(); Hadoop23Shims shims = new Hadoop23Shims(); - List<String> paramsDefault = shims.constructDistCpParams(copySrc, copyDst, conf); + List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); assertEquals(5, paramsDefault.size()); assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); @@ -59,7 +60,7 @@ public void testConstructDistCpParams() { conf.set("distcp.options.blah", ""); // should set "-blah" conf.set("dummy", "option"); // should be ignored.
List<String> paramsWithCustomParamInjection = - shims.constructDistCpParams(copySrc, copyDst, conf); + shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); assertEquals(5, paramsWithCustomParamInjection.size()); diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index d08ad04..0db54d1 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.shims; import java.io.IOException; -import java.io.InputStream; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URI; @@ -43,7 +42,6 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobProfile; @@ -482,25 +480,25 @@ public void checkFileAccess(FileSystem fs, FileStatus status, FsAction action) * copy is done. This is a variation which allows proxying as a different user to perform * the distcp, and requires that the caller have requisite proxy user privileges. * - * @param src Path to the source file or directory to copy + * @param srcPaths List of Path to the source files or directories to copy * @param dst Path to the destination file or directory * @param conf The hadoop configuration object * @param doAsUser The user to perform the distcp as * @return True if it is successfull; False otherwise.
*/ - public boolean runDistCpAs(Path src, Path dst, Configuration conf, String doAsUser) throws IOException; + public boolean runDistCpAs(List<Path> srcPaths, Path dst, Configuration conf, String doAsUser) throws IOException; /** * Copies a source dir/file to a destination by orchestrating the copy between hdfs nodes. * This distributed process is meant to copy huge files that could take some time if a single * copy is done. * - * @param src Path to the source file or directory to copy + * @param srcPaths List of Path to the source files or directories to copy * @param dst Path to the destination file or directory * @param conf The hadoop configuration object * @return True if it is successfull; False otherwise. */ - public boolean runDistCp(Path src, Path dst, Configuration conf) throws IOException; + public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException; /** * This interface encapsulates methods used to get encryption information from