diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 535c24519c..a65e109c79 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1951,9 +1951,11 @@ private static void ponderRemovingTempOrDuplicateFile(FileSystem fs, private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, FileStatus file, FileStatus existingFile) throws IOException { - // Pick the one with mewest attempt ID. For sanity, check the file sizes too. - // If the file size of newest attempt is less than that for older one, - // Throw an exception as it maybe a correctness issue causing it. + // Pick the one with newest attempt ID. Previously, this function threw an + // exception when the file size of the newer attempt was less than the + // older attempt. This was an incorrect assumption due to various + // techniques like file compression and no guarantee that the new task will + // write values in the same order. // This breaks speculative execution if it ends prematurely. FileStatus toDelete = null, toRetain = null; @@ -1970,68 +1972,38 @@ private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, // elimination. Path filePath = file.getPath(); if (isCopyFile(filePath.getName())) { - LOG.info("{} file identified as duplicate. This file is" + - " not deleted as it has copySuffix.", filePath); + LOG.info("{} file identified as duplicate. This file is" + + " not deleted as it has copySuffix.", filePath); return existingFile; } int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName()); int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName()); - - long existingFileSz = getFileSizeRecursively(fs, existingFile); - long fileSz = getFileSizeRecursively(fs, file); // Files may come in any order irrespective of their attempt IDs - if (existingFileAttemptId > fileAttemptId && - existingFileSz >= fileSz) { + if (existingFileAttemptId > fileAttemptId) { // keep existing toRetain = existingFile; toDelete = file; - } else if (existingFileAttemptId < fileAttemptId && - existingFileSz <= fileSz) { + } else if (existingFileAttemptId < fileAttemptId) { // keep file toRetain = file; toDelete = existingFile; } else { - throw new IOException(" File " + filePath + - " with newer attempt ID " + fileAttemptId + " is smaller than the file " - + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId); + throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as " + + existingFile.getPath()); } + if (!fs.delete(toDelete.getPath(), true)) { - throw new IOException( - "Unable to delete duplicate file: " + toDelete.getPath() - + ". Existing file: " + toRetain.getPath()); - } else { - LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " - + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " - + toRetain.getLen()); + throw new IOException("Unable to delete duplicate file: " + toDelete.getPath() + + ". Existing file: " + toRetain.getPath()); } + LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " + + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " + + toRetain.getLen()); return toRetain; } - // This function recurisvely fetches the size of all the files in given directory - private static long getFileSizeRecursively(FileSystem fs, FileStatus src) - throws IOException { - long size = 0; - if (src.isDirectory()) { - LOG.debug(" src " + src.getPath() + " is a directory"); - // This is a directory. - try { - FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - // Recursively fetch sizes of each file - for (FileStatus file : files) { - size += getFileSizeRecursively(fs, file); - } - } catch (IOException e) { - throw new IOException("Unable to fetch files in directory " + src.getPath(), e); - } - } else { - size = src.getLen(); - LOG.debug("src " + src.getPath() + " is a file of size " + size); - } - return size; - } - public static boolean isCopyFile(String filename) { String taskId = filename; String copyFileSuffix = null;