diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index e25dc54e7d..0304a5c74a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1111,43 +1111,6 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
     }
   }
 
-  /**
-   * Moves files from src to dst if it is within the specified set of paths
-   * @param fs
-   * @param src
-   * @param dst
-   * @param filesToMove
-   * @throws IOException
-   * @throws HiveException
-   */
-  private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
-      throws IOException, HiveException {
-    if (!fs.exists(dst)) {
-      fs.mkdirs(dst);
-    }
-
-    FileStatus[] files = fs.listStatus(src);
-    for (FileStatus file : files) {
-      if (filesToMove.contains(file.getPath())) {
-        Utilities.moveFile(fs, file, dst);
-      } else if (file.isDir()) {
-        // Traverse directory contents.
-        // Directory nesting for dst needs to match src.
-        Path nestedDstPath = new Path(dst, file.getPath().getName());
-        Utilities.moveSpecifiedFiles(fs, file.getPath(), nestedDstPath, filesToMove);
-      }
-    }
-  }
-
-  private static void moveSpecifiedFileStatus(FileSystem fs, Path src, Path dst,
-      Set<FileStatus> filesToMove) throws IOException, HiveException {
-    Set<Path> filePaths = new HashSet<>();
-    for (FileStatus fstatus : filesToMove) {
-      filePaths.add(fstatus.getPath());
-    }
-    moveSpecifiedFiles(fs, src, dst, filePaths);
-  }
-
   private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
       HiveException {
     Path srcFilePath = file.getPath();
@@ -1197,6 +1160,53 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I
     }
   }
 
+  /**
+   * Renames src to dst or, in case dst already exists, moves the files in src
+   * to dst. If there is an existing file with the same name, the new file's
+   * name will be appended with "_1", "_2", etc. The moves are performed in parallel.
+   *
+   * @param conf
+   *
+   * @param fs
+   *          the FileSystem src and dst are on.
+   * @param src
+   *          the src directory
+   * @param dst
+   *          the target directory
+   * @throws IOException
+   */
+  public static void renameOrMoveFilesInParallel(Configuration conf,
+      FileSystem fs, Path src, Path dst) throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      if (!fs.rename(src, dst)) {
+        throw new HiveException("Unable to move: " + src + " to: " + dst);
+      }
+    } else {
+      // move files in parallel
+      LOG.info("Moving files from {} to {}", src, dst);
+      final ExecutorService pool = createMoveThreadPool(conf);
+      List<Future<Void>> futures = new LinkedList<>();
+
+      final FileStatus[] files = fs.listStatus(src);
+      for (FileStatus file : files) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              Utilities.moveFile(fs, file, dst);
+            } catch (Exception e) {
+              throw new HiveException(e);
+            }
+            return null;
+          }
+        }));
+      }
+
+      shutdownAndCleanup(pool, futures);
+      LOG.info("Rename files from {} to {} is complete", src, dst);
+    }
+  }
+
   /**
    * The first group will contain the task id. The second group is the optional extension. The file
    * name looks like: "0_0" or "0_0.gz". There may be a leading prefix (tmp_). Since getTaskId() can
@@ -1497,18 +1507,13 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
       // for CTAS or Create MV statements
       perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
       LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString());
-      if (emptyBuckets.isEmpty()) {
-        fs.rename(tmpPath, specPath);
-      } else {
-        LOG.info("Duplicate files present. Moving files sequentially");
-        Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
-      }
+      moveSpecifiedFilesInParallel(hconf, fs, tmpPath, specPath, filesKept);
       perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
     } else {
       // for rest of the statement e.g. INSERT, LOAD etc
       perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
       LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath);
-      Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+      renameOrMoveFilesInParallel(hconf, fs, tmpPath, specPath);
       perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
     }
   }
@@ -1520,6 +1525,118 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     fs.delete(taskTmpPath, true);
   }
 
+  /**
+   * Moves the specified files to the destination in parallel mode.
+   * Spins up multiple threads, schedules the transfers and shuts down the pool.
+   *
+   * @param conf
+   * @param fs
+   * @param srcPath
+   * @param destPath
+   * @param filesToMove
+   * @throws HiveException
+   * @throws IOException
+   */
+  private static void moveSpecifiedFilesInParallel(Configuration conf, FileSystem fs,
+      Path srcPath, Path destPath, Set<FileStatus> filesToMove)
+      throws HiveException, IOException {
+
+    LOG.info("rename {} files from {} to dest {}",
+        filesToMove.size(), srcPath, destPath);
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+
+    final ExecutorService pool = createMoveThreadPool(conf);
+
+    List<Future<Void>> futures = new LinkedList<>();
+    moveSpecifiedFilesInParallel(fs, srcPath, destPath, filesToMove, futures, pool);
+
+    shutdownAndCleanup(pool, futures);
+    LOG.info("Completed rename from {} to {}", srcPath, destPath);
+
+    perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
+  }
+
+  /**
+   * Moves files from src to dst if they are within the specified set of paths
+   * @param fs
+   * @param src
+   * @param dst
+   * @param filesToMove
+   * @param futures List of futures
+   * @param pool thread pool
+   * @throws IOException
+   */
+  private static void moveSpecifiedFilesInParallel(FileSystem fs,
+      Path src, Path dst, Set<FileStatus> filesToMove, List<Future<Void>> futures,
+      ExecutorService pool) throws IOException {
+    if (!fs.exists(dst)) {
+      LOG.info("Creating {}", dst);
+      fs.mkdirs(dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus fileStatus : files) {
+      if (filesToMove.contains(fileStatus)) {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              LOG.debug("Moving from {} to {} ", fileStatus.getPath(), dst);
+              Utilities.moveFile(fs, fileStatus, dst);
+            } catch (Exception e) {
+              throw new HiveException(e);
+            }
+            return null;
+          }
+        }));
+      } else if (fileStatus.isDir()) {
+        // Traverse directory contents.
+        // Directory nesting for dst needs to match src.
+        Path nestedDstPath = new Path(dst, fileStatus.getPath().getName());
+        moveSpecifiedFilesInParallel(fs, fileStatus.getPath(), nestedDstPath,
+            filesToMove, futures, pool);
+      }
+    }
+  }
+
+  private static ExecutorService createMoveThreadPool(Configuration conf) {
+    int threads = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15), 1);
+    return Executors.newFixedThreadPool(threads,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build());
+  }
+
+  private static void shutdownAndCleanup(ExecutorService pool,
+      List<Future<Void>> futures) throws HiveException {
+    if (pool == null) {
+      return;
+    }
+    pool.shutdown();
+
+    futures = (futures != null) ? futures : Collections.emptyList();
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Error in moving files to destination", e);
+        cancelTasks(futures);
+        throw new HiveException(e);
+      }
+    }
+  }
+
+  /**
+   * Cancels all futures.
+   *
+   * @param futureList
+   */
+  private static void cancelTasks(List<Future<Void>> futureList) {
+    for (Future<Void> future : futureList) {
+      future.cancel(true);
+    }
+  }
+
   /**
    * Check the existence of buckets according to bucket specification. Create empty buckets if
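
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the fail-fast parallel-move pattern the new methods follow: a fixed pool of daemon threads, one Callable per file, and cancellation of the remaining tasks when any move fails. It uses java.nio.file instead of Hadoop's FileSystem, and the class and method names (ParallelMoveSketch, moveAllInParallel) are made up for illustration only.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMoveSketch {

  /** Moves every file directly under srcDir into dstDir using a fixed thread pool. */
  public static void moveAllInParallel(Path srcDir, Path dstDir, int threads)
      throws IOException, InterruptedException, ExecutionException {
    Files.createDirectories(dstDir);

    // Daemon threads, floor of one thread: same shape as createMoveThreadPool() above.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(threads, 1), r -> {
      Thread t = new Thread(r, "move-thread");
      t.setDaemon(true);
      return t;
    });

    // One move task per file, collected as futures.
    List<Future<Void>> futures = new ArrayList<>();
    try (DirectoryStream<Path> files = Files.newDirectoryStream(srcDir)) {
      for (Path file : files) {
        Callable<Void> moveTask = () -> {
          Files.move(file, dstDir.resolve(file.getFileName()));
          return null;
        };
        futures.add(pool.submit(moveTask));
      }
    }

    // Fail fast: wait on each future and cancel the rest on the first error,
    // mirroring shutdownAndCleanup()/cancelTasks() in the patch.
    pool.shutdown();
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      futures.forEach(f -> f.cancel(true));
      throw e;
    }
  }
}

The design point this mirrors is that the pool is sized by configuration (HIVE_MOVE_FILES_THREAD_COUNT in the patch, a plain int here), each file move is an independent task, and waiting on the futures both propagates the first failure and blocks until all moves have finished before the caller proceeds.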