diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 05f6cc9..4d7ae2c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3516,6 +3516,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
+    HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR("hive.exec.move.files.from.source.dir", false,
+        "When moving/renaming a directory from source to destination, individually move each \n" +
+        "file in the source directory, rather than renaming the source directory. This may \n" +
+        "help protect against files written to temp directories by runaway task attempts."),
+
     /* BLOBSTORE section */
     HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
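Reviewer note, not part of the patch: a minimal sketch of how the new property is consumed through the standard HiveConf accessors — the same getBoolVar() call the Utilities.java hunk below uses. The class name is illustrative, and the ConfVars constant assumes this patch has been applied.

```java
import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical demo class, not in the patch.
public class MoveFilesFlagDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Equivalent to "set hive.exec.move.files.from.source.dir=true;" in a session.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR, true);
    boolean moveFileByFile =
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR);
    System.out.println("move files individually: " + moveFileByFile); // true
  }
}
```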
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index af82671..eaf210f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1142,6 +1142,51 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
   }
 
   /**
+   * Moves the files under src into dst, but only those contained in the specified set of paths.
+   * @param fs the FileSystem on which both directories live
+   * @param src the source directory
+   * @param dst the destination directory
+   * @param filesToMove the set of source paths that should be moved
+   * @throws IOException
+   * @throws HiveException
+   */
+  private static void moveSpecifiedFiles(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
+      throws IOException, HiveException {
+    if (!fs.exists(dst)) {
+      fs.mkdirs(dst);
+    }
+
+    FileStatus[] files = fs.listStatus(src);
+    for (FileStatus file : files) {
+      if (filesToMove.contains(file.getPath())) {
+        Utilities.moveFile(fs, file, dst);
+      }
+    }
+  }
+
+  private static void moveFile(FileSystem fs, FileStatus file, Path dst) throws IOException,
+      HiveException {
+    Path srcFilePath = file.getPath();
+    String fileName = srcFilePath.getName();
+    Path dstFilePath = new Path(dst, fileName);
+    if (file.isDir()) {
+      renameOrMoveFiles(fs, srcFilePath, dstFilePath);
+    } else {
+      if (fs.exists(dstFilePath)) {
+        int suffix = 0;
+        do {
+          suffix++;
+          dstFilePath = new Path(dst, fileName + "_" + suffix);
+        } while (fs.exists(dstFilePath));
+      }
+
+      if (!fs.rename(srcFilePath, dstFilePath)) {
+        throw new HiveException("Unable to move: " + srcFilePath + " to: " + dst);
+      }
+    }
+  }
+
+  /**
    * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an
    * existing file with the same name, the new file's name will be appended with "_1", "_2", etc.
    *
@@ -1163,26 +1208,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I
       // move file by file
       FileStatus[] files = fs.listStatus(src);
       for (FileStatus file : files) {
-
-        Path srcFilePath = file.getPath();
-        String fileName = srcFilePath.getName();
-        Path dstFilePath = new Path(dst, fileName);
-        if (file.isDir()) {
-          renameOrMoveFiles(fs, srcFilePath, dstFilePath);
-        }
-        else {
-          if (fs.exists(dstFilePath)) {
-            int suffix = 0;
-            do {
-              suffix++;
-              dstFilePath = new Path(dst, fileName + "_" + suffix);
-            } while (fs.exists(dstFilePath));
-          }
-
-          if (!fs.rename(srcFilePath, dstFilePath)) {
-            throw new HiveException("Unable to move: " + src + " to: " + dst);
-          }
-        }
+        Utilities.moveFile(fs, file, dst);
       }
     }
   }
@@ -1413,21 +1439,29 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
         tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
     if(statuses != null && statuses.length > 0) {
       PerfLogger perfLogger = SessionState.getPerfLogger();
+      Set<Path> filesKept = new HashSet<Path>();
       perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
       // remove any tmp file or double-committed output files
-      List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf);
+      List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, statuses, dpCtx, conf, hconf, filesKept);
       perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
       // create empty buckets if necessary
       if (emptyBuckets.size() > 0) {
         perfLogger.PerfLogBegin("FileSinkOperator", "CreateEmptyBuckets");
         createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+        filesKept.addAll(emptyBuckets);
         perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets");
       }
       // move to the file destination
       log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
       perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
-      Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR)) {
+        // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks,
+        // by moving just the files we've tracked from removeTempOrDuplicateFiles().
+        Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
+      } else {
+        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
+      }
       perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
     }
   } else {
@@ -1484,6 +1518,12 @@ private static void createEmptyBuckets(Configuration hconf, List<Path> paths,
     }
   }
 
+  private static void addFilesToPathSet(Collection<FileStatus> files, Set<Path> fileSet) {
+    for (FileStatus file : files) {
+      fileSet.add(file.getPath());
+    }
+  }
+
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    */
@@ -1501,13 +1541,18 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
   }
 
+  public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+    return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null);
+  }
+
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept) throws IOException {
     if (fileStats == null) {
       return null;
     }
@@ -1532,6 +1577,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         }
         taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+        if (filesKept != null && taskIDToFile != null) {
+          addFilesToPathSet(taskIDToFile.values(), filesKept);
+        }
         // if the table is bucketed and enforce bucketing, we should check and generate all buckets
         if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null
             && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
           // refresh the file list
@@ -1556,6 +1604,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         return result;
       }
       taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+      if (filesKept != null && taskIDToFile != null) {
+        addFilesToPathSet(taskIDToFile.values(), filesKept);
+      }
       if(taskIDToFile != null && taskIDToFile.size() > 0 && conf != null && conf.getTable() != null
           && (conf.getTable().getNumBuckets() > taskIDToFile.size())
           && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) {
         // get the missing buckets and generate empty buckets for non-dynamic partition
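Reviewer note, not part of the patch: the selective-move technique introduced above can be exercised in isolation against a local FileSystem. This sketch mirrors the semantics of moveSpecifiedFiles() without any Hive internals; the class name and the /tmp paths are illustrative.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical demo class, not in the patch.
public class SelectiveMoveDemo {
  // Move only the files in filesToMove from src into dst, leaving untracked
  // (e.g. runaway-task) files behind in the source directory.
  static void moveTracked(FileSystem fs, Path src, Path dst, Set<Path> filesToMove)
      throws IOException {
    if (!fs.exists(dst)) {
      fs.mkdirs(dst);
    }
    for (FileStatus file : fs.listStatus(src)) {
      Path from = file.getPath();
      if (filesToMove.contains(from)) {
        Path to = new Path(dst, from.getName());
        if (!fs.rename(from, to)) {
          throw new IOException("Unable to move " + from + " to " + to);
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path root = new Path("/tmp/selective-move-demo");
    fs.delete(root, true); // start from a clean scratch directory
    Path src = new Path(root, "src");
    Path dst = new Path(root, "dst");
    fs.mkdirs(src);
    Path tracked = new Path(src, "000000_0");     // a committed task output
    Path stray = new Path(src, "000000_0.stray"); // simulates a runaway task's file
    fs.create(tracked).close();
    fs.create(stray).close();

    Set<Path> keep = new HashSet<Path>();
    // listStatus() returns fully qualified paths (file:/tmp/...), so the
    // tracking set must hold the qualified form or nothing will match.
    keep.add(fs.makeQualified(tracked));

    moveTracked(fs, src, dst, keep);
    System.out.println("tracked file moved: " + fs.exists(new Path(dst, "000000_0"))); // true
    System.out.println("stray left behind:  " + fs.exists(stray));                     // true
  }
}
```

The makeQualified() detail matters for the patch too: filesKept is populated from FileStatus.getPath() values, which are already qualified, so the membership test in moveSpecifiedFiles() compares like with like.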
diff --git a/ql/src/test/queries/clientpositive/skewjoin.q b/ql/src/test/queries/clientpositive/skewjoin.q
index e4b178a..9ad4b77 100644
--- a/ql/src/test/queries/clientpositive/skewjoin.q
+++ b/ql/src/test/queries/clientpositive/skewjoin.q
@@ -2,6 +2,7 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.optimize.skewjoin = true;
 set hive.skewjoin.key = 2;
+set hive.exec.move.files.from.source.dir=true;
 
 -- SORT_QUERY_RESULTS
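Reviewer note, not part of the patch: the file-by-file path deliberately keeps the duplicate-name handling of renameOrMoveFiles(), appending "_1", "_2", ... when the destination already holds a file of the same name. A standalone sketch of that probing logic, with an illustrative class name and path:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical demo class, not in the patch.
public class CollisionSuffixDemo {
  // Probe "name", "name_1", "name_2", ... until an unused destination is
  // found, matching the do/while in moveFile() above.
  static Path nextFreeName(FileSystem fs, Path dstDir, String fileName) throws IOException {
    Path candidate = new Path(dstDir, fileName);
    int suffix = 0;
    while (fs.exists(candidate)) {
      suffix++;
      candidate = new Path(dstDir, fileName + "_" + suffix);
    }
    return candidate;
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path dir = new Path("/tmp/collision-suffix-demo");
    fs.delete(dir, true);
    fs.mkdirs(dir);
    fs.create(new Path(dir, "000000_0")).close();   // existing file
    fs.create(new Path(dir, "000000_0_1")).close(); // first collision already taken
    System.out.println(nextFreeName(fs, dir, "000000_0")); // .../000000_0_2
  }
}
```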