diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 829791e0a9..99fa5ef549 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -585,7 +585,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),
-    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true,
+    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", false,
         "Whether speculative execution for reducers should be turned on. "),
     HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L,
         "The interval with which to poll the JobTracker for the counters the running job. \n" +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 535c24519c..66cf27cb9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1822,7 +1822,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
         }
       }
 
-      taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
+      taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
@@ -1835,7 +1835,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
       }
       Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(
-          fs.listStatus(new Path(mmDir, unionSuffix)), fs);
+          fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
@@ -1845,13 +1845,13 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
       return result;
     }
     if (!isMmTable) {
-      taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs);
+      taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
     } else {
       Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
-      taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
+      taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
@@ -1916,12 +1916,20 @@ private static void addBucketFileIfMissing(List<Path> result,
   }
 
   private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm(
-      FileStatus[] files, FileSystem fs) throws IOException {
+      FileStatus[] files, FileSystem fs, Configuration conf) throws IOException {
     if (files == null || fs == null) {
       return null;
     }
     HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
 
+    // This method currently does not support speculative execution, due to
+    // compareTempOrDuplicateFiles not being able to de-duplicate files
+    // created by speculative execution
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     for (FileStatus one : files) {
       if (isTempPath(one)) {
         Path onePath = one.getPath();
@@ -1932,31 +1940,62 @@ private static void addBucketFileIfMissing(List<Path> result,
         }
       } else {
         // This would be a single file. See if we need to remove it.
-        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile);
+        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf);
       }
     }
     return taskIdToFile;
   }
 
   private static void ponderRemovingTempOrDuplicateFile(FileSystem fs,
-      FileStatus file, HashMap<String, FileStatus> taskIdToFile) throws IOException {
+      FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf)
+      throws IOException {
     Path filePath = file.getPath();
     String taskId = getPrefixedTaskIdFromFilename(filePath.getName());
     Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}"
         + ", taskId {}", filePath, taskId);
     FileStatus otherFile = taskIdToFile.get(taskId);
     taskIdToFile.put(taskId, (otherFile == null) ? file :
-        compareTempOrDuplicateFiles(fs, file, otherFile));
+        compareTempOrDuplicateFiles(fs, file, otherFile, conf));
+  }
+
+  private static boolean warnIfSet(Configuration conf, String value) {
+    if (conf.getBoolean(value, false)) {
+      LOG.warn(value + " support is currently deprecated.");
+      return true;
+    }
+    return false;
+  }
+
+  private static boolean isSpeculativeExecution(Configuration conf) {
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean isSpeculative = false;
+    if ("mr".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "mapred.reduce.tasks.speculative.execution") ||
+          warnIfSet(conf, "mapred.map.tasks.speculative.execution") ||
+          warnIfSet(conf, "mapreduce.map.speculative") ||
+          warnIfSet(conf, "mapreduce.reduce.speculative");
+    } else if ("tez".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled");
+    } // all other engines do not support speculative execution
+
+    return isSpeculative;
   }
 
   private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
-      FileStatus file, FileStatus existingFile) throws IOException {
-    // Pick the one with mewest attempt ID. For sanity, check the file sizes too.
-    // If the file size of newest attempt is less than that for older one,
-    // Throw an exception as it maybe a correctness issue causing it.
-    // This breaks speculative execution if it ends prematurely.
+      FileStatus file, FileStatus existingFile, Configuration conf) throws IOException {
+    // Pick the one with the newest attempt ID. Previously, this function threw
+    // an exception when the file size of the newer attempt was less than that
+    // of the older attempt. This assumption was incorrect because of techniques
+    // like file compression, and because there is no guarantee that the new
+    // task will write values in the same order.
    FileStatus toDelete = null, toRetain = null;
 
+    // This method currently does not support speculative execution
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
     // "_copy_x" suffix. These files are usually read by map tasks and the
     // task output gets written to some tmp path. The output file names will
@@ -1970,68 +2009,38 @@ private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
     // elimination.
     Path filePath = file.getPath();
     if (isCopyFile(filePath.getName())) {
-      LOG.info("{} file identified as duplicate. This file is" +
-          " not deleted as it has copySuffix.", filePath);
+      LOG.info("{} file identified as duplicate. This file is"
+          + " not deleted as it has copySuffix.", filePath);
       return existingFile;
     }
 
     int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
     int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
-
-    long existingFileSz = getFileSizeRecursively(fs, existingFile);
-    long fileSz = getFileSizeRecursively(fs, file);
     // Files may come in any order irrespective of their attempt IDs
-    if (existingFileAttemptId > fileAttemptId &&
-        existingFileSz >= fileSz) {
+    if (existingFileAttemptId > fileAttemptId) {
       // keep existing
       toRetain = existingFile;
       toDelete = file;
-    } else if (existingFileAttemptId < fileAttemptId &&
-        existingFileSz <= fileSz) {
+    } else if (existingFileAttemptId < fileAttemptId) {
       // keep file
       toRetain = file;
       toDelete = existingFile;
     } else {
-      throw new IOException(" File " + filePath +
-          " with newer attempt ID " + fileAttemptId + " is smaller than the file "
-          + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
+      throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as " +
+          existingFile.getPath());
     }
+
     if (!fs.delete(toDelete.getPath(), true)) {
-      throw new IOException(
-          "Unable to delete duplicate file: " + toDelete.getPath()
-          + ". Existing file: " + toRetain.getPath());
-    } else {
-      LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
-          + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
-          + toRetain.getLen());
+      throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+          + ". Existing file: " + toRetain.getPath());
     }
+    LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
+        + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
+        + toRetain.getLen());
     return toRetain;
   }
 
-  // This function recurisvely fetches the size of all the files in given directory
-  private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
-      throws IOException {
-    long size = 0;
-    if (src.isDirectory()) {
-      LOG.debug(" src " + src.getPath() + " is a directory");
-      // This is a directory.
-      try {
-        FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        // Recursively fetch sizes of each file
-        for (FileStatus file : files) {
-          size += getFileSizeRecursively(fs, file);
-        }
-      } catch (IOException e) {
-        throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
-      }
-    } else {
-      size = src.getLen();
-      LOG.debug("src " + src.getPath() + " is a file of size " + size);
-    }
-    return size;
-  }
-
   public static boolean isCopyFile(String filename) {
     String taskId = filename;
     String copyFileSuffix = null;
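Reviewer note, not part of the patch: compareTempOrDuplicateFiles now keeps
whichever duplicate has the larger attempt ID and no longer consults file
sizes, throwing only when two files share the same attempt ID. A minimal
stand-alone sketch of that rule follows; AttemptDedupSketch, pickFileToRetain,
and parseAttemptId are hypothetical names, and the "_<attemptId>" parsing is a
simplification of Hive's getAttemptIdFromFilename.

import java.io.IOException;

public final class AttemptDedupSketch {
  // Hive task outputs are named "<taskId>_<attemptId>", e.g. "000000_0" and
  // "000000_1" for two attempts of the same task.
  static int parseAttemptId(String filename) {
    return Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
  }

  // The newer attempt wins unconditionally; a tie is an error, mirroring the
  // IOException the patched method throws for duplicate attempt IDs.
  static String pickFileToRetain(String file, String existingFile) throws IOException {
    int fileAttemptId = parseAttemptId(file);
    int existingFileAttemptId = parseAttemptId(existingFile);
    if (fileAttemptId == existingFileAttemptId) {
      throw new IOException(file + " has same attempt ID " + fileAttemptId
          + " as " + existingFile);
    }
    return fileAttemptId > existingFileAttemptId ? file : existingFile;
  }

  public static void main(String[] args) throws IOException {
    // Retains "000000_1" even if it is smaller than "000000_0"; under the old
    // size check this case would have thrown instead.
    System.out.println(pickFileToRetain("000000_1", "000000_0"));
  }
}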