diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cbe4de5..cfa72b7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3600,11 +3600,6 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true), "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."), - HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR("hive.exec.move.files.from.source.dir", false, - "When moving/renaming a directory from source to destination, individually move each \n" + - "file in the source directory, rather than renaming the source directory. This may \n" + - "help protect against files written to temp directories by runaway task attempts."), - /* BLOBSTORE section */ HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1be7eab..1f57cd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -93,6 +93,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; @@ -1450,11 +1451,43 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) throws IOException, HiveException { - + + // + // 
Runaway task attempts (which are unable to be killed by MR/YARN) can cause HIVE-17113, + // where they can write duplicate output files to tmpPath after the files have been de-duplicated, + // but before tmpPath is moved to specPath. + // Fixing this issue will be done differently for blobstore (e.g. S3) + // vs non-blobstore (local filesystem, HDFS) filesystems due to differences in + // implementation - a directory move in a blobstore effectively results in file-by-file + // moves for every file in a directory, while in HDFS/localFS a directory move is just a + // single filesystem operation. + // - For non-blobstore FS, do the following: + // 1) Rename tmpPath to a new directory name to prevent additional files + // from being added by runaway processes. + // 2) Remove duplicates from the temp directory + // 3) Rename/move the temp directory to specPath + // + // - For blobstore FS, do the following: + // 1) Remove duplicates from tmpPath + // 2) Use moveSpecifiedFiles() to perform a file-by-file move of the de-duped files + // to specPath. On blobstore FS, assuming n files in the directory, this results + // in n file moves, compared to 2*n file moves with the previous solution + // (each directory move would result in a file-by-file move of the files in the directory) + // FileSystem fs = specPath.getFileSystem(hconf); + boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); if (success) { + if (!isBlobStorage && fs.exists(tmpPath)) { + // 1) Rename tmpPath to a new directory name to prevent additional files + // from being added by runaway processes. + Path tmpPathOriginal = tmpPath; + tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved"); + Utilities.rename(fs, tmpPathOriginal, tmpPath); + } + + // Remove duplicates from tmpPath FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse( tmpPath, ((dpCtx == null) ? 
1 : dpCtx.getNumDPCols()), fs); if(statuses != null && statuses.length > 0) { @@ -1473,15 +1506,18 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, filesKept.addAll(emptyBuckets); perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets"); } + // move to the file destination Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath); perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); - if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR)) { + if (isBlobStorage) { // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks, // by moving just the files we've tracked from removeTempOrDuplicateFiles(). Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept); } else { + // For non-blobstore case, can just move the directory - the initial directory rename + // at the start of this method should keep files written by runaway tasks out of the directory being moved. Utilities.renameOrMoveFiles(fs, tmpPath, specPath); } perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles"); diff --git a/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q b/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q index 0117755..16d74d5 100644 --- a/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q +++ b/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q @@ -1,5 +1,4 @@ -set hive.exec.move.files.from.source.dir=true; set hive.enforce.bucketing=true; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; @@ -8,7 +7,8 @@ create table emp1 (id int, name string, dept int, country string) row format del load data local inpath '../../data/files/employee_part.txt' overwrite into table emp1; select * from emp1 order by id; --- Testing inserts with hive.exec.move.files.from.source.dir=true +set hive.blobstore.supported.schemes=pfile; +-- Setting pfile to be 
treated as blobstore to test mvFileToFinalPath() behavior for blobstore case -- inserts into non-partitioned/non-bucketed table create table emp2 (id int, name string, dept int, country string) stored as textfile; insert overwrite table emp2 select * from emp1; diff --git a/ql/src/test/queries/clientpositive/skewjoin.q b/ql/src/test/queries/clientpositive/skewjoin.q index 9ad4b77..e4b178a 100644 --- a/ql/src/test/queries/clientpositive/skewjoin.q +++ b/ql/src/test/queries/clientpositive/skewjoin.q @@ -2,7 +2,6 @@ set hive.mapred.mode=nonstrict; set hive.explain.user=false; set hive.optimize.skewjoin = true; set hive.skewjoin.key = 2; -set hive.exec.move.files.from.source.dir=true; -- SORT_QUERY_RESULTS