diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 829791e0a9..e4855edbd8 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -585,8 +585,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),
-    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true,
-        "Whether speculative execution for reducers should be turned on. "),
+    @Deprecated
+    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", false,
+        "(Deprecated) Whether speculative execution for reducers should be turned on. "),
     HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L,
         "The interval with which to poll the JobTracker for the counters the running job. \n" +
         "The smaller it is the more load there will be on the jobtracker, the higher it is the less granular the caught will be."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 535c24519c..97cb8a1d07 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1822,7 +1822,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
           }
         }
 
-        taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
+        taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
@@ -1835,7 +1835,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
        }
        Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
        taskIDToFile = removeTempOrDuplicateFilesNonMm(
-           fs.listStatus(new Path(mmDir, unionSuffix)), fs);
+           fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf);
        if (filesKept != null && taskIDToFile != null) {
          addFilesToPathSet(taskIDToFile.values(), filesKept);
        }
@@ -1845,13 +1845,13 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
       return result;
     }
     if (!isMmTable) {
-      taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs);
+      taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
     } else {
       Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
-      taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
+      taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
@@ -1916,12 +1916,20 @@ private static void addBucketFileIfMissing(List result,
   }
 
   private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm(
-      FileStatus[] files, FileSystem fs) throws IOException {
+      FileStatus[] files, FileSystem fs, Configuration conf) throws IOException {
     if (files == null || fs == null) {
       return null;
     }
     HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
 
+    // This method currently does not support speculative execution due to
+    // compareTempOrDuplicateFiles not being able to de-duplicate speculative
+    // execution created files
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     for (FileStatus one : files) {
       if (isTempPath(one)) {
         Path onePath = one.getPath();
@@ -1932,31 +1940,62 @@ private static void addBucketFileIfMissing(List result,
         }
       } else {
         // This would be a single file. See if we need to remove it.
-        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile);
+        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf);
       }
     }
     return taskIdToFile;
   }
 
   private static void ponderRemovingTempOrDuplicateFile(FileSystem fs,
-      FileStatus file, HashMap<String, FileStatus> taskIdToFile) throws IOException {
+      FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf)
+      throws IOException {
     Path filePath = file.getPath();
     String taskId = getPrefixedTaskIdFromFilename(filePath.getName());
     Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}"
         + ", taskId {}", filePath, taskId);
     FileStatus otherFile = taskIdToFile.get(taskId);
     taskIdToFile.put(taskId, (otherFile == null) ? file :
-        compareTempOrDuplicateFiles(fs, file, otherFile));
+        compareTempOrDuplicateFiles(fs, file, otherFile, conf));
+  }
+
+  private static boolean warnIfSet(Configuration conf, String value) {
+    if (conf.getBoolean(value, false)) {
+      LOG.warn(value + " support is currently deprecated");
+      return true;
+    }
+    return false;
+  }
+
+  private static boolean isSpeculativeExecution(Configuration conf) {
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean isSpeculative = false;
+    if ("mr".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "mapreduce.map.speculative") ||
+          warnIfSet(conf, "mapreduce.reduce.speculative") ||
+          warnIfSet(conf, "mapred.map.tasks.speculative.execution") ||
+          warnIfSet(conf, "mapred.reduce.tasks.speculative.execution");
+    } else if ("tez".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled");
+    } // all other engines do not support speculative execution
+
+    return isSpeculative;
   }
 
   private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
