diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index e25dc54e7d..e0758118f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1493,23 +1493,10 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // for SELECT statements LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString()); conf.getFilesToFetch().addAll(filesKept); - } else if (conf !=null && conf.isCTASorCM() && isBlobStorage) { - // for CTAS or Create MV statements - perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus"); - LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString()); - if (emptyBuckets.isEmpty()) { - fs.rename(tmpPath, specPath); - } else { - LOG.info("Duplicate files present. Moving files sequentially"); - Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept); - } - perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus"); } else { - // for rest of the statement e.g. INSERT, LOAD etc - perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); - LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath); - Utilities.renameOrMoveFiles(fs, tmpPath, specPath); - perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles"); + // Move specified files to destination folder + LOG.info("Move {} files from {} to {}", filesKept.size(), tmpPath, specPath); + moveSpecifiedFilesInParallel(hconf, fs, tmpPath, specPath, filesKept); } } } else { @@ -1520,6 +1507,104 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, fs.delete(taskTmpPath, true); } + /** + * move specified files to destination in parallel mode. + * Spins up multiple threads, schedules transfer and shuts down the pool. 
+   *
+   * @param conf configuration; {@code ConfVars.HIVE_MOVE_FILES_THREAD_COUNT} is read from it
+   *             (15 is passed as the default, and the result is clamped to at least 1)
+   * @param fs file system on which the move is performed
+   * @param srcPath root directory that is scanned for files to move
+   * @param destPath root directory the files are moved into; created if it does not exist
+   * @param filesToMove only entries contained in this set are moved; other entries found
+   *                    under {@code srcPath} are left in place
+   * @throws HiveException if a move task fails, or if the calling thread is interrupted while
+   *                       waiting for the tasks (the interrupt flag is restored in that case)
+   * @throws IOException if scanning {@code srcPath} or creating a destination directory fails
+   */
+  private static void moveSpecifiedFilesInParallel(Configuration conf, FileSystem fs,
+      Path srcPath, Path destPath, Set<FileStatus> filesToMove)
+      throws HiveException, IOException {
+
+    int threads = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15), 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(threads,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-tmp-ext-%d").build());
+
+    LOG.info("rename {} files from {} to dest {} with {} threads",
+        filesToMove.size(), srcPath, destPath, threads);
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+
+    // NOTE(review): generic type arguments were reconstructed from usage
+    // (filesToMove.contains(fileStatus), Callable returning Void) - confirm against the file.
+    List<Future<Void>> futures = new LinkedList<>();
+    try {
+      moveSpecifiedFilesInParallel(fs, srcPath, destPath, filesToMove, futures, pool);
+
+      // Nothing is submitted past this point; tasks that are already queued still run.
+      pool.shutdown();
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          // Restore the flag so that code further up the stack can still observe the interrupt.
+          Thread.currentThread().interrupt();
+          LOG.error("Error in moving files to destination", e);
+          cancelTasks(futures);
+          throw new HiveException(e);
+        } catch (ExecutionException e) {
+          LOG.error("Error in moving files to destination", e);
+          cancelTasks(futures);
+          throw new HiveException(e);
+        }
+      }
+      LOG.info("Completed rename from {} to {}", srcPath, destPath);
+    } finally {
+      // Covers the paths on which shutdown() was never reached (the scheduling pass threw) and
+      // drops whatever is still queued after a failure. After a clean run every task has
+      // already completed, so there is nothing left for this call to stop.
+      pool.shutdownNow();
+      perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
+    }
+  }
+
+  /**
+   * Submits one move task for every entry of {@code src} that is contained in
+   * {@code filesToMove}. Directories that are not themselves contained in the set are
+   * traversed recursively, with the matching sub-directory of {@code dst} as their target.
+   * Tasks are only submitted here; waiting for them is left to the caller via {@code futures}.
+   *
+   * @param fs file system on which the move is performed
+   * @param src directory whose entries are examined
+   * @param dst directory that receives the selected entries; created if it does not exist
+   * @param filesToMove entries that are to be moved
+   * @param futures receives one future per submitted move task
+   * @param pool thread pool the move tasks are submitted to
+   * @throws IOException if the existence check, the directory creation or the listing fails
+   */
+  private static void moveSpecifiedFilesInParallel(FileSystem fs,
+      Path src, Path dst, Set<FileStatus> filesToMove, List<Future<Void>> futures,
+      ExecutorService pool) throws IOException {
+    if (!fs.exists(dst)) {
+      LOG.info("Creating {}", dst);
+      fs.mkdirs(dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus fileStatus : files) {
+      // NOTE(review): membership relies on FileStatus equality against a fresh listing;
+      // presumably path-based - confirm.
+      if (filesToMove.contains(fileStatus)) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              LOG.debug("Moving from {} to {} ", fileStatus.getPath(), dst);
+              Utilities.moveFile(fs, fileStatus, dst);
+            } catch (Exception e) {
+              throw new HiveException(e);
+            }
+            return null;
+          }
+        }));
+      } else if (fileStatus.isDirectory()) {
+        // Traverse directory contents.
+        // Directory nesting for dst needs to match src.
+        Path nestedDstPath = new Path(dst, fileStatus.getPath().getName());
+        moveSpecifiedFilesInParallel(fs, fileStatus.getPath(), nestedDstPath,
+            filesToMove, futures, pool);
+      }
+    }
+  }
+
+  /**
+   * Cancels every future in the list; {@code cancel(true)} is used, so tasks that are
+   * already running are interrupted as well.
+   *
+   * @param futureList futures to cancel
+   */
+  private static void cancelTasks(List<Future<Void>> futureList) {
+    for (Future<Void> future : futureList) {
+      future.cancel(true);
+    }
+  }
+
+
   /**
    * Check the existence of buckets according to bucket specification. Create empty buckets if