diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8937b43811..837c7dcbc6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1308,7 +1308,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I * filename to extract taskid from */ public static String getTaskIdFromFilename(String filename) { - return getIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX); + return getTaskIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX); } /** @@ -1319,10 +1319,19 @@ public static String getTaskIdFromFilename(String filename) { * filename to extract taskid from */ public static String getPrefixedTaskIdFromFilename(String filename) { - return getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX); + return getTaskIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX); } - private static String getIdFromFilename(String filename, Pattern pattern) { + private static String getTaskIdFromFilename(String filename, Pattern pattern) { + return getIdFromFilename(filename, pattern, 1); + } + + public static int getAttemptIdFromFilename(String filename) { + String attemptStr = getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX, 3); + return Integer.parseInt(attemptStr.substring(1)); + } + + private static String getIdFromFilename(String filename, Pattern pattern, int group) { String taskId = filename; int dirEnd = filename.lastIndexOf(Path.SEPARATOR); if (dirEnd != -1) { @@ -1334,7 +1343,7 @@ private static String getIdFromFilename(String filename, Pattern pattern) { LOG.warn("Unable to get task id from file name: {}. Using last component {}" + " as task id.", filename, taskId); } else { - taskId = m.group(1); + taskId = m.group(group); } LOG.debug("TaskId for {} = {}", filename, taskId); return taskId; @@ -1823,10 +1832,10 @@ private static void ponderRemovingTempOrDuplicateFile(FileSystem fs, private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, FileStatus file, FileStatus existingFile) throws IOException { - // Compare the file sizes of all the attempt files for the same task, the largest win - // any attempt files could contain partial results (due to task failures or - // speculative runs), but the largest should be the correct one since the result - // of a successful run should never be smaller than a failed/speculative run. + // Pick the one with mewest attempt ID. For sanity, check the file sizes too. + // If the file size of newest attempt is less than that for older one, + // Throw an exception as it maybe a correctness issue causing it. + // This breaks speculative execution if it ends prematurely. FileStatus toDelete = null, toRetain = null; // "LOAD .. INTO" and "INSERT INTO" commands will generate files with @@ -1847,12 +1856,24 @@ private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, return existingFile; } - if (existingFile.getLen() >= file.getLen()) { - toDelete = file; + int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName()); + int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName()); + + long existingFileSz = getFileSizeRecursively(fs, existingFile); + long fileSz = getFileSizeRecursively(fs, file); + // Files may come in any order irrespective of their attempt IDs + if (existingFileAttemptId > fileAttemptId && + existingFileSz >= fileSz) { + // keep existing toRetain = existingFile; - } else { - toDelete = existingFile; + toDelete = file; + } else if (existingFileAttemptId < fileAttemptId && + existingFileSz <= fileSz) { + // keep file toRetain = file; + toDelete = existingFile; + } else { + throw new IOException(" File with newer attempt ID is smaller than the one with older attempt ID."); } if (!fs.delete(toDelete.getPath(), true)) { throw new IOException( @@ -1863,9 +1884,33 @@ private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " + toRetain.getLen()); } + return toRetain; } + // This function recurisvely fetches the size of all the files in given directory + private static long getFileSizeRecursively(FileSystem fs, FileStatus src) + throws IOException { + long size = 0; + if (src.isDirectory()) { + LOG.debug(" src " + src.getPath() + " is a directory"); + // This is a directory. + try { + FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + // Recursively fetch sizes of each file + for (FileStatus file : files) { + size += getFileSizeRecursively(fs, file); + } + } catch (IOException e) { + throw new IOException("Unable to fetch files in directory " + src.getPath()); + } + } else { + size = src.getLen(); + LOG.debug("src " + src.getPath() + " is a file of size " + size); + } + return size; + } + public static boolean isCopyFile(String filename) { String taskId = filename; String copyFileSuffix = null;