diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 45a44e995a..0dec5945d8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -647,6 +647,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Provide the maximum number of partitions of a table that will be batched together during \n"
             + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
             + "The data for these partitions will be copied before copying the metadata batch. "),
+    REPL_PARALLEL_COPY_TASKS("hive.repl.parallel.copy.tasks", 1000,
+        "Provide the maximum number of parallel copy tasks (distcp or regular copy) launched for a table \n"
+            + "or partition. "),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 27e97b90ad..41048c9de8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -106,7 +106,6 @@
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-import static org.apache.hadoop.hive.metastore.Warehouse.getFs;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 5a662ff3c1..76b8b3be4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -45,6 +45,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 public class CopyUtils {
@@ -60,12 +63,14 @@
   private final boolean hiveInReplTest;
   private final String copyAsUser;
   private FileSystem destinationFs;
+  private final int maxParallelCopyTask;
 
   public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinationFs) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
     hiveInReplTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL);
+    maxParallelCopyTask = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARALLEL_COPY_TASKS);
     this.copyAsUser = distCpDoAsUser;
     this.destinationFs = destinationFs;
   }
@@ -82,27 +87,31 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
     }
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+    ExecutorService executorService = null;
     try {
       if (useRegularCopy || readSrcAsFilesList) {
+        executorService =
+                Executors.newFixedThreadPool(maxParallelCopyTask);
         // Layout of data files may differ based on the type of tables.
         Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
         for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
           Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
-          for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
-            Path destination = destMapEntry.getKey();
-            List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
-            // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
-            // For instance, doCopyRetry closes the file system in case of i/o exceptions.
-            sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
-            if (!destinationFs.exists(destination)
-                && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
-              LOG.error("Failed to create destination directory: " + destination);
-              throw new IOException("Destination directory creation failed");
+          if (destMap.size() > 1) {
+            //Multiple files, do copy in parallel
+            List<Callable<Void>> copyList = new ArrayList<>();
+            for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+              copyList.add(() -> {
+                doCopy(destMapEntry, proxyUser, useRegularCopy, overwrite);
+                return null;
+              });
+            }
+            executorService.invokeAll(copyList);
+          } else {
+            //Since just a single file, just do a copy in the same thread
+            for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+              doCopy(destMapEntry, proxyUser, useRegularCopy, overwrite);
             }
-
-            // Copy files with retry logic on failure or source file is dropped or changed.
-            doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy, overwrite);
           }
         }
       } else {
         // When distCp is to be used and the srcFiles doesn't contain subDirs (readSrcAsFilesList=false),
@@ -112,11 +121,34 @@
         srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
         doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       }
+    } catch (InterruptedException e) {
+      LOG.error("Failed to copy ", e);
+      throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
     } finally {
       if (proxyUser != null) {
         FileSystem.closeAllForUGI(proxyUser);
       }
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private void doCopy(Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry, UserGroupInformation proxyUser,
+                      boolean useRegularCopy, boolean overwrite) throws IOException, LoginException,
+      HiveFatalException {
+    Path destination = destMapEntry.getKey();
+    List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+    // Get the file system again from cache. There is a chance that the file system stored in the map is closed.
+    // For instance, doCopyRetry closes the file system in case of i/o exceptions.
+    FileSystem sourceFsOfFileInfo = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf);
+    if (!destinationFs.exists(destination)
+        && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+      LOG.error("Failed to create destination directory: " + destination);
+      throw new IOException("Destination directory creation failed");
     }
+    // Copy files with retry logic on failure or source file is dropped or changed.
+    doCopyRetry(sourceFsOfFileInfo, fileInfoList, destination, proxyUser, useRegularCopy, overwrite);
   }
 
   private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
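
The change above funnels per-destination copy work through a fixed-size thread pool whose width is capped by the new hive.repl.parallel.copy.tasks setting (default 1000 in this patch). The standalone sketch below illustrates the same Callable/invokeAll pattern outside of Hive; the class name ParallelCopySketch, the pool size of 4, and the copyOneDestination helper are illustrative stand-ins for maxParallelCopyTask and doCopy(), not Hive APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCopySketch {

  // Stand-in for CopyUtils.doCopy(): ensure the destination dir exists, then copy with retry.
  static void copyOneDestination(String destination) {
    System.out.println(Thread.currentThread().getName() + " -> " + destination);
  }

  public static void main(String[] args) throws Exception {
    List<String> destinations =
        List.of("/warehouse/t1/p=1", "/warehouse/t1/p=2", "/warehouse/t1/p=3");

    // Pool width plays the role of maxParallelCopyTask (hive.repl.parallel.copy.tasks).
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    try {
      // One Callable per destination directory, mirroring the destMap loop in copyAndVerify().
      List<Callable<Void>> copyList = new ArrayList<>();
      for (String destination : destinations) {
        copyList.add(() -> {
          copyOneDestination(destination);
          return null; // Callable<Void> must return a value
        });
      }
      // invokeAll blocks until every task completes; per-task failures are captured in the Futures.
      List<Future<Void>> results = executorService.invokeAll(copyList);
      for (Future<Void> result : results) {
        result.get(); // rethrows (wrapped) any exception a copy task threw
      }
    } finally {
      executorService.shutdown(); // as in the finally block added to copyAndVerify()
    }
  }
}

One difference worth noting: invokeAll does not propagate exceptions thrown inside the tasks directly, it records them in the returned futures. The sketch surfaces them via Future.get(), whereas the patch does not inspect the futures it gets back from invokeAll; it relies on the retry and error handling inside doCopyRetry().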