-      FileStatus file, FileStatus existingFile) throws IOException {
-    // Pick the one with mewest attempt ID. For sanity, check the file sizes too.
-    // If the file size of newest attempt is less than that for older one,
-    // Throw an exception as it maybe a correctness issue causing it.
-    // This breaks speculative execution if it ends prematurely.
+      FileStatus file, FileStatus existingFile, Configuration conf) throws IOException {
+    // Pick the one with newest attempt ID. Previously, this function threw an
+    // exception when the file size of the newer attempt was less than the
+    // older attempt. This was an incorrect assumption due to various
+    // techniques like file compression and no guarantee that the new task will
+    // write values in the same order.
     FileStatus toDelete = null, toRetain = null;
+    // This method currently does not support speculative execution
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
     // "_copy_x" suffix. These files are usually read by map tasks and the
     // task output gets written to some tmp path. The output file names will
@@ -1970,68 +2009,38 @@ private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
     // elimination.
     Path filePath = file.getPath();
     if (isCopyFile(filePath.getName())) {
-      LOG.info("{} file identified as duplicate. This file is" +
-          " not deleted as it has copySuffix.", filePath);
+      LOG.info("{} file identified as duplicate. This file is"
+          + " not deleted as it has copySuffix.", filePath);
       return existingFile;
     }
 
     int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
     int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
-
-    long existingFileSz = getFileSizeRecursively(fs, existingFile);
-    long fileSz = getFileSizeRecursively(fs, file);
     // Files may come in any order irrespective of their attempt IDs
-    if (existingFileAttemptId > fileAttemptId &&
-        existingFileSz >= fileSz) {
+    if (existingFileAttemptId > fileAttemptId) {
       // keep existing
       toRetain = existingFile;
       toDelete = file;
-    } else if (existingFileAttemptId < fileAttemptId &&
-        existingFileSz <= fileSz) {
+    } else if (existingFileAttemptId < fileAttemptId) {
       // keep file
       toRetain = file;
       toDelete = existingFile;
     } else {
-      throw new IOException(" File " + filePath +
-          " with newer attempt ID " + fileAttemptId + " is smaller than the file "
-          + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
+      throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as "
+          + existingFile.getPath());
     }
+
     if (!fs.delete(toDelete.getPath(), true)) {
-      throw new IOException(
-          "Unable to delete duplicate file: " + toDelete.getPath()
-              + ". Existing file: " + toRetain.getPath());
-    } else {
-      LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
-          + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
-          + toRetain.getLen());
+      throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+          + ". Existing file: " + toRetain.getPath());
     }
+    LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
+        + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
+        + toRetain.getLen());
     return toRetain;
   }
 
-  // This function recurisvely fetches the size of all the files in given directory
-  private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
-      throws IOException {
-    long size = 0;
-    if (src.isDirectory()) {
-      LOG.debug(" src " + src.getPath() + " is a directory");
-      // This is a directory.
-      try {
-        FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        // Recursively fetch sizes of each file
-        for (FileStatus file : files) {
-          size += getFileSizeRecursively(fs, file);
-        }
-      } catch (IOException e) {
-        throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
-      }
-    } else {
-      size = src.getLen();
-      LOG.debug("src " + src.getPath() + " is a file of size " + size);
-    }
-    return size;
-  }
-
   public static boolean isCopyFile(String filename) {
     String taskId = filename;
     String copyFileSuffix = null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 8a8822d560..2071de3801 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -278,10 +278,9 @@ public int execute() {
     // set input format information if necessary
     setInputAttributes(job);
 
-    // Turn on speculative execution for reducers
-    boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job,
-        HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
-    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, useSpeculativeExecReducers);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index c5b717124e..bb499f9e33 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -866,12 +866,9 @@ private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWo
     // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
-
-    boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
-    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE,
-        useSpeculativeExecReducers);
-
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE, false);
+    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.MAP_SPECULATIVE, false);
     return conf;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 7fb3878ee6..34519fbea8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -99,6 +99,9 @@ public int execute() {
     job.setOutputKeyClass(NullWritable.class);
     job.setOutputValueClass(NullWritable.class);
     job.setNumReduceTasks(0);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     // create the temp directories
     Path outputPath = work.getOutputDir();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 0458c946c0..752eea9e7b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -106,6 +106,9 @@ public int execute() {
 
     // zero reducers
     job.setNumReduceTasks(0);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     if (work.getMinSplitSize() != null) {
       HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f26d4bb2c3..e6b2b20673 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -76,6 +76,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
@@ -328,6 +329,9 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa
     job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
     job.setLong(MIN_TXN, minTxn);
     job.setLong(MAX_TXN, maxTxn);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     // Add tokens for all the file system in the input path.
     ArrayList<Path> dirs = new ArrayList<>();
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index b6a6bab6cb..4600ab4e16 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -72,6 +72,7 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -199,6 +200,9 @@ public void testRemoveTempOrDuplicateFilesOnMrWithDp() throws Exception {
     DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(dPEnabled);
     Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
     FileSinkDesc conf = getFileSinkDesc(tempDirPath);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    hconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+    hconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
     List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);